Files
PVPlant/lib/catastro/Spanish_Inspire_Catastral_Downloader.py
2025-01-28 00:04:13 +01:00

599 lines
23 KiB
Python

# -*- coding: utf-8 -*-
"""
/***************************************************************************
Spanish_Inspire_Catastral_Downloader
A QGIS plugin
Spanish Inspire Catastral Downloader
-------------------
begin : 2017-06-18
git sha : $Format:%H$
copyright : (C) 2017 by Patricio Soriano :: SIGdeletras.com
email : pasoriano@sigdeletras.com
***************************************************************************/
/***************************************************************************
* *
* This program is free software; you can redistribute it and/or modify *
* it under the terms of the GNU General Public License as published by *
* the Free Software Foundation; either version 2 of the License, or *
* (at your option) any later version. *
* *
***************************************************************************/
"""
import json
import os
import os.path
import shutil
import socket
import subprocess
import xml.etree.ElementTree as ET
import zipfile
from urllib import parse, request
# Import the PyQt and QGIS libraries
from qgis.PyQt.QtCore import Qt
# from PyQt5.QtWidgets import QDialog
# For Debug
try:
from pydevd import *
except ImportError:
None
from PyQt5 import QtNetwork, uic
from PyQt5.QtCore import *
from PyQt5.QtGui import *
from PyQt5.QtWidgets import *
from qgis.core import Qgis
QT_VERSION = 5
os.environ['QT_API'] = 'pyqt5'
from qgis.core import *
from qgis.core import (QgsMessageLog)
from .Spanish_Inspire_Catastral_Downloader_dialog import \
Spanish_Inspire_Catastral_DownloaderDialog
from .Config import _port, _proxy
CODPROV = ''
CODMUNI = ''
class Spanish_Inspire_Catastral_Downloader:
"""QGIS Plugin Implementation."""
def __init__(self, iface):
"""Constructor.
:param iface: An interface instance that will be passed to this class
which provides the hook by which you can manipulate the QGIS
application at run time.
:type iface: QgsInterface
"""
# Save reference to the QGIS interface
self.iface = iface
self.msgBar = iface.messageBar()
self.data_dir = ''
# initialize plugin directory
self.plugin_dir = os.path.dirname(__file__)
# initialize locale
locale = QSettings().value('locale/userLocale')[0:2]
locale_path = os.path.join(
self.plugin_dir,
'i18n',
'Spanish_Inspire_Catastral_Downloader_{}.qm'.format(locale))
if os.path.exists(locale_path):
self.translator = QTranslator()
self.translator.load(locale_path)
if qVersion() > '4.3.3':
QCoreApplication.installTranslator(self.translator)
# Declare instance attributes
self.actions = []
self.menu = self.tr(u'&Spanish Inspire Catastral Downloader')
self.toolbar = self.iface.addToolBar(u'Spanish_Inspire_Catastral_Downloader')
self.toolbar.setObjectName(u'Spanish_Inspire_Catastral_Downloader')
socket.setdefaulttimeout(5)
# noinspection PyMethodMayBeStatic
def tr(self, message):
"""Get the translation for a string using Qt translation API.
We implement this ourselves since we do not inherit QObject.
:param message: String for translation.
:type message: str, QString
:returns: Translated version of message.
:rtype: QString
"""
# noinspection PyTypeChecker,PyArgumentList,PyCallByClass
return QCoreApplication.translate('Spanish_Inspire_Catastral_Downloader', message)
def add_action(
self,
icon_path,
text,
callback,
enabled_flag=True,
add_to_menu=True,
add_to_toolbar=True,
status_tip=None,
whats_this=None,
parent=None):
"""Add a toolbar icon to the toolbar.
:param icon_path: Path to the icon for this action. Can be a resource
path (e.g. ':/plugins/foo/bar.png') or a normal file system path.
:type icon_path: str
:param text: Text that should be shown in menu items for this action.
:type text: str
:param callback: Function to be called when the action is triggered.
:type callback: function
:param enabled_flag: A flag indicating if the action should be enabled
by default. Defaults to True.
:type enabled_flag: bool
:param add_to_menu: Flag indicating whether the action should also
be added to the menu. Defaults to True.
:type add_to_menu: bool
:param add_to_toolbar: Flag indicating whether the action should also
be added to the toolbar. Defaults to True.
:type add_to_toolbar: bool
:param status_tip: Optional text to show in a popup when mouse pointer
hovers over the action.
:type status_tip: str
:param parent: Parent widget for the new action. Defaults None.
:type parent: QWidget
:param whats_this: Optional text to show in the status bar when the
mouse pointer hovers over the action.
:returns: The action that was created. Note that the action is also
added to self.actions list.
:rtype: QAction
"""
# Create the dialog (after translation) and keep reference
self.dlg = Spanish_Inspire_Catastral_DownloaderDialog()
# self.dlg.setWindowFlags(Qt.WindowSystemMenuHint | Qt.WindowTitleHint)
icon = QIcon(icon_path)
action = QAction(icon, text, parent)
action.triggered.connect(callback)
action.setEnabled(enabled_flag)
if status_tip is not None:
action.setStatusTip(status_tip)
if whats_this is not None:
action.setWhatsThis(whats_this)
if add_to_toolbar:
self.toolbar.addAction(action)
if add_to_menu:
self.iface.addPluginToMenu(
self.menu,
action)
self.actions.append(action)
return action
def initGui(self):
"""Create the menu entries and toolbar icons inside the QGIS GUI."""
icon_path = ':/plugins/Spanish_Inspire_Catastral_Downloader/icon.png'
self.add_action(
icon_path,
text=self.tr(u'&Spanish Inspire Catastral Downloader'),
callback=self.run,
parent=self.iface.mainWindow())
self.dlg.pushButton_select_path.clicked.connect(self.select_output_folder)
self.dlg.pushButton_run.clicked.connect(self.download)
self.dlg.pushButton_add_layers.clicked.connect(self.add_layers)
self.dlg.comboBox_province.currentTextChanged.connect(self.on_combobox_changed)
self.dlg.comboBox_province.currentTextChanged.connect(self.on_combobox_changed)
def unload(self):
"""Removes the plugin menu item and icon from QGIS GUI."""
for action in self.actions:
self.iface.removePluginMenu(
self.tr(u'&Spanish Inspire Catastral Downloader'),
action)
self.iface.removeToolBarIcon(action)
# remove the toolbar
del self.toolbar
def select_output_folder(self) -> None:
"""Select output folder"""
self.dlg.lineEdit_path.clear()
folder = QFileDialog.getExistingDirectory(self.dlg, "Select folder")
self.dlg.lineEdit_path.setText(folder)
def check_form(self, option: int) -> None:
"""Message for fields without information"""
messages = {
1: self.tr('You must complete the data of the province and municipality and indicate the download route.'),
2: self.tr('You must select at least one cadastral entity to download.')
}
QgsMessageLog.logMessage(messages[option], 'SICD',
level=Qgis.Warning)
self.msgBar.pushMessage(messages[option], level=Qgis.Warning, duration=3)
# Progress Download
def reporthook(self, blocknum, blocksize, totalsize):
readsofar = blocknum * blocksize
if totalsize > 0:
percent = readsofar * 1e2 / totalsize
self.dlg.progressBar.setValue(int(percent))
# Set Proxy
def set_proxy(self):
proxy_handler = request.ProxyHandler({
'http': '%s:%s' % (_proxy, _port),
'https': '%s:%s' % (_proxy, _port)
})
opener = request.build_opener(proxy_handler)
request.install_opener(opener)
return
def unset_proxy(self):
""" Unset Proxy """
proxy_handler = request.ProxyHandler({})
opener = request.build_opener(proxy_handler)
request.install_opener(opener)
return
def encode_url(self, url):
""" Encode URL Download """
url = parse.urlsplit(url)
url = list(url)
url[2] = parse.quote(url[2])
encoded_link = parse.urlunsplit(url)
return encoded_link
def formatFolderName(self, foldername) -> str:
""" """
foldernameformat = foldername.replace(' ', "_")
return foldernameformat
def gml2geojson(self, input, output):
""" Convert a GML to a GeoJSON file """
try:
connect_command = """ogr2ogr -f GeoJSON {} {} -a_srs EPSG:25830""".format(output, input)
print("\n Executing: ", connect_command)
process = subprocess.Popen(connect_command, shell=True)
process.communicate()
process.wait()
QgsMessageLog.logMessage(f'09.1 Función gml2geojson()', 'SICD', level=Qgis.Info)
QgsMessageLog.logMessage(f'09.2 Input {input}', 'SICD', level=Qgis.Info)
QgsMessageLog.logMessage(f'09.3 GML {input} converted to {output}', 'SICD', level=Qgis.Info)
except Exception as err:
msg = self.tr("Error converting files to GML")
QgsMessageLog.logMessage(f'{msg}', 'SICD', level=Qgis.Warning)
self.msgBar.pushMessage(f'{msg}', level=Qgis.Warning, duration=3)
raise
return
def search_url(self, inecode_catastro, tipo, codtipo, wd):
inecode_catastro = inecode_catastro.split(' - ')[0]
CODPROV = inecode_catastro[0:2]
ATOM = f'https://www.catastro.minhap.es/INSPIRE/{tipo}/{CODPROV}/ES.SDGC.{codtipo}.atom_{CODPROV}.xml?tipo={tipo}&wd={wd}'
req = QtNetwork.QNetworkRequest(QUrl(ATOM))
self.manager_ATOM.get(req)
def generate_download_url(self, reply):
QgsMessageLog.logMessage(f'06.1 Genera url de descarga generate_download_url()', 'SICD', level=Qgis.Info)
inecode_catastro = self.dlg.comboBox_municipality.currentText().split(' - ')[0]
er = reply.error()
if er == QtNetwork.QNetworkReply.NetworkError.NoError:
bytes_string = reply.readAll()
response = str(bytes_string, 'iso-8859-1')
root = ET.fromstring(response)
for entry in root.findall('{http://www.w3.org/2005/Atom}entry'):
try:
url_cadastre = entry.find('{http://www.w3.org/2005/Atom}id').text
QgsMessageLog.logMessage(f'06.2 {url_cadastre}', 'SICD', level=Qgis.Info)
except:
msg = self.tr("The data set was not found.")
self.msgBar.pushMessage(msg, level=Qgis.Info, duration=3)
if url_cadastre is not None and url_cadastre.endswith('{}.zip'.format(inecode_catastro)):
params = parse.parse_qs(parse.urlparse(reply.request().url().toString()).query)
tipo = params['tipo'][0]
wd = params['wd'][0]
self.create_download_file(inecode_catastro, tipo, url_cadastre, wd)
break
def create_download_file(self, inecode_catastro, tipo, url, wd):
QgsMessageLog.logMessage(f'07.1 Función create_download_file', 'SICD', level=Qgis.Info)
QgsMessageLog.logMessage(
f'07.2 Parámetros inecode_catastro {inecode_catastro}, tipo {tipo}, url {url}, wd {wd})',
'SICD', level=Qgis.Info)
self.data_dir = os.path.normpath(os.path.join(wd, inecode_catastro))
QgsMessageLog.logMessage(f'07.1 {self.data_dir}', 'SICD', level=Qgis.Info)
try:
os.makedirs(self.data_dir)
QgsMessageLog.logMessage(
f'07.3 Creada carpeta en {self.data_dir})', 'SICD', level=Qgis.Success)
except OSError:
pass
zip_file = os.path.join(self.data_dir, "{}_{}.zip".format(inecode_catastro, tipo)) # poner fecha
if not os.path.exists(zip_file):
e_url = self.encode_url(url)
try:
request.urlretrieve(e_url, zip_file, self.reporthook)
QgsMessageLog.logMessage(f"7.4 Ficheros descargados correctamente en {self.data_dir}", 'SICD',
level=Qgis.Success)
txt = self.tr('Files downloaded correctly in')
msg = f'😎 {txt} <a href="file:///{self.data_dir}">{self.data_dir}</a>'
self.msgBar.pushMessage(msg, level=Qgis.Success, duration=5)
self.unzip_files(self.data_dir)
except:
shutil.rmtree(self.data_dir)
raise
else:
QApplication.restoreOverrideCursor()
txt1 = self.tr('The data set already exists in the folder')
txt2 = self.tr('You must delete them first if you want to download them to the same location')
msg = f'{txt1} <a href="file:///{self.data_dir}">{self.data_dir}</a>. {txt2} '
QgsMessageLog.logMessage(msg, 'SICD', level=Qgis.Critical)
self.msgBar.pushMessage(msg, level=Qgis.Critical)
pass
def unzip_files(self, wd):
try:
if os.path.isdir(wd):
for zipfilecatastro in os.listdir(wd):
if zipfilecatastro.endswith('.zip'):
with zipfile.ZipFile(os.path.join(wd, zipfilecatastro), "r") as z:
z.extractall(wd)
QgsMessageLog.logMessage(f'08.1 Zip descomprimidos', 'SICD', level=Qgis.Info)
self.dlg.pushButton_add_layers.setEnabled(1)
else:
msg = self.tr("Select at least one data set to download")
self.msgBar.pushMessage(msg, level=Qgis.Critical)
return
except:
self.msgBar.pushMessage(self.tr("An error occurred while decompressing the file."), level=Qgis.Warning, duration=3)
QApplication.restoreOverrideCursor()
self.dlg.progressBar.setValue(100) # No llega al 100% aunque lo descargue,es random
QApplication.restoreOverrideCursor()
def add_layers(self):
inecode_catastro = self.dlg.comboBox_municipality.currentText().split(' - ')
zippath = self.dlg.lineEdit_path.text()
wd = os.path.join(zippath, inecode_catastro[0])
group_name = self.dlg.comboBox_municipality.currentText()
project = QgsProject.instance()
tree_root = project.layerTreeRoot()
layers_group = tree_root.addGroup(group_name)
for gmlfile in os.listdir(wd):
if gmlfile.endswith('.gml'):
layer_path = os.path.join(wd, gmlfile)
file_name = os.path.splitext(gmlfile)[0]
QgsMessageLog.logMessage(layer_path, 'SICD', level=Qgis.Info)
gml_layer = QgsVectorLayer(layer_path, file_name, "ogr")
project.addMapLayer(gml_layer, False)
layers_group.addLayer(gml_layer)
QgsMessageLog.logMessage("10. Capas cargadas", 'SICD', level=Qgis.Info)
def run(self):
"""Run method that performs all the real work"""
self.dlg.lineEdit_path.clear()
self.dlg.comboBox_province.clear()
self.dlg.comboBox_municipality.clear()
self.obtener_provincias()
self.dlg.checkBox_parcels.setChecked(0)
self.dlg.checkBox_buildings.setChecked(0)
self.dlg.checkBox_addresses.setChecked(0)
# self.dlg.checkBox_load_layers.setChecked(0)
self.dlg.pushButton_add_layers.setEnabled(0)
# show the dialog
self.dlg.progressBar.setValue(0)
self.dlg.setWindowIcon(QIcon(':/plugins/Spanish_Inspire_Catastral_Downloader/icon.png'));
self.dlg.show()
# Run the dialog event loop
result = self.dlg.exec_()
# See if OK was pressed
if result: pass
def on_combobox_changed(self):
self.dlg.lineEdit_path.clear()
self.dlg.checkBox_parcels.setChecked(0)
self.dlg.checkBox_buildings.setChecked(0)
self.dlg.checkBox_addresses.setChecked(0)
self.dlg.pushButton_add_layers.setEnabled(0)
def obtener_provincias(self):
QgsMessageLog.logMessage("01.1 Obtenindo provincias (obtener_provincias)", 'SICD', level=Qgis.Info)
self.manager_provincias = QtNetwork.QNetworkAccessManager()
self.manager_provincias.finished.connect(self.rellenar_provincias)
url = 'http://ovc.catastro.meh.es/OVCServWeb/OVCWcfCallejero/COVCCallejero.svc/json/ObtenerProvincias'
QgsMessageLog.logMessage(f'01.2 URL JSON Provincias de Catastro {url}', 'SICD', level=Qgis.Info)
req = QtNetwork.QNetworkRequest(QUrl(url))
self.manager_provincias.get(req)
def rellenar_provincias(self, reply):
QgsMessageLog.logMessage("02. Rellenando provincias (rellenar_provincias)", 'SICD', level=Qgis.Info)
er = reply.error()
if er == QtNetwork.QNetworkReply.NetworkError.NoError:
bytes_string = reply.readAll()
response = str(bytes_string, 'utf-8')
response_json = json.loads(response)
provincias = response_json['consulta_provincieroResult']['provinciero']['prov']
list_provincias = [self.tr('Select a province...')]
for provincia in provincias:
list_provincias.append('{} - {}'.format(provincia['cpine'], provincia['np']))
self.dlg.comboBox_province.addItems(list_provincias)
self.dlg.comboBox_province.currentIndexChanged.connect(self.obtener_municipos)
def obtener_municipos(self):
try:
self.manager_municipios = QtNetwork.QNetworkAccessManager()
self.manager_municipios.finished.connect(self.rellenar_municipios)
provincia_cod = self.dlg.comboBox_province.currentText()
msg = f'03.1 Obteniendo municipios (obtener_municipios) de la provincia {provincia_cod}'
QgsMessageLog.logMessage(msg, 'SICD', level=Qgis.Info)
provincia = provincia_cod.split(' - ')[0]
url = 'http://ovc.catastro.meh.es/OVCServWeb/OVCWcfCallejero/COVCCallejeroCodigos.svc/json/ObtenerMunicipiosCodigos?CodigoProvincia=' + str(
provincia)
QgsMessageLog.logMessage(f'03.2 URL JSON Municipios de Catastro de la {provincia_cod}: {url}', 'SICD',
level=Qgis.Info)
req = QtNetwork.QNetworkRequest(QUrl(url))
self.manager_municipios.get(req)
except Exception as e:
print(e)
def rellenar_municipios(self, reply):
er = reply.error()
if er == QtNetwork.QNetworkReply.NetworkError.NoError:
bytes_string = reply.readAll()
response = str(bytes_string, 'utf-8')
response_json = json.loads(response)
list_municipios = []
try:
municipios = response_json['consulta_municipieroResult']['municipiero']['muni']
QgsMessageLog.logMessage("04. Rellenando municipios (rellenar_municipios)", 'SICD', level=Qgis.Info)
for municipio in municipios:
codigo_provincia = str(municipio['locat']['cd']).zfill(2)
codigo_municipio = str(municipio['locat']['cmc']).zfill(3)
codigo = codigo_provincia + codigo_municipio
list_municipios.append(codigo + ' - ' + municipio['nm'])
except:
pass
self.dlg.comboBox_municipality.clear()
self.dlg.comboBox_municipality.addItems(list_municipios)
def download(self):
"""Download data funtion"""
if self.dlg.comboBox_municipality.currentText() == '' or self.dlg.lineEdit_path.text() == '':
self.check_form(1)
elif not (
self.dlg.checkBox_parcels.isChecked() or self.dlg.checkBox_buildings.isChecked() or self.dlg.checkBox_addresses.isChecked()):
self.check_form(2)
else:
QgsMessageLog.logMessage("05 Inicio de descarga", 'SICD', level=Qgis.Info)
try:
QApplication.setOverrideCursor(QCursor(Qt.WaitCursor))
inecode_catastro = self.dlg.comboBox_municipality.currentText()
zippath = self.dlg.lineEdit_path.text()
# wd = os.path.join(zippath , inecode_catastro.replace(' ', "_"))
# wd = os.path.join(zippath, CODMUNI)
QgsMessageLog.logMessage(f'05.1 Genera variables zippath {zippath}', 'SICD', level=Qgis.Info)
proxy_support = request.ProxyHandler({})
opener = request.build_opener(proxy_support)
request.install_opener(opener)
# Estabelcemos un proxy si lo ha definido el usuario
try:
if (_proxy is not None and _proxy != "") and (_port is not None and _port != ""):
self.set_proxy()
else:
self.unset_proxy()
except Exception as e:
QApplication.restoreOverrideCursor()
txt = self.tr('Error setting proxy')
msg = f"{txt} : {str(e)}"
self.msgBar.pushMessage(msg, level=Qgis.Warning, duration=3)
raise
self.manager_ATOM = QtNetwork.QNetworkAccessManager()
self.manager_ATOM.finished.connect(self.generate_download_url)
if self.dlg.checkBox_parcels.isChecked():
self.search_url(inecode_catastro, 'CadastralParcels', 'CP', zippath)
if self.dlg.checkBox_buildings.isChecked():
self.search_url(inecode_catastro, 'Buildings', 'BU', zippath)
if self.dlg.checkBox_addresses.isChecked():
self.search_url(inecode_catastro, 'Addresses', 'AD', zippath)
except Exception as e:
QApplication.restoreOverrideCursor()
self.dlg.pushButton_add_layers.setEnabled(0)
self.msgBar.pushMessage(self.tr("Failed!") + str(e), level=Qgis.Warning, duration=3)