Source code for catatom2osm.boundary

import io
import json
import os
import re

from lxml import etree

from catatom2osm import config, csvtools, download, hgwnames, osmxml, overpass
from catatom2osm.exceptions import CatValueError


[docs]def list_code(code): if code == "99": list_provincial_offices() else: code = f"{code:>02}" if len(code) > 2: list_districts(code) else: list_municipalities(code)
[docs]def list_provincial_offices(): title = _("Territorial office") print(title) print("=" * len(title)) for code, prov in config.prov_codes.items(): print(f"{code} {prov}")
[docs]def get_districts(code): id, name = get_municipality(code) query = overpass.Query(id) query.add('wr["boundary"="administrative"]["admin_level"="9"]') query.add('wr["boundary"="administrative"]["admin_level"="10"]') result = query.read() with io.BytesIO(result) as fo: data = osmxml.deserialize(fo) district = {} subarea = [] ward = [] for e in data.elements: if e.tags.get("boundary", "") == "administrative": if e.type == "relation": if e.tags.get("admin_level", "") == "9": id = e.id district[id] = {"boundary": e, "subarea": []} for m in e.members: if m.role == "subarea": district[id]["subarea"].append(m.element) subarea.append(m.element.id) if m.element in ward: ward.remove(m.element) elif e.tags.get("admin_level", "") == "10" and e.id not in subarea: ward.append(e) elif e.type == "way" and e.is_closed(): if e.tags.get("admin_level", "") == "9": district[e.id] = {"boundary": e} elif e.tags.get("admin_level", "") == "10": ward.append(e) by_name_dist = lambda d: d["boundary"].tags.get("name", "") by_name_ward = lambda w: w.tags.get("name", "") districts = [] for d in sorted(district.values(), key=by_name_dist): e = d["boundary"] districts.append((False, str(e.id), _("District"), e.tags.get("name", ""))) subarea = d.get("subarea", []) for m in sorted(subarea, key=by_name_ward): districts.append((True, str(m.id), _("Ward"), m.tags.get("name", ""))) for w in sorted(ward, key=by_name_ward): districts.append((False, str(w.id), _("Ward"), w.tags.get("name", ""))) return districts
[docs]def list_districts(code): districts = get_districts(code) for row in districts: tab = " " if row[0] else "" print(tab + " ".join(row[1:]))
[docs]def get_municipality(mun_code): fn = os.path.join(config.app_path, "municipalities.csv") result = csvtools.get_key(fn, mun_code) if result: __, id, name = result return (id, name) return (None, None)
[docs]def search_municipality(name, bounding_box): if bounding_box is None: return (None, None) query = overpass.Query(bounding_box, "json", False, False) query.add('rel["admin_level"="8"]') try: data = json.loads(query.read()) matching = hgwnames.dsmatch( name, data["elements"], lambda e: e["tags"].get("name", "") ) except ConnectionError: pass if matching: id = str(matching["id"]) name = matching["tags"]["name"] return (id, name) return (None, None)
[docs]def get_municipalities(prov_code): url = config.prov_url["BU"].format(code=prov_code) response = download.get_response(url) root = etree.fromstring(response.content) ns = {"atom": "http://www.w3.org/2005/Atom"} municipios = [] for entry in root.findall("atom:entry", namespaces=ns): row = entry.find("atom:title", ns).text.replace("buildings", "") row = row.replace("Territorial office", "") municipios.append(row.strip().split("-")) return municipios
[docs]def list_municipalities(prov_code): municipalities = get_municipalities(prov_code) if prov_code not in config.prov_codes.keys(): msg = _("Province code '%s' is not valid") % prov_code raise CatValueError(msg) office = config.prov_codes[prov_code] title = _("Territorial office %s - %s") % (prov_code, office) print(title) print("=" * len(title)) for mun_code, mun_name in municipalities: print(f"{mun_code} {mun_name}")
[docs]def get_boundary(cat_path, boundary_search_area, id_or_name): query = overpass.Query(boundary_search_area) if re.search(r"^[0-9]+$", id_or_name): query.add(f"wr({id_or_name})") else: query.add(f'wr["boundary"="administrative"]["name"="{id_or_name}"]') result = query.read() with io.BytesIO(result) as fo: data = osmxml.deserialize(fo) fn = id_or_name for e in data.elements: if e.type == "relation" or (e.type == "way" and e.is_closed()): if e.tags.get("boundary", "") == "administrative": fn = e.tags.get("name", fn) break fn = os.path.join(cat_path, fn.replace(" ", "_") + ".osm") with open(fn, "wb") as fo: fo.write(result) fn += "|layername=multipolygons" return fn