import logging
from collections import defaultdict
from qgis.core import QgsFeatureRequest, QgsField, QgsGeometry
from qgis.PyQt.QtCore import QVariant
from catatom2osm import config, translate
from catatom2osm.geo import BUFFER_SIZE, SIMPLIFY_BUILDING_PARTS
from catatom2osm.geo.geometry import Geometry
from catatom2osm.geo.layer.polygon import PolygonLayer
from catatom2osm.geo.point import Point
from catatom2osm.geo.tools import get_attributes, is_inside
from catatom2osm.report import instance as report
log = logging.getLogger(config.app_name)
[documentos]class ConsLayer(PolygonLayer):
"""Class for constructions."""
def __init__(
self,
path="MultiPolygon",
baseName="building",
providerLib="memory",
source_date=None,
):
super(ConsLayer, self).__init__(path, baseName, providerLib)
if self.fields().isEmpty():
self.writer.addAttributes(
[
QgsField("localId", QVariant.String, len=254),
QgsField("condition", QVariant.String, len=254),
QgsField("image", QVariant.String, len=254),
QgsField("currentUse", QVariant.String, len=254),
QgsField("bu_units", QVariant.Int),
QgsField("dwellings", QVariant.Int),
QgsField("lev_above", QVariant.Int),
QgsField("lev_below", QVariant.Int),
QgsField("nature", QVariant.String, len=254),
QgsField("task", QVariant.String, len=254),
QgsField("fixme", QVariant.String, len=254),
QgsField("layer", QVariant.Int),
]
)
self.updateFields()
self.rename = {
"condition": "conditionOfConstruction",
"image": "documentLink",
"bu_units": "numberOfBuildingUnits",
"dwellings": "numberOfDwellings",
"lev_above": "numberOfFloorsAboveGround",
"lev_below": "numberOfFloorsBelowGround",
"nature": "constructionNature",
}
self.source_date = source_date
[documentos] @staticmethod
def is_building(feature):
"""Return True for building features."""
return "_" not in feature["localId"]
[documentos] @staticmethod
def is_part(feature):
"""Return True for Part features."""
return "_part" in feature["localId"]
[documentos] @staticmethod
def is_pool(feature):
"""Return True for Pool features."""
return "_PI." in feature["localId"]
[documentos] @staticmethod
def get_id(feat):
"""Trim to parcel id."""
return feat["localId"].split("_")[0].split(".")[-1]
[documentos] def explode_multi_parts(self, address=False):
request = QgsFeatureRequest()
if address:
refs = {self.get_id(ad) for ad in address.getFeatures()}
fids = [f.id() for f in self.getFeatures() if f["localId"] not in refs]
request.setFilterFids(fids)
super(ConsLayer, self).explode_multi_parts(request)
[documentos] def to_osm(self, data=None, tags={}, upload="never"):
"""Export to OSM."""
return super(ConsLayer, self).to_osm(
translate.building_tags, data, tags=tags, upload=upload
)
[documentos] def index_of_parts(self):
"""Index parts of building by building localid."""
parts = defaultdict(list)
for part in self.search("regexp_match(localId, '_part')"):
localId = self.get_id(part)
parts[localId].append(part)
return parts
[documentos] def index_of_pools(self):
"""Index pools in building parcel by building localid."""
pools = defaultdict(list)
for pool in self.search("regexp_match(localId, '_PI')"):
localId = self.get_id(pool)
pools[localId].append(pool)
return pools
[documentos] def index_of_building_and_parts(self):
"""
Construct some utility dicts.
buildings index building by localid (call before explode_multi_parts).
parts index parts of building by building localid.
"""
buildings = defaultdict(list)
parts = defaultdict(list)
for feature in self.getFeatures():
if self.is_building(feature):
buildings[feature["localId"]].append(feature)
elif self.is_part(feature):
localId = self.get_id(feature)
parts[localId].append(feature)
return (buildings, parts)
[documentos] def remove_parts_wo_building(self):
"""Remove building parts without building."""
bu_refs = [f["localId"] for f in self.getFeatures() if self.is_building(f)]
to_clean = [
f.id()
for f in self.getFeatures()
if self.is_part(f) and self.get_id(f) not in bu_refs
]
if to_clean:
self.writer.deleteFeatures(to_clean)
log.debug(_("Removed %d parts without building"), len(to_clean))
report.parts_wo_building = len(to_clean)
[documentos] def remove_outside_parts(self):
"""
Remove parts outside the outline of it building.
Remove parts without levels above ground.
Precondition: Called before merge_greatest_part.
"""
to_clean_o = []
to_clean_b = []
buildings = {f["localId"]: f for f in self.getFeatures() if self.is_building(f)}
pbar = self.get_progressbar(_("Remove outside parts"), self.featureCount())
for feat in self.getFeatures():
if self.is_part(feat):
ref = self.get_id(feat)
if feat["lev_above"] == 0 and feat["lev_below"] != 0:
to_clean_b.append(feat.id())
elif ref in buildings:
bu = buildings[ref]
if not is_inside(feat, bu):
to_clean_o.append(feat.id())
pbar.update()
pbar.close()
if len(to_clean_o) + len(to_clean_b) > 0:
self.writer.deleteFeatures(to_clean_o + to_clean_b)
if len(to_clean_o) > 0:
log.debug(
_("Removed %d building parts outside the outline"), len(to_clean_o)
)
report.outside_parts = len(to_clean_o)
if len(to_clean_b) > 0:
log.debug(
_("Deleted %d building parts with no floors above ground"),
len(to_clean_b),
)
report.underground_parts = len(to_clean_b)
[documentos] def get_parts(self, outline, parts):
"""
Return a dictionary of parts for levels, the maximum and minimum levels.
Given the building outline and its parts, for the parts inside the outline.
"""
max_level = 0
min_level = 0
parts_for_level = defaultdict(list)
for part in parts:
if is_inside(part, outline):
level = (part["lev_above"] or 0, part["lev_below"] or 0)
if level[0] > max_level:
max_level = level[0]
if level[1] > min_level:
min_level = level[1]
parts_for_level[level].append(part)
return parts_for_level, max_level, min_level
[documentos] def merge_adjacent_parts(self, outline, parts):
"""
Merge the adjacent parts in each level given a building outline and its parts.
Translates the maximum values of number of levels above and below ground to
the outline and optionally deletes all the parts in that level.
"""
to_clean = []
to_clean_g = []
to_change = {}
to_change_g = {}
parts_for_level, max_level, min_level = self.get_parts(outline, parts)
parts_area = 0
outline["lev_above"] = max_level
outline["lev_below"] = min_level
building_area = round(outline.geometry().area(), 0)
for (level, parts) in parts_for_level.items():
check_area = False
for part in parts:
part_area = part.geometry().area()
parts_area += part_area
if round(part_area, 0) > building_area:
part["fixme"] = _("This part is bigger than its building")
to_change[part.id()] = get_attributes(part)
check_area = True
if check_area:
continue
if len(parts_for_level) == 1 or (
level == (max_level, min_level) and SIMPLIFY_BUILDING_PARTS
):
to_clean = [p.id() for p in parts_for_level[max_level, min_level]]
else:
geom = Geometry.merge_adjacent_features(parts)
poly = Geometry.get_multipolygon(geom)
if len(poly) < len(parts):
for (i, part) in enumerate(parts):
if i < len(poly):
g = Geometry.fromPolygonXY(poly[i])
to_change_g[part.id()] = g
else:
to_clean_g.append(part.id())
if len(parts_for_level) > 1 and round(parts_area, 0) != building_area:
outline["fixme"] = _("Building parts don't fill the building outline")
to_change[outline.id()] = get_attributes(outline)
return to_clean, to_clean_g, to_change, to_change_g
[documentos] def remove_inner_rings(self, feat1, feat2):
"""
Auxiliary method to remove feat1 of its inner rings if equals to feat2.
Returns True if feat1 must be deleted and new geometry if any ring is
removed.
"""
poly = Geometry.get_multipolygon(feat1)[0]
geom2 = Geometry.fromPolygonXY(Geometry.get_multipolygon(feat2)[0])
delete = False
new_geom = None
delete_rings = []
for i, ring in enumerate(poly):
if Geometry.fromPolygonXY([ring]).equals(geom2):
if i == 0:
delete = True
break
else:
delete_rings.append(i)
if delete_rings:
new_poly = [ring for i, ring in enumerate(poly) if i not in delete_rings]
new_geom = Geometry().fromPolygonXY(new_poly)
return delete, new_geom
[documentos] def merge_building_parts(self):
"""
Apply merge_adjacent_parts to each set of building and its parts.
Detect pools contained in a building and assign layer=1.
Detect buildings/parts with geometry equals to a pool geometry and
delete them.
Detect inner rings of buildings/parts with geometry equals to a pool
geometry and remove them.
"""
parts = self.index_of_parts()
pools = self.index_of_pools()
to_clean = []
to_change = {}
to_change_g = {}
buildings_in_pools = 0
levels_to_outline = 0
parts_merged_to_building = 0
adjacent_parts_deleted = 0
pools_on_roofs = 0
visited_parcels = set()
t_buildings = self.count("not regexp_match(localId, '_')")
pbar = self.get_progressbar(_("Merge building parts"), t_buildings)
for building in self.search("not regexp_match(localId, '_')"):
ref = building["localId"]
it_pools = pools[ref]
it_parts = parts[ref]
for pool in it_pools:
if pool["layer"] != 1 and is_inside(pool, building):
pool["layer"] = 1
to_change[pool.id()] = get_attributes(pool)
pools_on_roofs += 1
del_building, new_geom = self.remove_inner_rings(building, pool)
if del_building:
to_clean.append(building.id())
buildings_in_pools += 1
break
if new_geom:
to_change_g[building.id()] = QgsGeometry(new_geom)
if ref not in visited_parcels:
for part in frozenset(it_parts):
del_part, new_geom = self.remove_inner_rings(part, pool)
if del_part:
to_clean.append(part.id())
it_parts.remove(part)
if part in parts[ref]:
parts[ref].remove(part)
adjacent_parts_deleted += 1
elif new_geom:
to_change_g[part.id()] = QgsGeometry(new_geom)
visited_parcels.add(ref)
cn, cng, ch, chg = self.merge_adjacent_parts(building, it_parts)
to_clean += cn + cng
to_change.update(ch)
to_change_g.update(chg)
levels_to_outline += len(ch)
parts_merged_to_building += len(cn)
adjacent_parts_deleted += len(cng)
pbar.update()
pbar.close()
if to_change:
self.writer.changeAttributeValues(to_change)
if to_change_g:
self.writer.changeGeometryValues(to_change_g)
if to_clean:
self.writer.deleteFeatures(to_clean)
if pools_on_roofs:
log.debug(_("Located %d swimming pools over a building"), pools_on_roofs)
report.pools_on_roofs = pools_on_roofs
if buildings_in_pools:
log.debug(
_("Deleted %d buildings coincidents with a swimming pool"),
buildings_in_pools,
)
report.buildings_in_pools = buildings_in_pools
if levels_to_outline:
log.debug(_("Translated %d level values to the outline"), levels_to_outline)
if parts_merged_to_building:
log.debug(
_("Merged %d building parts to the outline"), parts_merged_to_building
)
report.parts_to_outline = parts_merged_to_building
if adjacent_parts_deleted:
log.debug(_("Merged %d adjacent parts"), adjacent_parts_deleted)
report.adjacent_parts = adjacent_parts_deleted
[documentos] def clean(self):
"""
Clean geometries.
Delete invalid geometries and close vertices, add topological points,
merge building parts and simplify vertices.
"""
self.delete_invalid_geometries(
query_small_area=lambda feat: "_part" not in feat["localId"]
)
self.topology()
self.merge_building_parts()
self.simplify()
self.delete_small_geometries()
[documentos] def move_entrance(
self,
ad,
ad_buildings,
ad_parts,
to_move,
to_insert,
parents_per_vx,
):
"""
Auxiliary method to move entrance to the nearest building and part.
Don't move and the entrance specification is changed if the new
position is not enough close ('remote'), is a corner ('corner'),
is in an inner ring ('inner') or is in a wall shared with another
building ('shared').
"""
point = ad.geometry().asPoint()
distance = 9e9
for bu in ad_buildings:
bg = bu.geometry()
d, c, v = bg.closestSegmentWithContext(point)[:3]
if d < distance:
(building, distance, closest, vertex) = (bu, d, c, v)
bg = building.geometry()
bid = building.id()
va = Point(bg.vertexAt(vertex - 1))
vb = Point(bg.vertexAt(vertex))
if distance > config.addr_thr**2:
ad["spec"] = "remote"
elif vertex > len(Geometry.get_multipolygon(bg)[0][0]):
ad["spec"] = "inner"
elif (
closest.sqrDist(va) < config.entrance_thr**2
or closest.sqrDist(vb) < config.entrance_thr**2
):
ad["spec"] = "corner"
elif PolygonLayer.is_shared_segment(parents_per_vx, va, vb, bid):
ad["spec"] = "shared"
else:
dg = Geometry.fromPointXY(closest)
to_move[ad.id()] = dg
bg.insertVertex(closest.x(), closest.y(), vertex)
to_insert[bid] = QgsGeometry(bg)
building.setGeometry(bg)
for part in ad_parts:
pg = part.geometry()
r = Geometry.get_multipolygon(pg)[0][0]
for i in range(len(r) - 1):
vpa = Point(pg.vertexAt(i))
vpb = Point(pg.vertexAt(i + 1))
if va in (vpa, vpb) and vb in (vpa, vpb):
pg.insertVertex(closest.x(), closest.y(), i + 1)
to_insert[part.id()] = QgsGeometry(pg)
part.setGeometry(pg)
break
[documentos] def move_address(self, address):
"""
Try to move each entrance address to the nearest point in the building outline.
Building and addresses are associated using the cadastral reference.
Non entrance addresses ends in the building outline when
CatAtom2Osm.merge_address is called. Delete the address if the number of
associated buildings is 0 or greater than 1 for non entrance addresses.
"""
to_change = {}
to_move = {}
to_insert = {}
to_clean = []
mp = 0
oa = 0
(buildings, parts) = self.index_of_building_and_parts()
exp = "NOT(localId ~ '_')"
ppv, geometries = self.get_parents_per_vertex_and_geometries(exp)
pbar = self.get_progressbar(_("Move addresses"), address.featureCount())
for ad in address.getFeatures():
refcat = self.get_id(ad)
building_count = len(buildings.get(refcat, []))
ad_buildings = buildings[refcat]
ad_parts = parts[refcat]
if building_count == 0:
to_clean.append(ad.id())
oa += 1
else:
if ad["spec"] == "Entrance":
self.move_entrance(
ad,
ad_buildings,
ad_parts,
to_move,
to_insert,
ppv,
)
if ad["spec"] != "Entrance" and building_count > 1:
to_clean.append(ad.id())
mp += 1
if ad["spec"] != "Parcel" and building_count == 1:
to_change[ad.id()] = get_attributes(ad)
if len(to_insert) > BUFFER_SIZE:
self.writer.changeGeometryValues(to_insert)
to_insert = {}
pbar.update()
pbar.close()
address.writer.changeAttributeValues(to_change)
address.writer.changeGeometryValues(to_move)
if len(to_insert) > 0:
self.writer.changeGeometryValues(to_insert)
msg = _("Moved %d addresses to entrance, %d specification changed")
log.debug(msg, len(to_move), len(to_change))
if len(to_clean) > 0:
address.writer.deleteFeatures(to_clean)
if oa > 0:
msg = _("Deleted %d addresses without associated building")
log.debug(msg, oa)
report.pool_addresses = oa
if mp > 0:
msg = _("Refused %d addresses belonging to multiple buildings")
log.debug(msg, mp)
report.multiple_addresses = mp
[documentos] def validate(self, max_level, min_level):
"""
Put fixmes to buildings with not valid geometry, too small or big.
Returns distribution of floors.
"""
to_change = {}
for feat in self.getFeatures():
geom = feat.geometry()
errors = geom.validateGeometry()
if errors:
feat["fixme"] = (
_("GEOS validation") + ": " + "; ".join([e.what() for e in errors])
)
to_change[feat.id()] = get_attributes(feat)
if ConsLayer.is_building(feat):
localid = feat["localId"]
if isinstance(feat["lev_above"], int) and feat["lev_above"] > 0:
max_level[localid] = feat["lev_above"]
if isinstance(feat["lev_below"], int) and feat["lev_below"] > 0:
min_level[localid] = feat["lev_below"]
if feat.id() not in to_change:
area = geom.area()
if area < config.warning_min_area:
feat["fixme"] = _("Check, area too small")
to_change[feat.id()] = get_attributes(feat)
if area > config.warning_max_area:
feat["fixme"] = _("Check, area too big")
to_change[feat.id()] = get_attributes(feat)
if to_change:
self.writer.changeAttributeValues(to_change)
[documentos] def conflate(self, current_bu_osm, delete=True):
"""
Remove from current_bu_osm the buildings that don't have conflicts.
If delete=False, only mark buildings with conflicts.
"""
if len(current_bu_osm.elements) == 0:
return
index = self.get_index()
geometries = {f.id(): QgsGeometry(f.geometry()) for f in self.getFeatures()}
num_buildings = 0
conflicts = 0
to_clean = set()
pbar = self.get_progressbar(_("Conflate"), len(current_bu_osm.elements))
for el in current_bu_osm.elements:
poly = None
is_pool = "leisure" in el.tags and el.tags["leisure"] == "swimming_pool"
is_building = "building" in el.tags
if el.type == "way" and el.is_closed() and (is_building or is_pool):
poly = [[map(Point, el.geometry())]]
elif el.type == "relation" and (is_building or is_pool):
poly = [[map(Point, w)] for w in el.outer_geometry()]
if poly:
num_buildings += 1
geom = Geometry().fromMultiPolygonXY(poly)
if geom is None or not geom.isGeosValid():
msg = _("OSM building with id %s is not valid") % el.fid
pbar.clear()
log.warning(msg)
report.warnings.append(msg)
else:
fids = index.intersects(geom.boundingBox())
conflict = False
for fid in fids:
fg = geometries[fid]
if geom.contains(fg) or fg.contains(geom) or geom.overlaps(fg):
conflict = True
conflicts += 1
break
if delete and not conflict:
to_clean.add(el)
if not delete and conflict:
el.tags["conflict"] = "yes"
pbar.update()
pbar.close()
for el in to_clean:
current_bu_osm.remove(el)
log.debug(
_("Detected %d conflicts in %d buildings/pools from OSM"),
conflicts,
num_buildings,
)
report.osm_buildings = num_buildings
report.osm_building_conflicts = conflicts
return len(to_clean) > 0