440 lines
14 KiB
Python
440 lines
14 KiB
Python
from whitebox_tools import WhiteboxTools
|
|
|
|
class D8Pointer(WhiteboxTools):
|
|
def run(self, args, working_directory, verbose = False):
|
|
import os
|
|
import multiprocessing
|
|
from whitebox_tools import Raster
|
|
|
|
def get_formatted_elapsed_time(start):
|
|
elapsed_time = round((time.time() - start), 4)
|
|
if elapsed_time < 60.0:
|
|
return "{} seconds".format(elapsed_time)
|
|
elif elapsed_time < 3600.0:
|
|
mins = int(elapsed_time / 60.0)
|
|
secs = elapsed_time - (mins * 60.0)
|
|
return "{} minutes {} seconds".format(mins, round(secs, 1))
|
|
else:
|
|
hours = int(elapsed_time / 3600.0)
|
|
mins = int((elapsed_time - (hours * 3600.0)) / 60.0)
|
|
secs = elapsed_time - (hours * 3600.0) - (mins * 60.0)
|
|
return "{} hours {} minutes {} seconds".format(hours, mins, round(secs, 1))
|
|
|
|
input_file = ""
|
|
output_file = ""
|
|
esri_style = False
|
|
|
|
if len(args) == 0:
|
|
raise ValueError("Tool run with no parameters.")
|
|
|
|
sep = os.path.sep
|
|
|
|
progress = 0
|
|
old_progress = 1
|
|
input = Raster(input_file)
|
|
|
|
start = time.time()
|
|
cell_size_x = input.header.cell_size_x
|
|
cell_size_y = input.header.cell_size_y
|
|
diag_cell_size = (cell_size_x * cell_size_x + cell_size_y * cell_size_y) ** 0.5
|
|
|
|
rows = input.header.rows
|
|
nodata = input.header.nodata
|
|
out_nodata = -32768
|
|
columns = input.header.columns
|
|
|
|
num_procs = multiprocessing.cpu_count()
|
|
def process_rows(tid):
|
|
d_x = [1, 1, 1, 0, -1, -1, -1, 0]
|
|
d_y = [-1, 0, 1, 1, 1, 0, -1, -1]
|
|
grid_lengths = [diag_cell_size,
|
|
cell_size_x,
|
|
diag_cell_size,
|
|
cell_size_y,
|
|
diag_cell_size,
|
|
cell_size_x,
|
|
diag_cell_size,
|
|
cell_size_y]
|
|
out_vals = [0, 1, 2, 4, 8, 16, 32, 64] if esri_style else [1, 2, 4, 8, 16, 32, 64, 128]
|
|
|
|
for row in range(rows):
|
|
if row % num_procs == tid:
|
|
data = [out_nodata] * columns
|
|
for col in range(columns):
|
|
z = input.data[row][col]
|
|
if z != nodata:
|
|
dir = 0
|
|
max_slope = float("-inf")
|
|
for i in range(8):
|
|
z_n = input.data[row + d_y[i]][col + d_x[i]]
|
|
if z_n != nodata:
|
|
slope = (z - z_n) / grid_lengths[i]
|
|
if slope > max_slope and slope > 0:
|
|
max_slope = slope
|
|
dir = i
|
|
if max_slope >= 0:
|
|
data[col] = out_vals[dir]
|
|
else:
|
|
data[col] = 0
|
|
tx1.send((row, data))
|
|
|
|
output = [[out_nodata] * columns for _ in range(rows)]
|
|
tx, rx = multiprocessing.Queue(), multiprocessing.Queue()
|
|
procs = []
|
|
for tid in range(num_procs):
|
|
p = multiprocessing.Process(target=process_rows, args=(tid,))
|
|
procs.append(p)
|
|
p.start()
|
|
|
|
for _ in range(rows):
|
|
data = rx.get()
|
|
row = data[0]
|
|
output[row] = data[1]
|
|
|
|
input = None
|
|
|
|
output_raster = Raster(output_file, "w")
|
|
output_raster.header = input.header
|
|
output_raster.data = output
|
|
elapsed_time = get_formatted_elapsed_time(start)
|
|
output_raster.header.nodata = out_nodata
|
|
output_raster.header.data_type = "i16"
|
|
output_raster.header.palette = "qual.plt"
|
|
output_raster.header.photometric_interp = "categorical"
|
|
output_raster.metadata.append(
|
|
"Created by whitebox_tools' {} tool".format(self.get_tool_name())
|
|
)
|
|
output_raster.metadata.append("Input file: {}".format(input_file))
|
|
output_raster.metadata.append(
|
|
"ESRI-style output: {}".format("true" if esri_style else "false")
|
|
)
|
|
output_raster.metadata.append(
|
|
"Elapsed Time (excluding I/O): {}".format(elapsed_time)
|
|
)
|
|
output_raster.write_raster()
|
|
return None
|
|
|
|
class Basin:
|
|
# Asignar valores a las direcciones D8
|
|
directions = {
|
|
'N': 1,
|
|
'NE': 2,
|
|
'E': 3,
|
|
'SE': 4,
|
|
'S': 5,
|
|
'SW': 6,
|
|
'W': 7,
|
|
'NW': 8
|
|
}
|
|
def __init__(self, mesh):
|
|
''' initialize '''
|
|
self.mesh = mesh
|
|
|
|
|
|
def calculate_flow_direction(mesh):
|
|
# Crear una lista para almacenar las direcciones del flujo
|
|
flow_directions = []
|
|
|
|
# Iterar sobre cada faceta
|
|
for facet in mesh.Facets:
|
|
# Calcular la dirección del flujo basándose en la inclinación
|
|
flow_direction = calculate_flow_direction_from_inclination(facet.Normal)
|
|
|
|
# Agregar la dirección del flujo a la lista
|
|
flow_directions.append(flow_direction)
|
|
|
|
return flow_directions
|
|
|
|
def calculate_flow_direction(mesh):
|
|
import time
|
|
start = time.time()
|
|
data = []
|
|
total_facets = mesh.CountFacets
|
|
for i, facet in enumerate(mesh.Facets):
|
|
c = facet.CircumCircle[0]
|
|
dir = 0
|
|
max_slope = 0.0 #float("-inf")
|
|
for ind in facet.NeighbourIndices:
|
|
if ind >= total_facets:
|
|
continue
|
|
c_n = mesh.Facets[ind].CircumCircle[0]
|
|
l = c.sub(c_n)
|
|
l.z = 0
|
|
slope = (c.z - c_n.z) / l.Length
|
|
if slope > max_slope and slope > 0:
|
|
max_slope = slope
|
|
dir = ind
|
|
if max_slope >= 0:
|
|
data.append(dir)
|
|
else:
|
|
data.append(-1)
|
|
if i == 100:
|
|
break
|
|
print(time.time() - start)
|
|
return data
|
|
|
|
def calculate_flow_direction_from_inclination(inclination):
|
|
# Obtener los componentes de inclinación en los ejes X, Y
|
|
inclination_x = inclination.x
|
|
inclination_y = inclination.y
|
|
|
|
#TODO: ver si se pude devolver un vector
|
|
|
|
# Asignar valores a las direcciones D8
|
|
directions = {
|
|
'N': 1,
|
|
'NE': 2,
|
|
'E': 3,
|
|
'SE': 4,
|
|
'S': 5,
|
|
'SW': 6,
|
|
'W': 7,
|
|
'NW': 8
|
|
}
|
|
|
|
# Calcular la dirección del flujo basándose en los componentes de inclinación
|
|
if inclination_x > 0:
|
|
if inclination_y > 0:
|
|
return directions['NE']
|
|
elif inclination_y < 0:
|
|
return directions['SE']
|
|
else:
|
|
return directions['E']
|
|
elif inclination_x < 0:
|
|
if inclination_y > 0:
|
|
return directions['NW']
|
|
elif inclination_y < 0:
|
|
return directions['SW']
|
|
else:
|
|
return directions['W']
|
|
else:
|
|
if inclination_y > 0:
|
|
return directions['N']
|
|
elif inclination_y < 0:
|
|
return directions['S']
|
|
else:
|
|
return None
|
|
|
|
|
|
# Calcular la dirección del flujo utilizando la función
|
|
flow_directions = calculate_flow_direction(land)
|
|
|
|
|
|
def delineate_watersheds(mesh, flow_direction):
|
|
# Obtener el número de caras (celdas) en la malla
|
|
num_faces = mesh.CountFacets
|
|
|
|
# Crear una lista para almacenar las etiquetas de las cuencas
|
|
watersheds = [0] * num_faces
|
|
|
|
# Inicializar el contador de etiquetas
|
|
label = 1
|
|
|
|
# Recorrer cada cara de la malla
|
|
for face_index in range(num_faces):
|
|
# Si la cara aún no tiene etiqueta asignada
|
|
if watersheds[face_index] == 0:
|
|
# Iniciar el seguimiento aguas arriba
|
|
label = delineate_upstream(mesh, face_index, label, flow_direction, watersheds)
|
|
return watersheds
|
|
|
|
def delineate_upstream(mesh, face_index, label, flow_direction, watersheds):
|
|
# Obtener la dirección del flujo para la cara actual
|
|
direction = flow_direction[face_index]
|
|
|
|
# Si la cara no tiene dirección de flujo o ya tiene una etiqueta asignada
|
|
if direction == 0 or watersheds[face_index] != 0:
|
|
return label
|
|
|
|
# Asignar la etiqueta actual a la cara
|
|
watersheds[face_index] = label
|
|
|
|
# Obtener los índices de las caras vecinas en la dirección del flujo
|
|
neighbor_face_indices = mesh.Facets[face_index].NeighbourIndices
|
|
|
|
# Realizar el seguimiento aguas arriba recursivamente para cada cara vecina
|
|
for neighbor_index in neighbor_face_indices:
|
|
delineate_upstream(mesh, neighbor_index, label, flow_direction, watersheds)
|
|
label +=1
|
|
return label
|
|
|
|
# Supongamos que ya has obtenido la malla (mesh) y la matriz de dirección del flujo (flow_direction)
|
|
# ...
|
|
|
|
# Delimitar las cuencas hidrográficas
|
|
watersheds = delineate_watersheds(mesh, flow_directions)
|
|
|
|
# Imprimir las etiquetas de las cuencas hidrográficas
|
|
print(watersheds)
|
|
|
|
import FreeCAD as App
|
|
import Part
|
|
|
|
def create_contour_wire(mesh, watershed_label):
|
|
# Obtener las caras de la malla pertenecientes a la cuenca hidrográfica
|
|
faces = [i for i, label in enumerate(watersheds) if label == watershed_label]
|
|
|
|
# Obtener los vértices de las caras seleccionadas
|
|
vertices = set()
|
|
for face_index in faces:
|
|
vertices.update(mesh.GetFace(face_index))
|
|
|
|
# Crear una lista de puntos para la wire
|
|
points = [App.Vector(*vertex) for vertex in vertices]
|
|
|
|
# Crear la wire a partir de la lista de puntos
|
|
wire = Part.makePolygon(points)
|
|
|
|
return wire
|
|
|
|
# Supongamos que ya has obtenido la malla (mesh) y la lista de etiquetas de las cuencas hidrográficas (watersheds)
|
|
# ...
|
|
|
|
# Definir el número de la cuenca hidrográfica para la que deseas generar la wire
|
|
cuenca_numero = 1
|
|
|
|
# Crear la wire para el contorno externo de la cuenca hidrográfica seleccionada
|
|
wire_cuenca = create_contour_wire(mesh, cuenca_numero)
|
|
|
|
# Agregar la wire al documento de FreeCAD
|
|
App.ActiveDocument.addObject("Part::Feature", "ContornoCuenca").Shape = wire_cuenca
|
|
|
|
import FreeCAD
|
|
|
|
def obtener_coordenadas(malla):
|
|
num_vertices = len(malla)
|
|
coordenadas = np.zeros((num_vertices, 3))
|
|
|
|
for i, vertice in enumerate(malla):
|
|
coordenadas[i] = vertice.Point
|
|
|
|
return coordenadas
|
|
|
|
def delimitar_cuencas(malla):
|
|
coordenadas = obtener_coordenadas(malla)
|
|
num_vertices = coordenadas.shape[0]
|
|
cuencas = np.zeros(num_vertices, dtype=int)
|
|
altura = np.max(coordenadas[:, 2]) + 1 # Altura inicial para iniciar el flujo
|
|
|
|
while True:
|
|
cambios = False
|
|
|
|
for i in range(num_vertices):
|
|
vecinos = np.where((coordenadas[:, 0] == coordenadas[i, 0]) & (coordenadas[:, 1] == coordenadas[i, 1]))[0]
|
|
|
|
for v in vecinos:
|
|
if coordenadas[v, 2] < altura and cuencas[v] != cuencas[i]:
|
|
cuencas[i] = cuencas[v]
|
|
cambios = True
|
|
|
|
altura -= 1
|
|
|
|
if not cambios:
|
|
break
|
|
|
|
return cuencas
|
|
|
|
# Obtener la malla de FreeCAD (ejemplo)
|
|
malla_fcad = FreeCAD.ActiveDocument.Objects[0].Shape.Faces[0].Mesh
|
|
malla = malla_fcad.Topology
|
|
|
|
# Delimitar cuencas hidrológicas
|
|
cuencas = delimitar_cuencas(malla)
|
|
|
|
# Visualizar las cuencas en un gráfico con PyCairo (ejemplo)
|
|
import cairo
|
|
|
|
width, height = 800, 600
|
|
surface = cairo.ImageSurface(cairo.FORMAT_ARGB32, width, height)
|
|
ctx = cairo.Context(surface)
|
|
ctx.scale(width, height)
|
|
|
|
# Dibujar las cuencas en el gráfico
|
|
for i, face in enumerate(malla_fcad.Facets):
|
|
ctx.set_source_rgba(0.5, 0.5, 0.5, 0.5) # Color de relleno
|
|
ctx.set_line_width(0.01) # Ancho de línea
|
|
ctx.move_to(face.Vertexes[0].Point.x, face.Vertexes[0].Point.y)
|
|
|
|
for j in range(1, len(face.Vertexes)):
|
|
ctx.line_to(face.Vertexes[j].Point.x, face.Vertexes[j].Point.y)
|
|
|
|
ctx.close_path()
|
|
ctx.stroke_preserve()
|
|
|
|
cuenca = cuencas[i]
|
|
color = (cuenca % 255) / 255.0 # Color basado en el número de cuenca
|
|
ctx.set_source_rgba(color, 0.0, 0.0, 0.5) # Color de borde
|
|
ctx.fill()
|
|
|
|
surface.write_to_png('cuencas_hidrologicas.png')
|
|
|
|
|
|
def calculate_triangle_slope(triangle):
|
|
# Calcula la pendiente o pendiente acumulada del triángulo
|
|
normal = triangle.Normal
|
|
# return normal.z / (normal.x ** 2 + normal.y ** 2) ** 0.5
|
|
# or
|
|
proyection = FreeCAD.Vector(normal)
|
|
proyection.z = 0
|
|
return normal.z/proyection.Length
|
|
|
|
def calculate_flow_accumulation(points, triangles):
|
|
num_points = len(points)
|
|
flow_accumulation = np.zeros(num_points, dtype=np.float64)
|
|
flow_direction = np.full(num_points, -1, dtype=np.int32)
|
|
|
|
for i in range(num_points):
|
|
connected_triangles = []
|
|
|
|
for j, triangle in enumerate(triangles):
|
|
if i in triangle:
|
|
connected_triangles.append(j)
|
|
|
|
for j in connected_triangles:
|
|
triangle = triangles[j]
|
|
area = triangle.Area
|
|
slope = calculate_triangle_slope(triangle)
|
|
|
|
# Actualizar la dirección del flujo si la pendiente es mayor que la dirección actual
|
|
if flow_direction[i] == -1 or slope > flow_direction[i]:
|
|
flow_direction[i] = slope
|
|
|
|
# Incrementar la acumulación de flujo
|
|
flow_accumulation[i] += area * slope
|
|
|
|
return flow_accumulation, flow_direction
|
|
|
|
import numpy as np
|
|
from queue import Queue
|
|
|
|
def delineate_watersheds(points, triangles, flow_direction, outlet_points):
|
|
num_points = len(points)
|
|
watersheds = np.full(num_points, -1, dtype=np.int32)
|
|
visited = np.zeros(num_points, dtype=np.bool)
|
|
|
|
for outlet_point in outlet_points:
|
|
queue = Queue()
|
|
queue.put(outlet_point)
|
|
|
|
while not queue.empty():
|
|
current_point = queue.get()
|
|
watersheds[current_point] = outlet_point
|
|
visited[current_point] = True
|
|
|
|
connected_triangles = []
|
|
|
|
for j, triangle in enumerate(triangles):
|
|
if current_point in triangle:
|
|
connected_triangles.append(j)
|
|
|
|
for j in connected_triangles:
|
|
triangle = triangles[j]
|
|
for point in triangle:
|
|
if not visited[point] and flow_direction[point] <= flow_direction[current_point]:
|
|
queue.put(point)
|
|
visited[point] = True
|
|
|
|
return watersheds
|
|
|
|
|