Source code for catatom2osm.osm

"""OpenStreetMap data model."""
from collections import Counter, defaultdict

# Number of decimal digits used to round node coordinates (see Node.__init__).
# Set to 0 to disable rounding. With a value greater than 7, JOSM gives
# duplicated points errors.
COOR_DIGITS = 0


class Osm(object):
    """Class to implement a OSM data set.

    Attributes:
        upload, version, generator: values exposed through ``attrs``.
        counter (int): decremented to generate ids for new elements.
        parents (defaultdict): maps an element to the set of elements
            (ways, relations) that reference it.
        elements (set): every element that belongs to this data set.
        index (dict): elements keyed by their ``fid`` ('n1', 'w-2', ...).
        tags, note, meta: data set level information, only stored here.
    """

    def __init__(self, upload="never", generator=None):
        self.upload = upload
        self.version = "0.6"
        self.generator = generator
        self.counter = 0
        self.parents = defaultdict(set)
        self.elements = set()
        self.index = {}  # elements by id
        self.tags = {}
        self.note = None
        self.meta = None
        self._attr_list = ("upload", "version", "generator")

    @property
    def nodes(self):
        """Return list of nodes in elements."""
        return [e for e in self.elements if isinstance(e, Node)]

    @property
    def ways(self):
        """Return list of ways in elements."""
        return [e for e in self.elements if isinstance(e, Way)]

    @property
    def relations(self):
        """Return list of relations in elements."""
        return [e for e in self.elements if isinstance(e, Relation)]

    @property
    def attrs(self):
        """Return dictionary of the not-None properties in self._attr_list.

        The 'upload' key is omitted when its value is 'yes' or 'upload'.
        """
        attrs = {}
        for key in self._attr_list:
            value = getattr(self, key, None)
            if value is not None:
                attrs[key] = value
        if self.upload in ["yes", "upload"]:
            attrs.pop("upload")
        return attrs

    def get(self, eid, etype="n"):
        """Return element by its id.

        Args:
            eid: a full id ('n1', 'w-3', ...) or a bare id (1, '-3').
            etype (str): element type used to prefix a bare id; only its
                first letter, lowercased, is used.

        Raises:
            KeyError: if there is no element with that id in the index.
        """
        eid = str(eid)
        if eid[0] not in "nwr":
            eid = etype[0].lower() + eid
        return self.index[eid]

    def remove(self, el):
        """Remove el from element, from its parents and its orphaned children."""
        self.elements.discard(el)
        if el.fid in self.index:
            del self.index[el.fid]
        # Iterate over a snapshot: parent.remove() mutates self.parents[el].
        for parent in frozenset(self.parents[el]):
            parent.remove(el)
        for child in el.childs:
            if isinstance(child, Element):
                if self.parents[child] == set([el]):
                    # el was the only parent: the child is orphaned.
                    self.remove(child)
                else:
                    try:
                        self.parents[child].remove(el)
                    except KeyError:
                        pass

    def replace(self, n1, n2):
        """Replace n1 with n2 in elements.

        n1 is detached from this data set and n2 takes its place. The
        parents registered for n1 are merged into the parents of n2, so
        the parents n2 already had are kept.
        """
        n1.container = None
        self.elements.discard(n1)
        # Tolerate an element that is not indexed, as remove() does.
        self.index.pop(n1.fid, None)
        n2.container = self
        self.elements.add(n2)
        self.index[n2.fid] = n2
        # Merge instead of overwrite: when the parents were already moved
        # with parent.replace(n1, n2) the set of n1 is empty, and assigning
        # it to n2 would drop every parent of n2.
        inherited = self.parents.pop(n1, set())
        self.parents[n2].update(inherited)

    def merge_duplicated(self):
        """Merge elements with the same geometry."""
        geomdupes = defaultdict(list)
        for el in self.elements:
            geomdupes[el.geometry()].append(el)
        for geom, dupes in list(geomdupes.items()):
            if len(dupes) > 1:
                # An element compares equal to a bare geometry only when it
                # is new and has no tags (see __eq__ method of Element), so
                # this picks the first element with tags or id, if any.
                i = 0
                while i < len(dupes) - 1 and dupes[i] == geom:
                    i += 1
                for el in dupes:
                    if el is not dupes[i] and el == dupes[i]:
                        for parent in frozenset(self.parents[el]):
                            parent.container = self
                            parent.replace(el, dupes[i])
                        self.replace(el, dupes[i])
        for way in self.ways:
            way.clean_duplicated_nodes()

    def append(self, data, query=None):
        """
        Append data elements to this dataset avoiding duplicates.

        Optionally filter by query.

        Args:
            data (iterable, Osm or Element ): source data
            query (func): function to apply to each element and returns
                a boolean deciding if it will be included or not

        Example:
            >>> d1 = osm.Osm()
            >>> n1 = d1.Node(1, 1, tags={'tourism': 'viewpoint'})
            >>> n2 = d1.Node(2, 2, tags={'amenity': 'cafe'})
            >>> d2 = osm.Osm()
            >>> d2.append(d1) # all
            >>> d3 = osm.Osm()
            >>> d3.append(d1, lambda e: 'amenity' in e.tags) # only amenities
        """
        if isinstance(data, Element):
            if not query or query(data):
                data.copyto(self)
        else:
            if isinstance(data, Osm):
                data = data.elements
            for el in data:
                self.append(el, query)

    def __getattr__(self, name):
        """
        Help to create elements.

        Example:
            >>> d = osm.Osm()
            >>> n = d.Node(1,1) # instead of
            >>> n = osm.Node(d, 1, 1)

        Raises:
            AttributeError: for any name that is not an element class.
        """
        if name in ["Node", "Way", "Relation", "Polygon", "MultiPolygon"]:
            cls = globals()[name]
            return lambda *args, **kwargs: cls(self, *args, **kwargs)
        raise AttributeError(
            "'%s' object has no attribute '%s'" % (type(self).__name__, name)
        )

    @staticmethod
    def get_outline(elements):
        """For a set of elements, get all Way and outer ways in MultiPolygon."""
        outline = []
        for el in elements:
            if isinstance(el, Way):
                outline.append(el)
            elif isinstance(el, Relation):
                outline += [m.element for m in el.members if m.role == "outer"]
        return outline
class Element(object):
    """Base class for Osm elements."""

    def __init__(self, container, tags=None, attrs=None):
        """Each element must belong to a container OSM dataset.

        Args:
            container: data set that will own the element. The element is
                added to its ``elements`` and ``index``.
            tags (dict): tags of the element; copied. None means no tags.
            attrs (dict): element attributes (see the ``attrs`` setter).
                When it does not set 'id', a new id is taken from the
                container counter. None means no attributes.
        """
        tags = {} if tags is None else tags
        attrs = {} if attrs is None else attrs
        self.container = container
        self.action = "modify"
        self.visible = "true"
        self.tags = dict(tags.items())
        self.version = None
        self.timestamp = None
        self.changeset = None
        self.uid = None
        self.user = None
        # Must exist before assigning self.attrs: the setter filters on it.
        self._attr_list = (
            "id",
            "action",
            "visible",
            "version",
            "timestamp",
            "changeset",
            "uid",
            "user",
        )
        self.attrs = dict(attrs.items())
        if not hasattr(self, "id"):
            # No id given: generate the next one from the container counter.
            container.counter -= 1
            self.id = container.counter
        container.elements.add(self)
        container.index[self.fid] = self

    def __eq__(self, other):
        """Test equality to determine if two elements could be merged.

        Two elements of the same class are equal when all their attributes
        but the container match; ids are ignored if any of them is new and
        tags are ignored if any of them has no tags. A new element without
        tags is also equal to a bare geometry that matches its own.
        """
        if isinstance(other, self.__class__):
            a = dict(self.__dict__)
            b = dict(other.__dict__)
            a.pop("container", None)
            b.pop("container", None)
            if other.is_new() or self.is_new():
                a["id"] = 0
                b["id"] = 0
            if b["tags"] == {}:
                a["tags"] = {}
            if a["tags"] == {}:
                b["tags"] = {}
            return a == b
        elif self.is_new() and self.tags == {}:
            return self.geometry() == other
        return False

    def __ne__(self, other):
        return not self.__eq__(other)

    def __hash__(self):
        # Identity hash: elements stay usable in sets and as dict keys
        # regardless of the value based __eq__.
        return id(self)

    def is_new(self):
        """Return true if this element is new to OSM."""
        return self.id <= 0

    @property
    def type(self):
        """Return class name as string."""
        return self.__class__.__name__.lower()

    @property
    def fid(self):
        """Return id as unique string."""
        return self.type[0] + str(self.id)

    @property
    def attrs(self):
        """Return the element attributes as a dictionary.

        Only the not-None attributes in self._attr_list are included and
        'id' is returned as a string.
        """
        attrs = {}
        for key in self._attr_list:
            value = getattr(self, key, None)
            if value is not None:
                attrs[key] = value
        if "id" in attrs:
            attrs["id"] = str(attrs["id"])
        return attrs

    @attrs.setter
    def attrs(self, attrs):
        """Set the element attributes from a dictionary.

        Keys that are not in self._attr_list are ignored and 'id' is
        converted to int.
        """
        for (k, v) in attrs.items():
            if k == "id":
                v = int(v)
            if k in self._attr_list:
                setattr(self, k, v)
class Node(Element):
    """A node: an element located by a pair of coordinates."""

    def __init__(self, container, x, y=0, *args, **kwargs):
        """
        Build a node from two coordinates or from an indexable pair.

        Both forms are accepted:
            >>> d = osm.Osm()
            >>> n1 = d.Node(1,1)
            >>> p = (1,1)
            >>> n2 = d.Node(p)

        The remaining arguments are passed to the Element constructor.
        """
        super(Node, self).__init__(container, *args, **kwargs)
        if hasattr(x, "__getitem__"):
            # x is a pair like (x, y); the y argument is not used.
            self.x, self.y = x[0], x[1]
        else:
            self.x, self.y = x, y
        if COOR_DIGITS:
            self.x = round(self.x, COOR_DIGITS)
            self.y = round(self.y, COOR_DIGITS)
        self._attr_list = self._attr_list + ("lon", "lat")

    def __getitem__(self, key):
        """Index access: n[0] is n.x and n[1] is n.y, else IndexError."""
        if key == 0:
            return self.x
        if key == 1:
            return self.y
        raise IndexError

    def geometry(self):
        """Return the (x, y) tuple of this node."""
        return (self.x, self.y)

    @property
    def childs(self):
        """Return an empty set, for symmetry with ways and relations."""
        return set()

    @property
    def lon(self):
        """Return the x coordinate as a string."""
        return str(self.x)

    @lon.setter
    def lon(self, value):
        """Set the x coordinate converting value to float."""
        self.x = float(value)

    @property
    def lat(self):
        """Return the y coordinate as a string."""
        return str(self.y)

    @lat.setter
    def lat(self, value):
        """Set the y coordinate converting value to float."""
        self.y = float(value)

    def copyto(self, container):
        """Create a copy of this node in container unless its fid exists."""
        if self.fid in container.index:
            return
        container.Node(self.geometry(), tags=self.tags, attrs=self.attrs)

    def __str__(self):
        return str(self.geometry())
class Way(Element):
    """Define a way as a list of nodes."""

    def __init__(self, container, nodes=None, *args, **kwargs):
        """
        Construct a way.

        Use any of this:
            >>> d = osm.Osm()
            >>> n1 = d.Node(1,1)
            >>> n2 = d.Node(2,2)
            >>> w = d.Way([n1, n2])
            >>> w = d.Way([(1,1), (2,2)])

        The remaining arguments are passed to the Element constructor.
        """
        super(Way, self).__init__(container, *args, **kwargs)
        self.nodes = []
        for n in nodes or ():
            self.append(n)

    @property
    def childs(self):
        """Return set of unique nodes."""
        return set(self.nodes)

    def is_closed(self):
        """Return true if the way is closed."""
        return (len(self.nodes) > 2) and self.nodes[0] == self.nodes[-1]

    def is_open(self):
        """Return true if the way is not closed."""
        return (len(self.nodes) > 1) and self.nodes[0] != self.nodes[-1]

    def shoelace(self):
        """Return the shoelace sum of a closed way, or 0 if it is not closed.

        The sum is twice the signed area (it is not divided by two):
        positive for CCW nodes, negative for CW.
        """
        s = 0
        if self.is_closed():
            for n1, n2 in zip(self.nodes, self.nodes[1:]):
                s += n1.x * n2.y - n2.x * n1.y
        return s

    def append(self, n):
        """Append n to nodes; anything that is not a Node is made one."""
        if not isinstance(n, Node):
            n = Node(self.container, n)
        self.nodes.append(n)
        self.container.parents[n].add(self)

    def remove(self, n):
        """Remove every occurrence of node n from nodes.

        Raises:
            KeyError: if this way is not registered as a parent of n.
        """
        self.nodes = [o for o in self.nodes if o is not n]
        self.container.parents[n].remove(self)

    def replace(self, n1, n2):
        """Replace every occurrence of node n1 (by identity) with n2.

        Raises:
            KeyError: if this way is not registered as a parent of n1.
        """
        self.nodes = [n2 if n is n1 else n for n in self.nodes]
        self.container.parents[n1].remove(self)
        self.container.parents[n2].add(self)

    def __eq__(self, other):
        """Test equality to determine if two elements could be merged.

        Open ways use the Element comparison. Other ways are compared to
        another way using the normalized geometry instead of the node
        list, and to a bare sequence of coordinates after rotating it to
        start at its minimum point.
        """
        if self.is_open():
            return super(Way, self).__eq__(other)
        elif isinstance(other, self.__class__):
            a = dict(self.__dict__)
            b = dict(other.__dict__)
            a.pop("container", None)
            b.pop("container", None)
            if other.is_new() or self.is_new():
                a["id"] = 0
                b["id"] = 0
            if b["tags"] == {}:
                a["tags"] = {}
            if a["tags"] == {}:
                b["tags"] = {}
            a["nodes"] = self.geometry()
            b["nodes"] = other.geometry()
            return a == b
        elif self.is_new() and self.tags == {}:
            if hasattr(other, "index"):
                if not other:
                    # min() of an empty sequence raises ValueError; an
                    # empty geometry needs no rotation.
                    return self.geometry() == other
                i = other.index(min(other))
                return self.geometry() == other[i:] + other[1 : i + 1]
        return False

    def __ne__(self, other):
        return not self.__eq__(other)

    def __hash__(self):
        # Defining __eq__ disables the inherited hash in Python 3.
        return id(self)

    def geometry(self):
        """Return tuple of coordinates.

        A closed way is normalized: it starts at its minimum point and
        is reversed when its shoelace sum is negative.
        """
        g = tuple(n.geometry() for n in self.nodes)
        if self.is_closed():
            i = g.index(min(g))
            g = g[i:] + g[1 : i + 1]
            if self.shoelace() < 0:
                g = g[::-1]
        return g

    def clean_duplicated_nodes(self):
        """Remove consecutive duplicated nodes."""
        if self.nodes:
            merged = [self.nodes[0]]
            # Each node is compared with its predecessor in the list.
            for previous, n in zip(self.nodes, self.nodes[1:]):
                if n != previous:
                    merged.append(n)
            self.nodes = merged

    def search_node(self, x, y):
        """Return osm node of way in the given position or None."""
        for n in self.nodes:
            if n.x == x and n.y == y:
                return n
        return None

    def copyto(self, container):
        """Copy self in another container.

        Nothing is done if the container already has an element with the
        fid of this way. The new way is built with the nodes indexed in
        the target container, not with the nodes of the source one.
        """
        if self.fid not in container.index:
            for n in self.nodes:
                n.copyto(container)
            # After copyto() every fid is indexed in the container; fall
            # back to the source node just in case it is not.
            nodes = [container.index.get(n.fid, n) for n in self.nodes]
            container.Way(nodes=nodes, tags=self.tags, attrs=self.attrs)
class Relation(Element):
    """A relation is a collection of nodes, ways or relations with a role."""

    def __init__(self, container, members=None, *args, **kwargs):
        """Construct a relation.

        Args:
            container: data set that will own the relation.
            members (iterable): Relation.Member objects, or elements that
                are added without role.

        The remaining arguments are passed to the Element constructor.
        """
        super(Relation, self).__init__(container, *args, **kwargs)
        self.members = []
        for m in members or ():
            self.append(m)

    @property
    def childs(self):
        """Return set of unique members elements."""
        return set([m.element for m in self.members])

    def append(self, m, role=None):
        """Add a member.

        m is a Relation.Member, or an element to wrap with the given role
        (role is ignored when m is already a Member).
        """
        if not isinstance(m, Relation.Member):
            m = Relation.Member(m, role)
        self.members.append(m)
        self.container.parents[m.element].add(self)

    def remove(self, e):
        """Remove every member whose element is e.

        Raises:
            KeyError: if this relation is not registered as a parent of e.
        """
        self.members = [m for m in self.members if m.element is not e]
        self.container.parents[e].remove(self)

    def replace(self, e1, e2):
        """Replace every occurrence of element e1 with e2 keeping the role.

        Members are matched by identity, like in remove() and in
        Way.replace(): the value based equality of elements would also
        match members other than e1.

        Raises:
            KeyError: if this relation is not registered as a parent of e1.
        """
        self.members = [
            Relation.Member(e2, m.role) if m.element is e1 else m
            for m in self.members
        ]
        self.container.parents[e1].remove(self)
        self.container.parents[e2].add(self)

    def is_valid_multipolygon(self):
        """Return true if this is valid as a multipolygon relation.

        Every member must be a way with role 'outer' or 'inner' and at
        least two nodes, and every way end point must appear exactly
        twice among the end points of all the members.
        """
        ends = []
        for m in self.members:
            if m.role not in ("outer", "inner") or m.type != "way":
                return False
            w = m.element
            if len(w.nodes) < 2:
                return False
            ends.append(w.nodes[0].geometry())
            ends.append(w.nodes[-1].geometry())
        is_conected = all(c == 2 for c in Counter(ends).values())
        return is_conected

    def geometry(self):
        """Return tuple of coordinates."""
        return tuple(m.element.geometry() for m in self.members)

    def outer_geometry(self):
        """Return equivalent geometry removing inner rings.

        Returns a list with the geometries of the outer members, where
        open ways are joined by their end points, or an empty list if
        this is not a valid multipolygon.
        """
        if not self.is_valid_multipolygon():
            return []
        outer = [m.element.geometry() for m in self.members if m.role == "outer"]
        i = 0
        while i < len(outer):
            w1 = outer[i]
            if len(w1) > 1 and w1[0] != w1[-1]:
                # Open way: extend it while another open way touches its end.
                match = True
                while match:
                    match = False
                    # Iterate over a snapshot because outer is modified.
                    for w2 in frozenset(outer[i + 1 :]):
                        w1 = outer[i]
                        if len(w2) > 1 and w2[0] != w2[-1]:
                            if w2[0] == w1[-1]:
                                outer[i] = w1 + w2[1:]
                                outer.remove(w2)
                                match = True
                            elif w2[-1] == w1[-1]:
                                # w2 runs backwards: append it reversed.
                                outer[i] = w1 + w2[-2::-1]
                                outer.remove(w2)
                                match = True
            i += 1
        return outer

    def copyto(self, container):
        """Copy self in another container.

        Nothing is done if the container already has an element with the
        fid of this relation. The new relation gets its own Member objects
        pointing to the elements indexed in the target container.
        """
        if self.fid not in container.index:
            for m in self.members:
                m.element.copyto(container)
            # Fall back to the source element if its fid is not indexed in
            # the target container.
            members = [
                Relation.Member(
                    container.index.get(m.element.fid, m.element), m.role
                )
                for m in self.members
            ]
            tags = self.tags
            attrs = self.attrs
            container.Relation(members=members, tags=tags, attrs=attrs)

    class Member(object):
        """An element is member of a relation with a role."""

        def __init__(self, element, role=None):
            self.element = element
            self.role = role

        def __eq__(self, other):
            """Test equality to determine if two elements could be merged."""
            if isinstance(other, self.__class__):
                return self.__dict__ == other.__dict__
            else:
                return False

        def __ne__(self, other):
            return not self.__eq__(other)

        @property
        def type(self):
            """Return 'node', 'way' or 'relation' for the member element."""
            if isinstance(self.element, Node):
                return "node"
            elif isinstance(self.element, Way):
                return "way"
            return "relation"

        @property
        def ref(self):
            """Return the id of the element, or None if it has no id."""
            return self.element.id if hasattr(self.element, "id") else None

        @property
        def attrs(self):
            """Return type, ref and (if it is not None) role as a dict."""
            attrs = dict(type=self.type, ref=str(self.ref))
            if self.role is not None:
                attrs["role"] = self.role
            return attrs
class Polygon(Relation):
    """Multipolygon relation helper: one outer ring plus inner rings."""

    def __init__(self, container, rings=[], *args, **kwargs):
        """Build the relation from rings given as Way objects or coordinates.

        The first ring gets the 'outer' role and the rest get 'inner'.
        """
        # NOTE(review): extra positional arguments are forwarded as they
        # come, so the first one is received by the `members` parameter of
        # Relation. Pass tags and attrs as keywords.
        super(Polygon, self).__init__(container, *args, **kwargs)
        self.tags["type"] = "multipolygon"
        for position, ring in enumerate(rings):
            way = ring if isinstance(ring, Way) else Way(container, ring)
            self.append(way, "outer" if position == 0 else "inner")
class MultiPolygon(Polygon):
    """Multipolygon relation helper for several parts."""

    def __init__(self, container, parts=[], *args, **kwargs):
        """Build the relation from parts, each one a sequence of rings.

        Rings are Way objects or coordinates. In every part the first ring
        gets the 'outer' role and the rest get 'inner'.
        """
        # NOTE(review): extra positional arguments are forwarded as they
        # come, so the first one is received by the `rings` parameter of
        # Polygon. Pass tags and attrs as keywords.
        super(MultiPolygon, self).__init__(container, *args, **kwargs)
        for part in parts:
            for position, ring in enumerate(part):
                way = ring if isinstance(ring, Way) else Way(container, ring)
                self.append(way, "outer" if position == 0 else "inner")