Source code for test.test_app

import logging
import os
import unittest
from importlib import reload
from optparse import Values

import mock
from qgis.core import QgsVectorLayer

from catatom2osm import app, config, osm
from catatom2osm.exceptions import CatIOError

qgs = app.QgsSingleton()
os.environ["LANGUAGE"] = "C"
config.install_gettext("catato2osm", "")
m_log = mock.MagicMock()
m_log.app_level = logging.INFO


[docs]def get_func(f): return getattr(f, "__func__", f)
[docs]class TestQgsSingleton(unittest.TestCase):
[docs] @mock.patch("catatom2osm.app.QgsSingleton._qgs", None) @mock.patch("catatom2osm.app.gdal") @mock.patch("catatom2osm.app.QgsApplication") def test_new(self, m_qgsapp, m_gdal): q1 = app.QgsSingleton() self.assertEqual(m_qgsapp.call_count, 1) m_gdal.SetConfigOption.assert_has_calls( [ mock.call("GML_ATTRIBUTES_TO_OGR_FIELDS", "YES"), mock.call("GML_SKIP_RESOLVE_ELEMS", "ALL"), ] ) q2 = app.QgsSingleton() self.assertEqual(m_qgsapp.call_count, 1) self.assertTrue(q1 is q2)
[docs]class TestCatAtom2Osm(unittest.TestCase):
[docs] def setUp(self): options = { "building": True, "tasks": True, "list_zones": False, "log_level": "INFO", "list": "", "zoning": False, "version": False, "address": True, "manual": False, "info": False, "zone": [], "task": ["33333"], "comment": False, "split": None, "args": "33333", } self.m_app = mock.MagicMock() self.m_app.options = Values(options) self.m_app.get_translations.return_value = ([], False) self.m_app.path = "33333" self.m_app.highway_names_path = "33333/highway_names.csv" self.m_app.zone = self.m_app.options.zone self.m_app.tasks_path = "33333/tasks" self.m_app.is_new = False self.m_app.cat.get_path = lambda *args: self.m_app.path + "/" + "/".join(args)
[docs] @mock.patch("catatom2osm.app.report", mock.MagicMock()) @mock.patch("catatom2osm.catatom.Reader") @mock.patch("catatom2osm.app.os") def test_init(self, m_os, m_cat): m_cat.return_value.get_path = lambda *args: "foo/" + "/".join(args) m_os.path.exists.return_value = False m_os.path.splitext.return_value = [""] self.m_app.options.split = None self.m_app.init = get_func(app.CatAtom2Osm.__init__) self.m_app.init(self.m_app, "xxx/12345", self.m_app.options) m_cat.assert_called_once_with("xxx/12345") self.assertEqual(self.m_app.path, m_cat().path)
[docs] @mock.patch("catatom2osm.app.gdal") def test_gdal(self, m_gdal): reload(app) self.assertFalse(m_gdal.PushErrorHandler.called) config.silence_gdal = True reload(app) m_gdal.PushErrorHandler.called_once_with("CPLQuietErrorHandler")
[docs] @mock.patch("catatom2osm.app.report", mock.MagicMock()) def test_run_default(self): self.m_app.source = "building" self.m_app.building = mock.MagicMock() self.m_app.is_new = True self.m_app.options.address = True self.m_app.run = get_func(app.CatAtom2Osm.run) self.m_app.run(self.m_app) self.m_app.stop_address.assert_called_once_with()
[docs] @mock.patch("catatom2osm.app.report", mock.MagicMock()) def test_run_default_2nd(self): self.m_app.is_new = False self.m_app.options.address = True self.m_app.source = "building" self.m_app.run = get_func(app.CatAtom2Osm.run) self.m_app.run(self.m_app) self.m_app.resume_address.assert_called_once_with() self.m_app.process_tasks.assert_called_once_with(self.m_app.building)
[docs] @mock.patch("catatom2osm.app.log", m_log) @mock.patch("catatom2osm.app.geo", mock.MagicMock()) @mock.patch("catatom2osm.app.report") def test_process_building(self, m_report): m_report.values["max_level"] = {} m_report.values["min_level"] = {} self.m_app.process_building = get_func(app.CatAtom2Osm.process_building) self.m_app.process_building(self.m_app) building = self.m_app.building building.remove_outside_parts.assert_called_once_with() building.explode_multi_parts.assert_called_once_with() building.clean.assert_called_once_with() building.validate.assert_called_once()
[docs] @mock.patch("catatom2osm.app.log", m_log) @mock.patch("catatom2osm.app.report", mock.MagicMock()) def test_process_building_no_add_no_conf(self): self.m_app.process_building = get_func(app.CatAtom2Osm.process_building) self.m_app.options.address = False self.m_app.options.manual = True self.m_app.process_building(self.m_app) self.m_app.building.move_address.assert_not_called()
[docs] @mock.patch("catatom2osm.app.report", mock.MagicMock()) @mock.patch("catatom2osm.app.os") def test_process_tasks(self, m_os): m_os.path.exists.return_value = True self.m_app.tasks_folder = "tasks" self.m_app.get_tasks.return_value = { "123456A": mock.MagicMock(), "123456B": mock.MagicMock(), "123456C": mock.MagicMock(), "123456D": mock.MagicMock(), "123456E": mock.MagicMock(), } building = mock.MagicMock() building.source_date = 1234 self.m_app.parcel.getFeatures.return_value = [ {"localId": "123456E", "zone": "001"}, {"localId": "123456C", "zone": "002"}, {"localId": "123456D", "zone": "00001"}, {"localId": "123456B", "zone": "00002"}, {"localId": "123456A", "zone": "00003"}, ] self.m_app.get_task_comment = lambda x: "X" + x self.m_app.process_tasks = get_func(app.CatAtom2Osm.process_tasks) self.m_app.process_tasks(self.m_app, building) for label, task in self.m_app.get_tasks.return_value.items(): task.to_osm.assert_called_with(upload="yes", tags={"comment": "X" + label}) self.assertEqual(self.m_app.merge_address.call_count, 5)
[docs] @mock.patch("catatom2osm.app.report", mock.MagicMock()) @mock.patch("catatom2osm.app.os") @mock.patch("catatom2osm.app.type") def test_get_tasks(self, m_type, m_os): m_os.path.join = lambda *args: "/".join(args) m_os.listdir.return_value = ["1", "2", "3"] layer_class = mock.MagicMock() m_type.return_value = layer_class self.m_app.tasks = {"00001": "00001", "00002": "00001"} building = mock.MagicMock() building.get_id = lambda feat: feat["localId"] building.getFeatures.return_value = [ {"localId": "001"}, {"localId": "00001"}, {"localId": "00002"}, ] building.featureCount.return_value = 3 building.copy_feature.side_effect = [100, 101, 102] self.m_app.get_tasks = get_func(app.CatAtom2Osm.get_tasks) self.m_app.get_tasks(self.m_app, building) m_os.remove.assert_has_calls( [ mock.call("33333/tasks/1"), mock.call("33333/tasks/2"), mock.call("33333/tasks/3"), ] ) layer_class.assert_has_calls( [ mock.call(baseName="001"), mock.call().writer.addFeatures([100]), mock.call(baseName="00001"), mock.call().writer.addFeatures([101, 102]), ] )
[docs] def test_process_parcel(self): self.m_app.tasks = {"a": "a", "b": "b", "c": "c", "d": "d", "e": "e"} self.m_app.parcel.merge_by_adjacent_buildings.return_value = { "b": "a", "c": "a", } self.m_app.parcel.merge_by_parts_count.return_value = {"e": "d"} self.m_app.process_parcel = get_func(app.CatAtom2Osm.process_parcel) self.m_app.process_parcel(self.m_app) result = {"a": "a", "b": "a", "c": "a", "d": "d", "e": "d"} self.assertEqual(self.m_app.tasks, result)
[docs] @mock.patch("catatom2osm.geo.layer.base.log", m_log) def test_exit(self): self.m_app.exit = get_func(app.CatAtom2Osm.exit) self.m_app.test1 = QgsVectorLayer("Point", "test", "memory") self.m_app.test2 = QgsVectorLayer("Point", "test", "memory") self.m_app.exit(self.m_app) self.assertFalse(hasattr(self.m_app, "test1")) self.assertFalse(hasattr(self.m_app, "test2")) del self.m_app.qgs self.m_app.exit(self.m_app)
[docs] @mock.patch("catatom2osm.app.log") def test_export_layer(self, m_log): m_layer = mock.MagicMock() m_layer.export.return_value = True self.m_app.export_layer = get_func(app.CatAtom2Osm.export_layer) self.m_app.export_layer(self.m_app, m_layer, "bar", "taz") m_layer.export.assert_called_once_with("33333/bar", "taz", target_crs_id=None) output = m_log.info.call_args_list[0][0][0] self.assertIn("Generated", output) m_layer.export.return_value = False with self.assertRaises(CatIOError): self.m_app.export_layer( self.m_app, m_layer, "bar", "taz", target_crs_id=None )
[docs] @mock.patch("catatom2osm.app.os") @mock.patch("catatom2osm.app.log") @mock.patch("catatom2osm.app.open") @mock.patch("catatom2osm.app.osmxml") @mock.patch("catatom2osm.app.overpass") def test_read_osm(self, m_overpass, m_xml, m_open, m_log, m_os): self.m_app.read_osm = get_func(app.CatAtom2Osm.read_osm) m_os.path.join = lambda *args: "/".join(args) m_os.path.exists.return_value = True m_xml.deserialize.return_value.elements = [] self.m_app.read_osm(self.m_app, "bar", "taz") m_overpass.Query.assert_not_called() m_open.assert_called_with("33333/bar/taz", "rb") m_xml.deserialize.assert_called_once_with(m_open()) output = m_log.warning.call_args_list[0][0][0] self.assertIn("No OSM data", output) m_xml.deserialize.return_value.elements = [1] self.m_app.boundary_search_area = "123456" m_os.path.exists.return_value = False data = self.m_app.read_osm(self.m_app, "taz", ql="bar") m_overpass.Query.assert_called_with("123456") m_overpass.Query().add.assert_called_once_with("bar") self.assertEqual(data.elements, [1]) output = m_log.info.call_args_list[0][0][0] self.assertIn("Downloading", output)
[docs] @mock.patch("catatom2osm.app.osmxml") @mock.patch("catatom2osm.app.codecs") @mock.patch("catatom2osm.app.io") @mock.patch("catatom2osm.app.gzip") def test_write_osm(self, m_gz, m_io, m_codecs, m_xml): m_xml.serialize.return_value = "taz" data = osm.Osm() data.Node(0, 0, {"ref": "1"}) data.Node(1, 1, {"ref": "2"}) data.Node(2, 2) self.m_app.write_osm = get_func(app.CatAtom2Osm.write_osm) self.m_app.write_osm(self.m_app, data, "bar") self.assertNotIn( "ref", [k for el in data.elements for k in list(el.tags.keys())] ) m_io.open.assert_called_once_with("33333/bar", "w", encoding="utf-8") file_obj = m_io.open.return_value m_xml.serialize.assert_called_once_with(file_obj, data) m_xml.reset_mock() self.m_app.write_osm(self.m_app, data, "bar.gz") m_gz.open.assert_called_once_with("33333/bar.gz", "w") f_gz = m_gz.open.return_value m_codecs.getwriter.return_value.assert_called_once_with(f_gz)
[docs] @mock.patch("catatom2osm.app.cdau") def test_get_auxiliary_addresses(self, m_cdau): self.m_app.cat.zip_code = "29900" self.m_app.path = os.path.join("/foo", "bar") self.m_app.get_auxiliary_addresses = get_func( app.CatAtom2Osm.get_auxiliary_addresses ) self.m_app.get_auxiliary_addresses(self.m_app) m_cdau.Reader.assert_called_once_with(self.m_app.aux_path)
[docs] @mock.patch("catatom2osm.app.report", mock.MagicMock()) def test_merge_address(self): address = osm.Osm() address.Node(0, 0, {"ref": "1", "addr:street": "address1", "image": "foo"}) address.Node( 2, 0, {"ref": "2", "addr:street": "address2", "entrance": "yes", "image": "bar"}, ) address.Node(4, 0, {"ref": "3", "addr:street": "address3", "entrance": "yes"}) address.Node(6, 0, {"ref": "4", "addr:place": "address5", "entrance": "yes"}) building = osm.Osm() w0 = building.Way([], {"ref": "0"}) # building with ref not in address # no entrance address, tags to way w1 = building.Way([(0, 0), (1, 0), (1, 1), (0, 0)], {"ref": "1"}) # entrance exists, tags to node n2 = building.Node(2, 0) building.Way([n2, (3, 0), (3, 1), (2, 0)], {"ref": "2"}) # entrance don't exists, tags to way w3 = building.Way([(4, 1), (5, 0), (5, 1), (4, 1)], {"ref": "3"}) # entrance exists, tags to node in relation n5 = building.Node(6, 0) w6 = building.Way([(6, 5), (9, 5), (9, 8), (6, 8), (6, 5)]) w7 = building.Way([n5, (9, 0), (9, 3), (6, 3), (6, 0)]) w8 = building.Way([(7, 1), (8, 1), (8, 2), (7, 2), (7, 1)]) r1 = building.Relation(tags={"ref": "4"}) r1.append(w6, "outer") r1.append(w7, "outer") r1.append(w8, "inner") self.m_app.merge_address = get_func(app.CatAtom2Osm.merge_address) self.m_app.merge_address(self.m_app, building, address) self.assertNotIn("addrtags", w0.tags) self.assertEqual(w1.tags["addr:street"], "address1") self.assertNotIn("image", w1.tags) self.assertEqual(n2.tags["addr:street"], "address2") self.assertNotIn("image", n2.tags) self.assertEqual(w3.tags["addr:street"], "address3") self.assertNotIn( "addr:street", [k for n in w3.nodes for k in list(n.tags.keys())] ) self.assertEqual(n5.tags["addr:place"], "address5") address.tags["source:date"] = "foobar" self.m_app.merge_address(self.m_app, building, address) self.assertEqual(building.tags["source:date:addr"], address.tags["source:date"])
[docs] @mock.patch("catatom2osm.app.os") @mock.patch("catatom2osm.app.config") @mock.patch("catatom2osm.app.csvtools") def test_get_translations(self, m_csv, m_config, m_os): m_os.path.join = lambda *args: "/".join(args) self.m_app.get_translations = get_func(app.CatAtom2Osm.get_translations) m_config.app_path = "foo" m_config.highway_types = "bar" m_csv.csv2dict.return_value = {"RAZ": " raz "} m_os.path.exists.return_value = True address = mock.MagicMock() names = self.m_app.get_translations(self.m_app, address) m_csv.dict2csv.assert_not_called() m_csv.csv2dict.assert_has_calls( [ mock.call("foo/highway_types.csv", "bar"), mock.call("33333/highway_names.csv", {}), ] ) self.assertEqual(names, {"RAZ": "raz"}) address.get_names.return_value = {"TAZ": " taz "} m_csv.csv2dict.reset_mock() m_os.path.exists.return_value = False self.m_app.is_new = True names = self.m_app.get_translations(self.m_app, address) address.get_names.assert_called_once_with( self.m_app.get_highway.return_value, self.m_app.get_place.return_value ) m_csv.csv2dict.assert_not_called() m_csv.dict2csv.assert_has_calls( [ mock.call("foo/highway_types.csv", "bar"), mock.call("33333/highway_names.csv", {"TAZ": "taz"}, sort=1), ] ) self.assertEqual(names, {"TAZ": "taz"}) self.m_app.options.manual = True names = self.m_app.get_translations(self.m_app, address) address.get_names.assert_called_with(None, None)
[docs] @mock.patch("catatom2osm.app.geo") def test_get_highway(self, m_layer): self.m_app.read_osm.return_value = 1234 self.m_app.get_highway = get_func(app.CatAtom2Osm.get_highway) h = self.m_app.get_highway(self.m_app) h.read_from_osm.assert_called_once_with(1234)
[docs] @mock.patch("catatom2osm.app.log") @mock.patch("catatom2osm.app.report") def test_get_current_ad_osm(self, m_report, m_log): d = osm.Osm() d.Node(0, 0, {"addr:housenumber": "12", "addr:street": "foobar"}) d.Node(1, 1, {"addr:housenumber": "14", "addr:street": "foobar"}) d.Node(2, 2, {"addr:housenumber": "10", "addr:place": "bartaz"}) self.m_app.get_current_ad_osm = get_func(app.CatAtom2Osm.get_current_ad_osm) self.m_app.read_osm.return_value = d address = self.m_app.get_current_ad_osm(self.m_app) self.assertEqual(address, set(["foobar14", "foobar12", "bartaz10"])) self.assertNotIn("osm_addresses_whithout_number", m_report) d.Node(3, 3, {"addr:street": "x"}) d.Node(4, 4, {"addr:place": "y"}) self.m_app.read_osm.return_value = d address = self.m_app.get_current_ad_osm(self.m_app) self.assertEqual(m_report.osm_addresses_without_number, 2)