Source code for catatom2osm.osmxml

"""OSM XML format serializer."""
import logging

from lxml import etree

from catatom2osm import config, osm

log = logging.getLogger(config.app_name)


[docs]def write_elem(outfile, e): try: outfile.write(etree.tostring(e, pretty_print=True).decode()) except TypeError: # pragma: no cover outfile.write(etree.tostring(e).decode())
[docs]def serialize(outfile, data): """Output XML for an OSM data set.""" outfile.write("<?xml version='1.0' encoding='UTF-8'?>\n") attrs = "".join([" {}='{}'".format(k, v) for (k, v) in data.attrs.items()]) outfile.write("<osm{}>\n".format(attrs)) if data.note is not None: e = etree.Element("note") e.text = data.note write_elem(outfile, e) if data.meta is not None: e = etree.Element("meta") for (k, v) in data.meta.items(): e.set(k, v) write_elem(outfile, e) if data.tags: e = etree.Element("changeset") for (key, value) in data.tags.items(): e.append(etree.Element("tag", dict(k=key, v=str(value)))) write_elem(outfile, e) for node in data.nodes: e = etree.Element("node", node.attrs) for key, value in node.tags.items(): e.append(etree.Element("tag", dict(k=key, v=str(value)))) write_elem(outfile, e) for way in data.ways: e = etree.Element("way", way.attrs) for node in way.nodes: e.append(etree.Element("nd", dict(ref=str(node.id)))) for key, value in way.tags.items(): e.append(etree.Element("tag", dict(k=key, v=str(value)))) write_elem(outfile, e) for rel in data.relations: e = etree.Element("relation", rel.attrs) for m in rel.members: e.append(etree.Element("member", m.attrs)) for key, value in rel.tags.items(): e.append(etree.Element("tag", dict(k=key, v=str(value)))) write_elem(outfile, e) outfile.write("</osm>\n")
[docs]def deserialize(infile, data=None): """Generate a OSM data set from OSM XML or append to existing data.""" if data is None: data = osm.Osm() context = etree.iterparse(infile, events=("end",)) childs = [] tags = {} for event, elem in context: if elem.tag == "osm": data.upload = elem.get("upload") data.version = elem.get("version") data.generator = elem.get("generator") elif elem.tag == "changeset": data.tags = tags tags = {} elif elem.tag == "note": data.note = str(elem.text) elif elem.tag == "meta": data.meta = dict(elem.attrib) elif elem.tag == "node": lon = float(elem.get("lon")) lat = float(elem.get("lat")) n = data.Node(lon, lat, tags=tags, attrs=dict(elem.attrib)) tags = {} elif elem.tag == "way": w = data.Way(tags=tags, attrs=dict(elem.attrib)) w.nodes = childs childs = [] tags = {} elif elem.tag == "nd": childs.append(elem.get("ref")) elif elem.tag == "relation": r = data.Relation(tags=tags, attrs=dict(elem.attrib)) r.members = childs childs = [] tags = {} elif elem.tag == "member": childs.append( { "ref": elem.get("ref"), "type": elem.get("type"), "role": elem.get("role"), } ) elif elem.tag == "tag": tags[elem.get("k")] = elem.get("v") elem.clear() if hasattr(elem, "xpath"): for ancestor in elem.xpath("ancestor-or-self::*"): while ancestor.getprevious() is not None: del ancestor.getparent()[0] del context for way in data.ways: missing = [] for i, ref in enumerate(way.nodes): if isinstance(ref, str): if "n{}".format(ref) in data.index: n = data.get(ref) way.nodes[i] = n data.parents[n].add(way) else: missing.append(i) if len(missing) > 0: for i in sorted(missing, reverse=True): way.nodes.pop(i) if way.version is not None: way.version = str(int(way.version) + 1) for rel in data.relations: missing = [] for i, m in enumerate(rel.members): if isinstance(m, dict): if m["type"][0].lower() + str(m["ref"]) in data.index: el = data.get(m["ref"], m["type"]) rel.members[i] = osm.Relation.Member(el, m["role"]) data.parents[el].add(rel) else: missing.append(i) if len(missing) > 0: for i in sorted(missing, reverse=True): rel.members.pop(i) if rel.version is not None: rel.version = str(int(rel.version) + 1) return data