"""Reader of Cadastre ATOM GML files."""
import logging
import os
import re
import zipfile
from lxml import etree
from qgis.core import QgsCoordinateReferenceSystem
from catatom2osm import config, download, geo
from catatom2osm.exceptions import CatIOError, CatValueError
log = logging.getLogger(config.app_name)
[documentos]class Reader(object):
"""Class to download and read Cadastre ATOM GML files."""
def __init__(self, a_path):
"""
Construct a cadastre reader.
Args:
a_path (str): Directory where the source files are located.
"""
self.path = a_path
m = re.match(r"^\d{5}$", os.path.split(a_path)[-1])
if not m:
msg = _("Last directory name must be a 5 digits ZIP code")
raise CatValueError(msg)
self.zip_code = m.group()
self.prov_code = self.zip_code[0:2]
if self.prov_code not in config.prov_codes:
msg = _("Province code '%s' is not valid") % self.prov_code
raise CatValueError(msg)
if not os.path.exists(a_path):
os.makedirs(a_path)
if not os.path.isdir(a_path):
raise CatIOError(_("Not a directory: '%s'") % a_path)
[documentos] def get_path(self, *paths):
"""Get path from components relative to self.path."""
return os.path.join(self.path, *paths)
[documentos] def get_file_object(self, gml_path, zip_path=""):
"""Get handler for gml_path (if exist) or for zip_path."""
if os.path.exists(gml_path):
fo = open(gml_path, "rb")
else:
with zipfile.ZipFile(zip_path, "r") as zf:
gml_fp = self.get_path_from_zip(zf, gml_path)
fo = zf.open(gml_fp, "r")
return fo
[documentos] def get_atom_file(self, url):
"""
Try to download the ZIP file for self.zip_code.
Given the url of a Cadastre ATOM service.
"""
s = re.search(r"INSPIRE/(\w+)/", url)
log.debug(
_("Searching the url for the '%s' layer of '%s'..."),
s.group(1),
self.zip_code,
)
response = download.get_response(url)
s = re.search(r"http.+/%s.+zip" % self.zip_code, response.text)
if not s:
msg = _("Municipality code '%s' don't exists") % self.zip_code
raise CatValueError(msg)
url = s.group(0)
filename = url.split("/")[-1]
out_path = self.get_path(filename)
log.info(_("Downloading '%s'"), out_path)
download.wget(url, out_path)
[documentos] def get_layer_paths(self, layername):
if layername in ["building", "buildingpart", "otherconstruction"]:
group = "BU"
elif layername in ["cadastralparcel", "cadastralzoning"]:
group = "CP"
elif layername in [
"address",
"thoroughfarename",
"postaldescriptor",
"adminunitname",
]:
group = "AD"
else:
raise CatValueError(_("Unknown layer name '%s'") % layername)
gml_fn = ".".join((config.fn_prefix, group, self.zip_code, layername, "gml"))
if group == "AD":
gml_fn = ".".join((config.fn_prefix, group, self.zip_code, "gml"))
md_fn = ".".join((config.fn_prefix, group, "MD", self.zip_code, "xml"))
if group == "CP":
md_fn = ".".join((config.fn_prefix, group, "MD.", self.zip_code, "xml"))
zip_fn = ".".join((config.fn_prefix, group, self.zip_code, "zip"))
md_path = self.get_path(md_fn)
gml_path = self.get_path(gml_fn)
zip_path = self.get_path(zip_fn)
return (md_path, gml_path, zip_path, group)
[documentos] def is_empty(self, gml_path, zip_path):
"""
Detect if the file is empty.
Cadastre empty files (usually otherconstruction) comes with a null
feature and results in a non valid layer in QGIS.
"""
fo = self.get_file_object(gml_path, zip_path)
text = fo.read(2000)
fo.close()
parser = etree.XMLPullParser(["start"])
parser.feed(text)
events = list(parser.read_events())
try:
parser.close()
except etree.XMLSyntaxError:
pass
return len([event for event, elem in events if event == "start"]) < 3
[documentos] def get_path_from_zip(self, zf, a_path):
"""Return full path in zip of this file name."""
fn = os.path.basename(a_path).split("|")[0]
for name in zf.namelist():
if name.endswith(fn):
return name
raise KeyError("There is no item named '{}' in the archive".format(fn))
[documentos] def get_gml_from_zip(self, gml_path, zip_path, group, layername):
"""Return gml layer from zip if exists and is valid or none."""
try:
with zipfile.ZipFile(zip_path) as zf:
gml_fp = self.get_path_from_zip(zf, gml_path)
vsizip_path = "/".join(("/vsizip", zip_path, gml_fp)).replace("\\", "/")
if group == "AD":
vsizip_path += "|layername=" + layername
gml = geo.BaseLayer(vsizip_path, layername + ".gml", "ogr")
if not gml.isValid():
gml = None
except IOError:
gml = None
return gml
[documentos] def fix_encoding(self, gml_path, zip_path):
"""Test if source needs to be converted to utf-8."""
with self.get_file_object(gml_path, zip_path) as fo:
data = fo.read()
try:
data.decode("ascii")
except UnicodeDecodeError:
text = data.decode("ISO-8859-1")
with open(gml_path, "w") as fo:
fo.write(text)
[documentos] def fix_amp(self, gml_path, zip_path):
"""Test if source needs to be escape ampersand."""
with self.get_file_object(gml_path, zip_path) as fo:
data = fo.read()
save = False
if b"&<" in data:
data = re.sub(b"&<", b"&<", data)
save = True
if b"&F" in data:
data = re.sub(b"&F", b"&F", data)
save = True
if save:
with open(gml_path, "wb") as fo:
fo.write(data)
[documentos] def download(self, layername):
"""
Download the file for a Cadastre layername.
Args:
layername (str): Short name of the Cadastre layer. Any of
'building', 'cadastralzoning', 'address'
"""
(md_path, gml_path, zip_path, group) = self.get_layer_paths(layername)
url = config.prov_url[group].format(code=self.prov_code)
self.get_atom_file(url)
[documentos] def read(self, layername, allow_empty=False, force_zip=False):
"""
Create a QGIS vector layer for a Cadastre layername.
Derive the GML filename from layername. Downloads the file if not is
present. First try to read the ZIP file, if fails try with the GML file.
Args:
layername (str): Short name of the Cadastre layer. Any of
'building', 'buildingpart', 'otherconstruction',
'cadastralparcel', 'cadastralzoning', 'address',
'thoroughfarename', 'postaldescriptor', 'adminunitname'
allow_empty (bool): If False (default), raise a exception for empty
layer, else returns None
force_zip (bool): Force to use ZIP file.
Returns:
QgsVectorLayer: Vector layer.
"""
(md_path, gml_path, zip_path, group) = self.get_layer_paths(layername)
url = config.prov_url[group].format(code=self.prov_code)
if not os.path.exists(zip_path) and (not os.path.exists(gml_path) or force_zip):
self.get_atom_file(url)
if layername == "cadastralparcel":
self.fix_encoding(gml_path, zip_path)
if layername == "address":
self.fix_amp(gml_path, zip_path)
self.get_metadata(md_path, zip_path)
if self.is_empty(gml_path, zip_path):
if not allow_empty:
raise CatIOError(_("The layer '%s' is empty") % gml_path)
else:
log.info(_("The layer '%s' is empty"), gml_path)
return None
fn = gml_path
if group == "AD":
fn += "|layername=" + layername
gml = geo.BaseLayer(fn, layername + ".gml", "ogr")
if not gml.isValid():
gml = self.get_gml_from_zip(gml_path, zip_path, group, layername)
if gml is None:
raise CatIOError(_("Failed to load layer '%s'") % gml_path)
crs = QgsCoordinateReferenceSystem.fromEpsgId(self.crs_ref)
if not crs.isValid():
raise CatIOError(_("Could not determine the CRS of '%s'") % gml_path)
gml.setCrs(crs)
log.info(_("Read %d features in '%s'"), gml.featureCount(), gml_path)
gml.source_date = self.src_date
return gml