"""Main application processes."""
import codecs
import gzip
import io
import logging
import os
import shutil
from collections import defaultdict
from glob import glob
from zipfile import ZIP_DEFLATED, ZipFile
# isort: off
from past.builtins import basestring # NOQA: F401 - qgis/utils.py:744: Warning
# isort: on
import qgis.utils
from osgeo import gdal
from qgis.core import QgsApplication, QgsGeometry, QgsVectorLayer
from catatom2osm import cdau # NOQA: F401 - Used in get_auxiliary_addresses
from catatom2osm import boundary, catatom, cbcn, config, csvtools, geo, osmxml, overpass
from catatom2osm.exceptions import CatIOError, CatValueError
from catatom2osm.report import instance as report
qgis.utils.uninstallErrorHook()
qgis_utils = getattr(qgis.utils, "QGis", getattr(qgis.utils, "Qgis", None))
log = logging.getLogger(config.app_name)
if config.silence_gdal:
gdal.PushErrorHandler("CPLQuietErrorHandler")
tasks_folder = "tasks"
[docs]class QgsSingleton(QgsApplication):
"""Keep a unique instance of QGIS for the application (and tests)."""
_qgs = None
def __new__(cls):
if QgsSingleton._qgs is None:
# Init qGis API
QgsSingleton._qgs = QgsApplication([], False)
qgis_prefix = os.getenv("QGISHOME")
if qgis_prefix:
QgsApplication.setPrefixPath(qgis_prefix, True)
QgsApplication.initQgis()
# sets GDAL to convert xlink references to fields but not resolve
gdal.SetConfigOption("GML_ATTRIBUTES_TO_OGR_FIELDS", "YES")
gdal.SetConfigOption("GML_SKIP_RESOLVE_ELEMS", "ALL")
return QgsSingleton._qgs
[docs]class CatAtom2Osm(object):
"""Main application class."""
def __init__(self, a_path, options):
"""
Application constructor.
Args:
a_path (str): Directory where the source files are located.
options (dict): Dictionary of options.
"""
self.options = options
self.cat = catatom.Reader(a_path)
self.path = self.cat.path
report.clear(options=self.options.args, mun_code=self.cat.zip_code)
if config.report_system_info:
report.qgs_version = qgis_utils.QGIS_VERSION
report.gdal_version = gdal.__version__
log.debug(_("Initialized QGIS %s API"), report.qgs_version)
log.debug(_("Using GDAL %s"), report.gdal_version)
if not options.building and not options.address:
options.address = True
options.building = True
opt = ""
if not self.options.address:
opt = "-b"
elif not self.options.building:
opt = "-d"
self.tasks_folder = tasks_folder + opt
self.tasks_path = self.cat.get_path(self.tasks_folder)
if self.options.zoning:
self.options.address = False
fn = self.options.split or ""
bkp_dir = os.path.splitext(os.path.basename(fn))[0]
self.bkp_path = self.cat.get_path(bkp_dir)
self.highway_names_path = self.cat.get_path("highway_names.csv")
self.is_new = not os.path.exists(self.highway_names_path)
self.source = "building"
self.aux_path = os.path.join(os.path.dirname(self.path), config.aux_path)
if self.options.address and not self.options.building:
self.source = "address"
[docs] @staticmethod
def create_and_run(a_path, options):
app = CatAtom2Osm(a_path, options)
app.run()
app.exit()
[docs] def run(self):
"""Launch the app processing."""
if self.options.comment:
self.add_comments()
return
self.get_boundary()
self.get_split()
if self.options.info:
self.get_parcel()
self.get_building()
self.get_address()
report.tags_for_info()
fn = f"_{self.options.split}" if self.options.split else ""
report.export(self.cat.get_path(f"info{fn}.json"))
return
if self.options.address and not self.is_new:
log.info(_("Resume processing '%s'"), report.mun_code)
self.resume_address()
else:
log.info(_("Start processing '%s'"), report.mun_code)
if report.get("split_name"):
log.info(_("Administrative boundary: '%s'"), report.split_name)
elif self.options.split:
log.info(_("Split: '%s'"), self.options.split)
self.get_parcel()
self.get_building()
self.get_zoning()
if self.options.zoning:
self.export_poly()
self.process_building()
self.process_parcel()
if self.options.address:
self.get_address()
self.stop_address()
return
if self.options.address:
self.process_address()
self.address.reproject()
if not self.options.zoning:
self.building.reproject()
self.process_tasks(getattr(self, self.source))
self.output_zoning()
self.finish()
[docs] def get_split(self):
"""Get boundary file for splitting."""
self.split = None
if self.options.split:
fn = self.options.split
if not os.path.exists(fn):
if "." not in os.path.basename(fn):
fn += ".osm"
if not os.path.exists(fn) and fn == os.path.basename(fn):
fn = self.cat.get_path(fn)
if fn.endswith(".osm"):
fn += "|layername=multipolygons"
split = geo.BaseLayer(fn, "zoningsplit", "ogr")
if split.isValid():
report.split_file = self.options.split
else:
msg = "Can't open %s" % self.options.split
fn = self.options.split
if os.path.basename(fn) == fn and "." not in fn:
report.split_id = fn
fn = boundary.get_boundary(self.path, self.boundary_search_area, fn)
name = fn.replace(".osm|layername=multipolygons", "")
report.split_name = name.split("/")[-1].replace("_", " ")
split = geo.BaseLayer(fn, "zoningsplit", "ogr")
if not split.isValid():
raise CatIOError(msg)
else:
raise CatIOError(msg)
self.split = geo.PolygonLayer("MultiPolygon", "split", "memory")
q = lambda f, __: f.geometry().wkbType() == geo.types.WKBMultiPolygon
self.split.append(split, query=q)
if self.split.featureCount() == 0:
msg = _("'%s' does not include any polygon") % self.options.split
raise CatValueError(msg)
[docs] def get_parcel(self):
"""Get parcels dataset."""
parcel_gml = self.cat.read("cadastralparcel")
report.cat_mun = self.cat.cat_mun
self.parcel = geo.ParcelLayer(self.cat.zip_code)
self.parcel.source_date = parcel_gml.source_date
q = None
if self.split:
if self.split.crs() != parcel_gml.crs():
self.split.reproject(parcel_gml.crs())
q = lambda f, __: self.split.is_inside_area(f)
elif self.options.parcel:
localid = self.options.parcel[0]
try:
pa = next(parcel_gml.search(f"localId = '{localid}'"))
except StopIteration:
msg = _("Parcel '%s' does not exists") % localid
raise CatValueError(msg)
bb = pa.geometry().boundingBox().buffered(config.parcel_buffer)
g = QgsGeometry.fromRect(bb)
q = lambda f, __: geo.tools.is_inside(f, g)
self.parcel_query = q
self.parcel.append(parcel_gml, query=q)
del parcel_gml
if self.parcel.featureCount() == 0:
raise CatValueError(_("No parcels data"))
[docs] def get_building(self):
"""Merge building, parts and pools."""
building_gml = self.cat.read("building")
other_gml = self.cat.read("otherconstruction", True)
if not self.options.info:
self.parcel.delete_void_parcels(building_gml, other_gml)
self.parcel.clean()
self.parcel.create_missing_parcels(building_gml, other_gml, split=self.split)
self.tasks = {f["localId"]: f["localId"] for f in self.parcel.getFeatures()}
self.building = geo.ConsLayer()
self.building.source_date = building_gml.source_date
q = None
if self.split or self.options.parcel:
q = lambda f, kw: self.building.get_id(f) in kw["keys"]
self.building.append(building_gml, query=q, keys=self.tasks.keys())
del building_gml
inbu = self.building.featureCount()
if inbu == 0:
raise CatValueError(_("No buildings data"))
if other_gml:
self.building.append(other_gml, query=q, keys=self.tasks.keys())
del other_gml
if self.options.address and not self.options.building:
return
inpo = self.building.featureCount() - inbu
part_gml = self.cat.read("buildingpart")
self.building.append(part_gml, query=q, keys=self.tasks.keys())
del part_gml
if self.options.building:
report.building_date = self.building.source_date
report.inp_features = self.building.featureCount()
report.inp_buildings = inbu
report.inp_pools = inpo
report.inp_parts = report.inp_features - inbu - inpo
[docs] def process_tasks(self, source):
"""Convert to osm for each task."""
if not os.path.exists(self.tasks_path):
os.makedirs(self.tasks_path)
tasks = self.get_tasks(source)
tasks_r = 0
tasks_u = 0
to_clean = []
to_change = {}
report.parcel_parts = config.parcel_parts
report.parcel_dist = config.parcel_dist
for pa in self.parcel.getFeatures():
label = pa["localId"]
task = tasks.get(label, None)
if task is None:
to_clean.append(pa.id())
continue
if len(pa["zone"]) == 3:
tasks_r += 1
else:
tasks_u += 1
comment = self.get_task_comment(label)
task_osm = task.to_osm(upload="yes", tags={"comment": comment})
if self.options.address and self.options.building:
self.merge_address(task_osm, self.address_osm)
if self.options.address:
report.address_stats(task_osm)
if self.options.building:
report.cons_stats(task_osm, label)
report.osm_stats(task_osm)
fp = self.cat.get_path(self.tasks_folder, label)
if self.split and os.path.exists(fp + ".osm.gz"):
if not os.path.exists(self.bkp_path):
n = len(glob(fp + "*.osm.gz"))
label = f"{label}-{n}"
pa["localId"] = label
to_change[pa.id()] = geo.tools.get_attributes(pa)
self.write_osm(task_osm, self.tasks_folder, label + ".osm.gz")
del task
if to_clean:
self.parcel.writer.deleteFeatures(to_clean)
log.debug(_("Removed %d void parcels"), len(to_clean))
if to_change:
self.parcel.writer.changeAttributeValues(to_change)
log.debug(_("Conflict with %d existing tasks"), len(to_change))
msg = _("Generated %d rustic and %d urban tasks files")
log.debug(msg, tasks_r, tasks_u)
report.tasks_r = tasks_r
report.tasks_u = tasks_u
[docs] def get_tasks(self, source):
"""Put each source feature into a task layer."""
if os.path.exists(self.tasks_path):
for fn in os.listdir(self.tasks_path):
if os.path.isfile(fn):
os.remove(os.path.join(self.tasks_path, fn))
tasks = {}
layer_class = type(source)
last_task = None
to_add = []
fcount = source.featureCount()
for i, feat in enumerate(source.getFeatures()):
localid = source.get_id(feat)
label = self.tasks.get(localid, localid)
if i == 0:
last_task = label
f = source.copy_feature(feat, {}, {})
if i == fcount - 1 or label == last_task:
to_add.append(f)
if i == fcount - 1 or label != last_task:
if last_task not in tasks:
tasks[last_task] = layer_class(baseName=last_task)
tasks[last_task].source_date = source.source_date
tasks[last_task].writer.addFeatures(to_add)
to_add = [f]
last_task = label
return tasks
[docs] def get_zoning(self):
"""Get zoning data."""
zoning_gml = self.cat.read("cadastralzoning")
self.rustic_zoning = geo.ZoningLayer(baseName="rusticzoning")
self.urban_zoning = geo.ZoningLayer(baseName="urbanzoning")
q = lambda f, kw: self.urban_zoning.check_zone(f, kw["level"])
if self.split or self.options.parcel:
q = lambda f, kw: (
self.urban_zoning.check_zone(f, kw["level"])
and self.parcel_query(f, kw)
)
self.rustic_zoning.append(zoning_gml, query=q, level="P")
self.urban_zoning.append(zoning_gml, query=q, level="M")
del zoning_gml
[docs] def get_boundary(self):
"""Get best boundary search area for overpass queries."""
id, name = boundary.get_municipality(self.cat.zip_code)
if id is None:
zoning_gml = self.cat.read("cadastralzoning")
id, name = boundary.search_municipality(
self.cat.cat_mun, zoning_gml.bounding_box()
)
if id is None:
msg = _("Municipality code '%s' don't exists") % self.cat.zip_code
raise CatValueError(msg)
self.boundary_search_area = id
report.mun_name = name
log.info(_("Municipality: '%s'"), name)
[docs] def output_zoning(self):
"""Generate project zoning file."""
if not self.options.parcel:
self.parcel.topology(dup_thr=20 * config.dup_thr)
self.parcel.set_muncode(self.cat.zip_code)
report.tasks = self.parcel.featureCount()
self.export_layer(self.parcel, "zoning.geojson", target_crs_id=4326)
fp = self.cat.get_path("zoning")
with ZipFile(fp + ".zip", "w", ZIP_DEFLATED) as zf:
zf.write(fp + ".geojson", "zoning.geojson")
[docs] def export_poly(self):
bpoly = geo.ZoningLayer()
bpoly.append(self.rustic_zoning, level="P")
bpoly.reproject()
fn = self.cat.get_path("boundary.poly")
bpoly.export_poly(fn)
log.info(_("Generated '%s'"), fn)
del bpoly
[docs] def process_building(self):
"""Process all buildings dataset."""
self.building.remove_outside_parts()
self.building.remove_parts_wo_building()
self.building.explode_multi_parts()
self.building.clean()
if log.app_level <= logging.DEBUG:
fn = "building.geojson"
self.export_layer(self.building, fn, "GeoJSON", target_crs_id=4326)
if self.options.building:
self.building.validate(report.max_level, report.min_level)
[docs] def process_parcel(self):
"""Process parcels dataset."""
self.parcel.set_zones(self.urban_zoning)
self.parcel.set_zones(self.rustic_zoning)
self.parcel.set_missing_zones()
tasks1 = self.parcel.merge_by_adjacent_buildings(self.building)
for k, v in self.tasks.items():
self.tasks[k] = tasks1.get(v, v)
tasks2 = self.parcel.merge_by_parts_count(
config.parcel_parts, config.parcel_dist
)
for k, v in self.tasks.items():
self.tasks[k] = tasks2.get(v, v)
[docs] def finish(self):
"""Generate final report."""
options = self.options
if log.app_level > logging.DEBUG:
geo.BaseLayer.delete_shp(self.cat.get_path("parcel.shp"))
geo.BaseLayer.delete_shp(self.cat.get_path("building.shp"))
if report.fixme_stats():
log.warning(_("Check %d fixme tags"), report.fixme_count)
fn = self.cat.get_path("review.txt")
fixmes = report.get_tasks_with_fixmes()
csvtools.dict2csv(fn, fixmes)
msg = _("Please, check it before publish")
log.info(_("Generated '%s'") + ". " + msg, fn)
if options.building:
report.cons_end_stats()
else:
report.clean_group("building")
report.validate()
report.to_file(self.cat.get_path("report.txt"))
report.export(self.cat.get_path("report.json"))
self.move_project()
log.info(_("Finished!"))
[docs] def exit(self):
"""Exit properly."""
for propname in list(self.__dict__.keys()):
if isinstance(getattr(self, propname), QgsVectorLayer):
delattr(self, propname)
[docs] def get_cbcn(self):
"""Read CartoBCN addresses."""
reader = cbcn.Reader(self.aux_path)
cbcn_src = reader.read()
self.address = cbcn.get_address(cbcn_src, self.parcel)
del cbcn_src
report.inp_address = self.address.featureCount()
report.inp_address_entrance = report.inp_address
report.inp_address_parcel = 0
self.address.remove_address_wo_building(self.building)
report.inp_zip_codes = 0
report.inp_street_names = self.address.count(unique="TN_text")
if self.split or self.options.parcel:
self.boundary_bbox = self.parcel.bounding_box()
self.export_layer(self.address, "address.geojson", target_crs_id=4326)
self.get_translations(self.address)
[docs] def get_address(self):
"""Read Address GML dataset."""
if self.cat.zip_code == "08900":
self.get_cbcn()
return
address_gml = self.cat.read("address")
report.address_date = address_gml.source_date
if address_gml.writer.fieldNameIndex("component_href") == -1:
address_gml = self.cat.read("address", force_zip=True)
if address_gml.writer.fieldNameIndex("component_href") == -1:
msg = (
_("Could not resolve joined tables for the '%s' layer")
% address_gml.name()
)
raise CatIOError(msg)
self.address = geo.AddressLayer(source_date=address_gml.source_date)
q = None
if self.split or self.options.parcel:
q = lambda f, kw: self.address.get_id(f) in kw["keys"] # NOQA: E731
self.boundary_bbox = self.parcel.bounding_box()
self.address.append(address_gml, query=q, keys=self.tasks.keys())
del address_gml
report.inp_address = self.address.featureCount()
report.inp_address_entrance = self.address.count("spec='Entrance'")
report.inp_address_parcel = self.address.count("spec='Parcel'")
self.get_auxiliary_addresses()
self.address.remove_address_wo_building(self.building)
if self.options.info:
return
if report.inp_address == 0:
msg = _("No addresses data")
if not self.options.building:
raise CatValueError(msg)
log.info(msg)
return
postaldescriptor = self.cat.read("postaldescriptor")
thoroughfarename = self.cat.read("thoroughfarename")
self.address.join_field(postaldescriptor, "PD_id", "gml_id", ["postCode"])
self.address.join_field(thoroughfarename, "TN_id", "gml_id", ["text"], "TN_")
del postaldescriptor, thoroughfarename
report.inp_zip_codes = self.address.count(unique="postCode")
report.inp_street_names = self.address.count(unique="TN_text")
self.export_layer(self.address, "address.geojson", target_crs_id=4326)
self.get_translations(self.address)
[docs] def process_address(self):
"""Fix street names, conflate and move addresses."""
highway_names = self.get_translations(self.address)
ia = self.address.translate_field("TN_text", highway_names)
if ia > 0:
log.debug(_("Deleted %d addresses refused by street name"), ia)
report.values["ignored_addresses"] = ia
if not self.is_new and not self.options.manual:
current_address = self.get_current_ad_osm()
self.address.conflate(current_address)
self.building.move_address(self.address)
self.address.reproject()
self.export_layer(self.address, "address_out.geojson")
self.address_osm = self.address.to_osm()
[docs] def stop_address(self):
"""Save current processing status and exits."""
self.export_layer(self.parcel, "parcel.shp", driver_name="ESRI Shapefile")
self.export_layer(self.building, "building.shp", driver_name="ESRI Shapefile")
self.address.reproject()
address_osm = self.address.to_osm()
self.write_osm(address_osm, "address.osm")
fn = self.cat.get_path("tasks.csv")
csvtools.dict2csv(fn, self.tasks)
report.to_file(self.cat.get_path("report.txt"))
report.export(self.cat.get_path("report.json"))
msg = _("Generated '%s'") % self.highway_names_path
msg += ". " + _("Please, check it and run again")
log.info(msg)
[docs] def resume_address(self):
"""Resume processing for second run of addresses dataset."""
report.from_file(self.cat.get_path("report.json"))
fn = self.cat.get_path("tasks.csv")
self.tasks = csvtools.csv2dict(fn, exists=True)
fn = self.cat.get_path("parcel.shp")
parcel = geo.ParcelLayer(self.cat.zip_code, fn, providerLib="ogr")
if not parcel.isValid() or parcel.featureCount() == 0:
raise CatValueError(_("No parcels data"))
self.parcel = geo.ParcelLayer(self.cat.zip_code)
self.parcel.append(parcel)
del parcel
fn = self.cat.get_path("building.shp")
building = geo.ConsLayer(fn, providerLib="ogr")
if not building.isValid() or building.featureCount() == 0:
raise CatValueError(_("No buildings data"))
self.building = geo.ConsLayer()
self.building.append(building)
del building
fn = self.cat.get_path("address.geojson")
address = geo.AddressLayer(fn, providerLib="ogr")
if not address.isValid() or address.featureCount() == 0:
raise CatValueError(_("No addresses data"))
self.address = geo.AddressLayer()
self.address.rename = {}
self.address.resolve = {}
self.address.append(address)
self.address.reproject(self.building.crs())
del address
if self.split or self.options.parcel:
self.boundary_bbox = self.parcel.bounding_box()
[docs] def get_auxiliary_addresses(self):
"""Read and conflate auxiliary sources of addresses data."""
for source in list(config.aux_address.keys()):
if self.cat.zip_code[:2] in config.aux_address[source]:
aux_source = globals()[source]
reader = aux_source.Reader(self.aux_path)
aux = reader.read(self.cat.zip_code[:2])
aux_source.conflate(aux, self.address, self.cat.zip_code, self.split)
[docs] def merge_address(self, building_osm, address_osm):
"""
Copy address from address_osm to building_osm using 'ref' tag.
If there exists one building with the same 'ref' that an address, copy
the address tags to the building if it isn't a 'entrace' type address or
else to the entrance if there exist a node with the address coordinates
in the building.
Precondition: building.move_address deleted addresses belonging to multiple
buildings
Args:
building_osm (Osm): OSM data set with buildings
address_osm (Osm): OSM data set with addresses
"""
if "source:date" in address_osm.tags:
building_osm.tags["source:date:addr"] = address_osm.tags["source:date"]
address_index = defaultdict(list)
building_index = defaultdict(list)
for bu in building_osm.elements:
if "ref" in bu.tags:
building_index[bu.tags["ref"]].append(bu)
for ad in address_osm.nodes:
if ad.tags["ref"] in building_index:
address_index[ad.tags["ref"]].append(ad)
md = 0
for (ref, group) in list(building_index.items()):
parcel_ad = []
entrance_count = 0
for ad in address_index[ref]:
entrance = False
if "entrance" in ad.tags:
for w in building_osm.get_outline(group):
entrance = w.search_node(ad.x, ad.y)
if entrance:
entrance.tags.update(ad.tags)
entrance.tags.pop("ref", None)
entrance.tags.pop("image", None)
break
if entrance:
entrance_count += 1
else:
parcel_ad.append(ad)
if len(parcel_ad) == 1 and entrance_count == 0:
ad = parcel_ad.pop()
bu = group[0]
bu.tags.update(ad.tags)
bu.tags.pop("image", None)
bu.tags.pop("entrance", None)
md += len(parcel_ad)
if md > 0:
log.debug(_("Refused %d 'parcel' addresses not unique for it building"), md)
report.inc("not_unique_addresses", md)
[docs] def get_translations(self, address):
"""
Get the translate file.
If there exists the translation file 'highway_types.csv', read it,
else write one with default values. If the translations file
'highway_names.csv' don't exist, creates one parsing current OSM
highways data, else reads and returns it as a dictionary.
* 'highway_types.csv' List of osm elements in json format located in the
application path that contains translations from abbreviations to full
types of highways.
* 'highway_names.csv' is located in the outputh folder and contains
corrections for original highway names.
"""
highway_types_path = os.path.join(config.app_path, "highway_types.csv")
if not os.path.exists(highway_types_path):
csvtools.dict2csv(highway_types_path, config.highway_types)
else:
csvtools.csv2dict(highway_types_path, config.highway_types)
if self.is_new:
if self.options.manual:
highway = None
place = None
else:
highway = self.get_highway()
highway.reproject(address.crs())
place = self.get_place()
place.reproject(address.crs())
highway_names = address.get_names(highway, place)
csvtools.dict2csv(self.highway_names_path, highway_names, sort=1)
else:
highway_names = csvtools.csv2dict(self.highway_names_path, {})
for key, value in list(highway_names.items()):
v = value if isinstance(value, str) else value[0]
highway_names[key] = v.strip()
return highway_names
[docs] def get_place(self):
"""Get OSM places for street names conflation."""
ql = [
'node["place"]["name"]',
'way["place"]["name"]',
'relation["place"]["name"]',
]
place_osm = self.read_osm("current_place.osm", ql=ql)
place = geo.PlaceLayer()
place.read_from_osm(place_osm)
del place_osm
return place
[docs] def get_highway(self):
"""Get OSM highways for street names conflation."""
ql = [
'way["highway"]["name"]',
'relation["highway"]["name"]',
'way["place"="square"]["name"]',
'relation["place"="square"]["name"]',
]
highway_osm = self.read_osm("current_highway.osm", ql=ql)
highway = geo.HighwayLayer()
highway.read_from_osm(highway_osm)
del highway_osm
return highway
[docs] def get_current_ad_osm(self):
"""Get OSM address for conflation."""
ql = [
'node["addr:street"]["addr:housenumber"]["entrance"]',
'wr["addr:street"]["addr:housenumber"][~"building"~".*"]',
'nwr["addr:place"]["addr:housenumber"]',
]
address_osm = self.read_osm("current_address.osm", ql=ql)
current_address = set()
w = 0
report.osm_addresses = 0
for d in address_osm.elements:
if "addr:housenumber" not in d.tags:
if "addr:street" in d.tags or "addr:place" in d.tags:
w += 1
elif "addr:street" in d.tags:
current_address.add(d.tags["addr:street"] + d.tags["addr:housenumber"])
report.osm_addresses += 1
elif "addr:place" in d.tags:
current_address.add(d.tags["addr:place"] + d.tags["addr:housenumber"])
report.osm_addresses += 1
if w > 0:
msg = _("There are %d address without house number in the OSM data") % w
log.warning(msg)
report.warnings.append(msg)
report.osm_addresses_without_number = w
return current_address
[docs] def move_project(self):
"""
Move to tasks all files needed for the backup the in the repository.
Use a subdirectory if it's a split municipality.
"""
if not os.path.exists(self.bkp_path):
os.makedirs(self.bkp_path)
move_files = [
"current_address.osm",
"current_highway.osm",
"highway_names.csv",
"tasks.csv",
]
copy_files = [
"address.osm",
"address.geojson",
"zoning.geojson",
"zoning.zip",
"report.txt",
"review.txt",
"report.json",
]
if self.options.split:
shutil.move(self.tasks_path, self.bkp_path)
bkp_path = os.path.join(self.bkp_path, self.tasks_folder)
if self.options.split:
if self.options.split == report.get("split_id", " "):
copy_files.append(report.split_name.replace(" ", "_") + ".osm")
else:
copy_files.append(self.options.split)
for f in move_files:
fn = self.cat.get_path(f)
if os.path.exists(fn):
os.rename(fn, os.path.join(bkp_path, f))
for f in copy_files:
fn = self.cat.get_path(f)
if os.path.exists(fn):
shutil.copy(fn, bkp_path)
[docs] def export_layer(self, layer, filename, driver_name="GeoJSON", target_crs_id=None):
"""
Export a vector layer.
Args:
layer (QgsVectorLayer): Source layer.
filename (str): Output filename.
driver_name (str): name of OGR driver (or get it from filename).
target_crs_id (int): Defaults to source CRS.
"""
out_path = self.cat.get_path(filename)
if layer.export(out_path, driver_name, target_crs_id=target_crs_id):
log.info(_("Generated '%s'"), filename)
else:
raise CatIOError(_("Failed to write layer: '%s'") % filename)
[docs] def read_osm(self, *paths, **kwargs):
"""
Read an OSM data set from an OSM XML file.
If the file not exists, downloads data from overpass using ql query.
Args:
paths (str): input filename components relative to self.path
ql (str): Query to put in the url for overpass
Returns
Osm: OSM data set
"""
ql = kwargs.get("ql", False)
osm_path = self.cat.get_path(*paths)
filename = os.path.basename(osm_path)
if not os.path.exists(osm_path):
if not ql:
return None
log.info(_("Downloading '%s'") % filename)
query = overpass.Query(self.boundary_search_area)
if hasattr(self, "boundary_bbox") and self.boundary_bbox:
query.set_search_area(self.boundary_bbox)
query.add(ql)
if log.app_level == logging.DEBUG:
query.download(osm_path, log)
else:
query.download(osm_path)
if osm_path.endswith(".gz"):
fo = gzip.open(osm_path, "rb")
else:
fo = open(osm_path, "rb")
data = osmxml.deserialize(fo)
fo.close()
if len(data.elements) == 0:
msg = _("No OSM data were obtained from '%s'") % filename
log.warning(msg)
report.warnings.append(msg)
else:
log.info(
_("Read '%s': %d nodes, %d ways, %d relations"),
filename,
len(data.nodes),
len(data.ways),
len(data.relations),
)
return data
[docs] def write_osm(self, data, *paths):
"""
Generate an OSM XML file for an OSM data set.
Args:
data (Osm): OSM data set
paths (str): output filename components relative to self.path
(compress if ends with .gz)
"""
for e in data.elements:
if "ref" in e.tags:
del e.tags["ref"]
data.merge_duplicated()
osm_path = self.cat.get_path(*paths)
if osm_path.endswith(".gz"):
file_obj = codecs.getwriter("utf-8")(gzip.open(osm_path, "w"))
else:
file_obj = io.open(osm_path, "w", encoding="utf-8")
osmxml.serialize(file_obj, data)
file_obj.close()
msg = _("Generated '%s': %d nodes, %d ways, %d relations")
log.info(
msg,
os.path.basename(osm_path),
len(data.nodes),
len(data.ways),
len(data.relations),
)