# Source code for catatom2osm.report

"""Statistics report."""
import io
import json
import locale
import platform
import time
from collections import Counter, OrderedDict
from datetime import datetime

import psutil

from catatom2osm import config

# Indentation unit prepended to nested report titles and list items.
TAB = " " * 2
# Separator written between a title and its value.
SEP = ": "
# Divisor applied to byte counts before they are stored in the report.
MEMORY_UNIT = 1024.0 ** 2
# Unit label appended to formatted memory figures.
MEMORY_LABEL = "MB"


def int_format(v):
    """Return *v* formatted as an integer with locale-aware grouping."""
    return locale.format_string("%d", v, True)


class Report(object):
    """Collect, validate and render the statistics of a run.

    Every key of ``self.titles`` is exposed as an attribute whose value
    lives in the ``self.values`` dictionary (see ``__setattr__`` and
    ``__getattr__``). Any other attribute is a regular instance attribute.
    """

    def __init__(self, **kwargs):
        """Build the title and format tables, then reset the values.

        Args:
            **kwargs: Initial report values, forwarded to ``clear``.
        """
        # Ordered mapping of report key -> title. The order is the order
        # used by ``to_string``. Keys whose title is None are never printed;
        # ``group_*`` / ``subgroup_*`` keys are rendered as headings.
        self.titles = OrderedDict(
            [
                ("fixme_counter", None),
                ("min_level", None),
                ("max_level", None),
                ("building_counter", None),
                ("mun_name", _("Municipality")),
                ("cat_mun", _("Cadastre name")),
                ("mun_code", _("Code")),
                ("split_id", _("Split ID")),
                ("split_name", _("Split name")),
                ("split_file", _("Split file")),
                ("date", _("Date")),
                ("options", _("Options")),
                ("language", _("Language")),
                ("group_system_info", _("System info")),
                ("app_version", _("Application version")),
                ("platform", _("Platform")),
                ("qgs_version", _("QGIS version")),
                ("gdal_version", _("GDAL version")),
                ("cpu_count", _("CPU count")),
                ("cpu_freq", _("CPU frequency")),
                ("ex_time", _("Execution time")),
                ("memory", _("Total memory")),
                ("rss", _("Physical memory usage")),
                ("vms", _("Virtual memory usage")),
                ("group_address", _("Addresses")),
                ("subgroup_ad_cdau", "CDAU"),
                ("inp_address_cdau", _("Feature count")),
                ("rep_address_cdau", _("Replaced addresses")),
                ("add_address_cdau", _("Added addresses")),
                ("subgroup_ad_input", _("Input data")),
                ("address_date", _("Source date")),
                ("inp_address", _("Feature count")),
                ("inp_address_entrance", TAB + _("Type entrance")),
                ("inp_address_parcel", TAB + _("Type parcel")),
                ("inp_zip_codes", _("Postal codes")),
                ("inp_street_names", _("Street names")),
                ("subgroup_ad_process", _("Process")),
                (
                    "orphaned_addresses",
                    _("Addresses without associated building excluded"),
                ),
                ("ignored_addresses", _("Addresses deleted by street name")),
                (
                    "addresses_without_number",
                    _("Addresses without house number deleted"),
                ),
                (
                    "multiple_addresses",
                    _("Addresses belonging to multiple buildings deleted"),
                ),
                (
                    "not_unique_addresses",
                    _("'Parcel' addresses not unique for its building deleted"),
                ),
                ("subgroup_ad_conflation", _("Conflation")),
                ("osm_addresses", _("OSM addresses ")),
                ("osm_addresses_without_number", TAB + _("Without house number")),
                (
                    "refused_addresses",
                    _("Addresses rejected because they exist in OSM"),
                ),
                ("subgroup_ad_output", _("Output data")),
                ("out_address", _("Addresses")),
                ("out_address_entrance", TAB + _("In entrance nodes")),
                ("out_address_parcel", TAB + _("In parcels")),
                ("out_addr_str", TAB + _("Type addr:street")),
                ("out_addr_plc", TAB + _("Type addr:place")),
                ("group_buildings", _("Buildings")),
                ("subgroup_bu_input", _("Input data")),
                ("building_date", _("Source date")),
                ("inp_features", _("Feature count")),
                ("inp_buildings", TAB + _("Buildings")),
                ("inp_parts", TAB + _("Building parts")),
                ("inp_pools", TAB + _("Swimming pools")),
                ("orphaned_parts", _("Parts without associated building excluded")),
                ("subgroup_bu_process", _("Process")),
                ("parts_wo_building", _("Parts without building deleted")),
                ("outside_parts", _("Parts outside outline deleted")),
                ("underground_parts", _("Parts with no floors above ground")),
                ("multipart_geoms_building", _("Buildings with multipart geometries")),
                (
                    "exploded_parts_building",
                    _("Buildings resulting from splitting multiparts"),
                ),
                ("parts_to_outline", _("Parts merged to the outline")),
                ("adjacent_parts", _("Adjacent parts merged")),
                (
                    "buildings_in_pools",
                    _("Buildings coincidents with a swimming pool deleted"),
                ),
                ("geom_parts_building", _("Invalid geometry parts deleted")),
                ("geom_rings_building", _("Invalid geometry rings deleted")),
                ("geom_invalid_building", _("Invalid geometries deleted")),
                ("vertex_zigzag_building", _("Zig-zag vertices deleted")),
                ("vertex_spike_building", _("Spike vertices deleted")),
                ("vertex_close_building", _("Close vertices merged")),
                ("vertex_topo_building", _("Topological points created")),
                ("vertex_simplify_building", _("Simplified vertices")),
                ("subgroup_bu_conflation", _("Conflation")),
                ("osm_buildings", _("Buildings/pools in OSM")),
                ("osm_building_conflicts", TAB + _("With conflict")),
                ("subgroup_bu_output", _("Output data")),
                ("nodes", _("Nodes")),
                ("ways", _("Ways")),
                ("relations", _("Relations")),
                ("out_features", _("Feature count")),
                ("out_buildings", TAB + _("Buildings")),
                ("out_parts", TAB + _("Buildings parts")),
                ("out_pools", TAB + _("Swimming pools")),
                ("pools_on_roofs", TAB + TAB + _("Over buildings")),
                ("building_types", _("Building types counter")),
                ("dlag", _("Max. levels above ground (level: # of buildings)")),
                ("dlbg", _("Min. levels below ground (level: # of buildings)")),
                ("group_tasks", _("Project")),
                ("parcel_parts", _("Building parts threshold")),
                ("parcel_dist", _("Distance threshold")),
                ("tasks", _("Tasks files")),
                ("tasks_r", TAB + _("Rustic")),
                ("tasks_u", TAB + _("Urban")),
                ("group_problems", _("Problems")),
                ("errors", _("Report validation:")),
                ("fixme_count", _("Fixmes")),
                ("fixmes", ""),
                ("warnings", _("Warnings:")),
            ]
        )

        def memory_format(v):
            return locale.format_string("%.2f ", v, True) + MEMORY_LABEL

        # Per-key formatters applied by ``to_string`` instead of the default
        # integer / string rendering.
        self.formats = {
            "cpu_freq": lambda v: locale.format_string("%.1f Mhz", v, True),
            "ex_time": lambda v: locale.format_string(
                "%.1f " + _("seconds"), v, True
            ),
            "memory": memory_format,
            "rss": memory_format,
            "vms": memory_format,
        }
        self.clear(**kwargs)

    def clear(self, **kwargs):
        """Reset the report to its initial state.

        Restarts the execution timer, recreates the default values and,
        when ``config.report_system_info`` is set, collects system info.

        Args:
            **kwargs: Values stored verbatim in ``self.values`` after the
                defaults (so they override them).
        """
        self.start_time = time.time()
        self.values = {
            "date": datetime.now().strftime("%x"),
            "warnings": [],
            "errors": [],
            "min_level": {},
            "max_level": {},
            "fixme_counter": Counter(),
            "building_counter": Counter(),
        }
        self.language = config.language
        if config.report_system_info:
            self.get_sys_info()
        self.tasks_with_fixmes = Counter()
        self.values.update(kwargs)

    def __setattr__(self, key, value):
        """Store report keys in ``self.values``, anything else normally."""
        if key != "titles" and key in self.titles.keys():
            self.values[key] = value
        else:
            super(Report, self).__setattr__(key, value)

    def __getattr__(self, key):
        """Resolve report keys from ``self.values``.

        Only invoked when regular attribute lookup fails.

        Raises:
            KeyError: *key* is a report key that has no value yet.
            AttributeError: *key* is not a report key.
        """
        if key != "titles" and key in self.titles.keys():
            return self.values[key]
        return super(Report, self).__getattribute__(key)

    def address_stats(self, address_osm):
        """Count the output addresses found in ``address_osm.elements``.

        NOTE(review): entrance/parcel counters are assumed to be nested
        under the "has an address tag" branch, which is what the matching
        check in ``validate`` expects - confirm against the upstream file.
        """
        for el in address_osm.elements:
            tags = el.tags
            has_street = "addr:street" in tags
            has_place = "addr:place" in tags
            if has_street:
                self.inc("out_addr_str")
            if has_place:
                self.inc("out_addr_plc")
            if has_street or has_place:
                self.inc("out_address")
                if el.type == "node" and "entrance" in tags:
                    self.inc("out_address_entrance")
                if "entrance" not in tags:
                    self.inc("out_address_parcel")

    def cons_stats(self, data, task_label=None):
        """Count output buildings, parts, pools and fixmes in *data*.

        Args:
            data: Object with an ``elements`` iterable; each element has a
                ``tags`` mapping.
            task_label: When not None, the key incremented in
                ``self.tasks_with_fixmes`` for every element with a fixme.
        """
        for el in data.elements:
            tags = el.tags
            if "leisure" in tags and tags["leisure"] == "swimming_pool":
                self.inc("out_pools")
                self.inc("out_features")
            if "building" in tags:
                self.inc("out_buildings")
                self.building_counter[tags["building"]] += 1
                self.inc("out_features")
            if "building:part" in tags:
                self.inc("out_parts")
                self.inc("out_features")
            if "fixme" in tags:
                self.fixme_counter[tags["fixme"]] += 1
                if task_label is not None:
                    self.tasks_with_fixmes[task_label] += 1

    def get_tasks_with_fixmes(self):
        """Return ``{task label: fixme count}`` sorted by task label."""
        fixmes = self.tasks_with_fixmes
        return {label: fixmes[label] for label in sorted(fixmes)}

    def osm_stats(self, data):
        """Add the number of nodes, ways and relations of *data*."""
        self.inc("nodes", len(data.nodes))
        self.inc("ways", len(data.ways))
        self.inc("relations", len(data.relations))

    @staticmethod
    def _format_counts(template, counts):
        """Join every ``(key, count)`` item of *counts* using *template*."""
        return ", ".join(template % item for item in counts.items())

    def cons_end_stats(self):
        """Summarise level and building-type counters as report strings."""
        self.dlag = self._format_counts(
            "%d: %d", Counter(self.max_level.values())
        )
        self.dlbg = self._format_counts(
            "%d: %d", Counter(self.min_level.values())
        )
        self.building_types = self._format_counts(
            "%s: %d", self.building_counter
        )

    def fixme_stats(self):
        """Publish the fixme totals in the report and return their sum.

        ``fixme_count`` and ``fixmes`` are only set when there is at least
        one fixme.
        """
        fixme_count = sum(self.fixme_counter.values())
        if fixme_count:
            self.fixme_count = fixme_count
            self.fixmes = [
                "%s: %d" % item for item in self.fixme_counter.items()
            ]
        return fixme_count

    def get(self, key, default=0):
        """Return the value stored for *key*, or *default* when unset."""
        return self.values.get(key, default)

    def inc(self, key, step=1):
        """Increase the value of *key* (0 when unset) by *step*."""
        self.values[key] = self.get(key) + step

    def sum(self, *args):
        """Return the sum of the values of the given keys (unset = 0)."""
        # Inside a method body ``sum`` is the builtin, not this method.
        return sum(self.get(key) for key in args)

    def get_sys_info(self):
        """Record platform, application, CPU and memory information."""
        process = psutil.Process()
        uname = list(platform.uname())
        # Drop the second uname field (the network node name).
        uname.pop(1)
        self.platform = " ".join(uname)
        self.app_version = config.app_name + " " + config.app_version
        self.cpu_count = psutil.cpu_count(logical=False)
        # Falls back to 0 when psutil has no cpu_freq() or its result has
        # no ``max`` attribute.
        self.cpu_freq = getattr(getattr(psutil, "cpu_freq", lambda: 0)(), "max", 0)
        self.memory = psutil.virtual_memory().total / MEMORY_UNIT
        # Single snapshot so rss and vms describe the same instant.
        memory_info = process.memory_info()
        self.rss = memory_info.rss / MEMORY_UNIT
        self.vms = memory_info.vms / MEMORY_UNIT

    def clean_group(self, group):
        """Delete every value whose key ends with ``"_" + group``."""
        suffix = "_" + group
        # Snapshot the keys: the dictionary is modified while looping.
        for key in frozenset(self.values.keys()):
            if key.endswith(suffix):
                del self.values[key]

    def tags_for_info(self):
        """Remove the values that are not simple per-run figures."""
        for key in (
            "date",
            "warnings",
            "errors",
            "min_level",
            "max_level",
            "fixme_counter",
            "building_counter",
            "options",
        ):
            self.values.pop(key, None)

    def validate(self):
        """Check the counters for consistency.

        A translated message is appended to ``self.errors`` for every
        relation between counters that does not hold.
        """
        if self.sum("tasks_u", "tasks_r") != self.get("tasks"):
            self.errors.append(
                _(
                    "Sum of rustic and urban tasks should be equal "
                    "to number of tasks in the project"
                )
            )
        if self.sum("inp_address_entrance", "inp_address_parcel") != self.get(
            "inp_address"
        ):
            self.errors.append(
                _("Sum of address types should be equal to the input addresses")
            )
        if self.sum(
            "addresses_without_number",
            "not_unique_addresses",
            "multiple_addresses",
            "refused_addresses",
            "ignored_addresses",
            "out_address",
            "pool_addresses",
            "orphaned_addresses",
        ) != self.get("inp_address"):
            self.errors.append(
                _(
                    "Sum of output and deleted addresses should be equal "
                    "to the input addresses"
                )
            )
        entrance_and_parcel = self.sum("out_address_entrance", "out_address_parcel")
        # ``x > 0 and x`` yields False (which compares equal to 0) when the
        # sum is not positive, otherwise the sum itself.
        if (entrance_and_parcel > 0 and entrance_and_parcel) != self.get(
            "out_address"
        ):
            self.errors.append(
                _(
                    "Sum of entrance and parcel addresses should be equal "
                    "to output addresses"
                )
            )
        if self.sum("out_addr_str", "out_addr_plc") != self.get("out_address"):
            self.errors.append(
                _(
                    "Sum of street and place addresses should be equal "
                    "to output addresses"
                )
            )
        if self.sum("inp_buildings", "inp_parts", "inp_pools") != self.get(
            "inp_features"
        ):
            self.errors.append(
                _(
                    "Sum of buildings, parts and pools should be equal "
                    "to the feature count"
                )
            )
        if self.sum(
            "out_features",
            "outside_parts",
            "underground_parts",
            "multipart_geoms_building",
            "parts_to_outline",
            "parts_wo_building",
            "adjacent_parts",
            "geom_invalid_building",
            "buildings_in_pools",
        ) - self.get("exploded_parts_building") != self.get("inp_features"):
            self.errors.append(
                _(
                    "Sum of output and deleted minus created "
                    "building features should be equal to input features"
                )
            )
        if self.building_counter:
            if sum(self.building_counter.values()) != self.get("out_buildings"):
                self.errors.append(
                    _(
                        "Sum of building types should be equal "
                        "to the number of buildings"
                    )
                )

    def _groups_with_content(self):
        """Return the group/subgroup keys that have a value to print.

        A key counts as content when it is present in ``self.values`` and
        is not an empty list.
        """
        groups = set()
        last_group = False
        last_subgroup = False
        for key in self.titles:
            exists = key in self.values
            if exists:
                value = self.values[key]
                if isinstance(value, list) and len(value) == 0:
                    exists = False
            if key.startswith("group_"):
                last_group = key
                last_subgroup = False
            elif key.startswith("subgroup_"):
                last_subgroup = key
            if last_group and exists:
                groups.add(last_group)
            if last_subgroup and exists:
                groups.add(last_subgroup)
        return groups

    def to_string(self):
        """Render the report as text, in the order given by ``titles``.

        Updates ``ex_time`` first when the timer is running. Groups and
        subgroups are only printed when they contain a value. Lists print
        their length after the title followed by one indented line per
        item; other values print as ``title: value``.

        Returns:
            The whole report as a single string.
        """
        if self.start_time is not None:
            self.ex_time = time.time() - self.start_time
        groups = self._groups_with_content()
        eol = config.eol
        chunks = []
        for key, title in self.titles.items():
            if title is None:
                continue
            if key.startswith("group_") and key in groups:
                chunks.append(eol + "=" + title + "=" + eol)
            elif key.startswith("subgroup_") and key in groups:
                chunks.append(eol + "==" + title + "==" + eol)
            elif key in self.values:
                value = self.values[key]
                if isinstance(value, list):
                    if len(value) > 0:
                        if title:
                            chunks.append(
                                title + " " + int_format(len(value)) + eol
                            )
                        chunks.extend(TAB + item + eol for item in value)
                else:
                    if value is None:
                        value = ""
                    if key in self.formats:
                        value = self.formats[key](value)
                    elif isinstance(value, int):
                        value = int_format(value)
                    elif not isinstance(value, str):
                        # Values such as floats have no dedicated formatter;
                        # render them with str() instead of failing on the
                        # string concatenation below.
                        value = str(value)
                    chunks.append(title + SEP + value + eol)
        if "fixmes" in self.values:
            chunks.append(eol + config.fixme_doc_url)
        return "".join(chunks)

    def to_file(self, fn):
        """Write the text report to the file *fn* using ``config.encoding``."""
        with io.open(fn, "w", encoding=config.encoding) as fo:
            fo.write(self.to_string())

    def export(self, fn):
        """Dump ``self.values`` as JSON to the file *fn*."""
        with open(fn, "w") as fo:
            fo.write(json.dumps(self.values))

    def from_file(self, fn):
        """Replace ``self.values`` with the JSON content of the file *fn*.

        Values whose key ends with ``_counter`` are turned back into
        ``Counter`` objects.
        """
        with open(fn, "r") as fo:
            self.values = json.loads(fo.read())
        # Only existing keys are reassigned, so iterating is safe.
        for key, value in self.values.items():
            if key.endswith("_counter"):
                self.values[key] = Counter(value)
# Module-level Report object, created when this module is imported.
instance = Report()