from whitebox_tools import WhiteboxTools class D8Pointer(WhiteboxTools): def run(self, args, working_directory, verbose = False): import os import multiprocessing from whitebox_tools import Raster def get_formatted_elapsed_time(start): elapsed_time = round((time.time() - start), 4) if elapsed_time < 60.0: return "{} seconds".format(elapsed_time) elif elapsed_time < 3600.0: mins = int(elapsed_time / 60.0) secs = elapsed_time - (mins * 60.0) return "{} minutes {} seconds".format(mins, round(secs, 1)) else: hours = int(elapsed_time / 3600.0) mins = int((elapsed_time - (hours * 3600.0)) / 60.0) secs = elapsed_time - (hours * 3600.0) - (mins * 60.0) return "{} hours {} minutes {} seconds".format(hours, mins, round(secs, 1)) input_file = "" output_file = "" esri_style = False if len(args) == 0: raise ValueError("Tool run with no parameters.") sep = os.path.sep progress = 0 old_progress = 1 input = Raster(input_file) start = time.time() cell_size_x = input.header.cell_size_x cell_size_y = input.header.cell_size_y diag_cell_size = (cell_size_x * cell_size_x + cell_size_y * cell_size_y) ** 0.5 rows = input.header.rows nodata = input.header.nodata out_nodata = -32768 columns = input.header.columns num_procs = multiprocessing.cpu_count() def process_rows(tid): d_x = [1, 1, 1, 0, -1, -1, -1, 0] d_y = [-1, 0, 1, 1, 1, 0, -1, -1] grid_lengths = [diag_cell_size, cell_size_x, diag_cell_size, cell_size_y, diag_cell_size, cell_size_x, diag_cell_size, cell_size_y] out_vals = [0, 1, 2, 4, 8, 16, 32, 64] if esri_style else [1, 2, 4, 8, 16, 32, 64, 128] for row in range(rows): if row % num_procs == tid: data = [out_nodata] * columns for col in range(columns): z = input.data[row][col] if z != nodata: dir = 0 max_slope = float("-inf") for i in range(8): z_n = input.data[row + d_y[i]][col + d_x[i]] if z_n != nodata: slope = (z - z_n) / grid_lengths[i] if slope > max_slope and slope > 0: max_slope = slope dir = i if max_slope >= 0: data[col] = out_vals[dir] else: data[col] = 0 tx1.send((row, data)) output = [[out_nodata] * columns for _ in range(rows)] tx, rx = multiprocessing.Queue(), multiprocessing.Queue() procs = [] for tid in range(num_procs): p = multiprocessing.Process(target=process_rows, args=(tid,)) procs.append(p) p.start() for _ in range(rows): data = rx.get() row = data[0] output[row] = data[1] input = None output_raster = Raster(output_file, "w") output_raster.header = input.header output_raster.data = output elapsed_time = get_formatted_elapsed_time(start) output_raster.header.nodata = out_nodata output_raster.header.data_type = "i16" output_raster.header.palette = "qual.plt" output_raster.header.photometric_interp = "categorical" output_raster.metadata.append( "Created by whitebox_tools' {} tool".format(self.get_tool_name()) ) output_raster.metadata.append("Input file: {}".format(input_file)) output_raster.metadata.append( "ESRI-style output: {}".format("true" if esri_style else "false") ) output_raster.metadata.append( "Elapsed Time (excluding I/O): {}".format(elapsed_time) ) output_raster.write_raster() return None class Basin: # Asignar valores a las direcciones D8 directions = { 'N': 1, 'NE': 2, 'E': 3, 'SE': 4, 'S': 5, 'SW': 6, 'W': 7, 'NW': 8 } def __init__(self, mesh): ''' initialize ''' self.mesh = mesh def calculate_flow_direction(mesh): # Crear una lista para almacenar las direcciones del flujo flow_directions = [] # Iterar sobre cada faceta for facet in mesh.Facets: # Calcular la dirección del flujo basándose en la inclinación flow_direction = calculate_flow_direction_from_inclination(facet.Normal) # Agregar la dirección del flujo a la lista flow_directions.append(flow_direction) return flow_directions def calculate_flow_direction(mesh): import time start = time.time() data = [] total_facets = mesh.CountFacets for i, facet in enumerate(mesh.Facets): c = facet.CircumCircle[0] dir = 0 max_slope = 0.0 #float("-inf") for ind in facet.NeighbourIndices: if ind >= total_facets: continue c_n = mesh.Facets[ind].CircumCircle[0] l = c.sub(c_n) l.z = 0 slope = (c.z - c_n.z) / l.Length if slope > max_slope and slope > 0: max_slope = slope dir = ind if max_slope >= 0: data.append(dir) else: data.append(-1) if i == 100: break print(time.time() - start) return data def calculate_flow_direction_from_inclination(inclination): # Obtener los componentes de inclinación en los ejes X, Y inclination_x = inclination.x inclination_y = inclination.y #TODO: ver si se pude devolver un vector # Asignar valores a las direcciones D8 directions = { 'N': 1, 'NE': 2, 'E': 3, 'SE': 4, 'S': 5, 'SW': 6, 'W': 7, 'NW': 8 } # Calcular la dirección del flujo basándose en los componentes de inclinación if inclination_x > 0: if inclination_y > 0: return directions['NE'] elif inclination_y < 0: return directions['SE'] else: return directions['E'] elif inclination_x < 0: if inclination_y > 0: return directions['NW'] elif inclination_y < 0: return directions['SW'] else: return directions['W'] else: if inclination_y > 0: return directions['N'] elif inclination_y < 0: return directions['S'] else: return None # Calcular la dirección del flujo utilizando la función flow_directions = calculate_flow_direction(land) def delineate_watersheds(mesh, flow_direction): # Obtener el número de caras (celdas) en la malla num_faces = mesh.CountFacets # Crear una lista para almacenar las etiquetas de las cuencas watersheds = [0] * num_faces # Inicializar el contador de etiquetas label = 1 # Recorrer cada cara de la malla for face_index in range(num_faces): # Si la cara aún no tiene etiqueta asignada if watersheds[face_index] == 0: # Iniciar el seguimiento aguas arriba label = delineate_upstream(mesh, face_index, label, flow_direction, watersheds) return watersheds def delineate_upstream(mesh, face_index, label, flow_direction, watersheds): # Obtener la dirección del flujo para la cara actual direction = flow_direction[face_index] # Si la cara no tiene dirección de flujo o ya tiene una etiqueta asignada if direction == 0 or watersheds[face_index] != 0: return label # Asignar la etiqueta actual a la cara watersheds[face_index] = label # Obtener los índices de las caras vecinas en la dirección del flujo neighbor_face_indices = mesh.Facets[face_index].NeighbourIndices # Realizar el seguimiento aguas arriba recursivamente para cada cara vecina for neighbor_index in neighbor_face_indices: delineate_upstream(mesh, neighbor_index, label, flow_direction, watersheds) label +=1 return label # Supongamos que ya has obtenido la malla (mesh) y la matriz de dirección del flujo (flow_direction) # ... # Delimitar las cuencas hidrográficas watersheds = delineate_watersheds(mesh, flow_directions) # Imprimir las etiquetas de las cuencas hidrográficas print(watersheds) import FreeCAD as App import Part def create_contour_wire(mesh, watershed_label): # Obtener las caras de la malla pertenecientes a la cuenca hidrográfica faces = [i for i, label in enumerate(watersheds) if label == watershed_label] # Obtener los vértices de las caras seleccionadas vertices = set() for face_index in faces: vertices.update(mesh.GetFace(face_index)) # Crear una lista de puntos para la wire points = [App.Vector(*vertex) for vertex in vertices] # Crear la wire a partir de la lista de puntos wire = Part.makePolygon(points) return wire # Supongamos que ya has obtenido la malla (mesh) y la lista de etiquetas de las cuencas hidrográficas (watersheds) # ... # Definir el número de la cuenca hidrográfica para la que deseas generar la wire cuenca_numero = 1 # Crear la wire para el contorno externo de la cuenca hidrográfica seleccionada wire_cuenca = create_contour_wire(mesh, cuenca_numero) # Agregar la wire al documento de FreeCAD App.ActiveDocument.addObject("Part::Feature", "ContornoCuenca").Shape = wire_cuenca import FreeCAD def obtener_coordenadas(malla): num_vertices = len(malla) coordenadas = np.zeros((num_vertices, 3)) for i, vertice in enumerate(malla): coordenadas[i] = vertice.Point return coordenadas def delimitar_cuencas(malla): coordenadas = obtener_coordenadas(malla) num_vertices = coordenadas.shape[0] cuencas = np.zeros(num_vertices, dtype=int) altura = np.max(coordenadas[:, 2]) + 1 # Altura inicial para iniciar el flujo while True: cambios = False for i in range(num_vertices): vecinos = np.where((coordenadas[:, 0] == coordenadas[i, 0]) & (coordenadas[:, 1] == coordenadas[i, 1]))[0] for v in vecinos: if coordenadas[v, 2] < altura and cuencas[v] != cuencas[i]: cuencas[i] = cuencas[v] cambios = True altura -= 1 if not cambios: break return cuencas # Obtener la malla de FreeCAD (ejemplo) malla_fcad = FreeCAD.ActiveDocument.Objects[0].Shape.Faces[0].Mesh malla = malla_fcad.Topology # Delimitar cuencas hidrológicas cuencas = delimitar_cuencas(malla) # Visualizar las cuencas en un gráfico con PyCairo (ejemplo) import cairo width, height = 800, 600 surface = cairo.ImageSurface(cairo.FORMAT_ARGB32, width, height) ctx = cairo.Context(surface) ctx.scale(width, height) # Dibujar las cuencas en el gráfico for i, face in enumerate(malla_fcad.Facets): ctx.set_source_rgba(0.5, 0.5, 0.5, 0.5) # Color de relleno ctx.set_line_width(0.01) # Ancho de línea ctx.move_to(face.Vertexes[0].Point.x, face.Vertexes[0].Point.y) for j in range(1, len(face.Vertexes)): ctx.line_to(face.Vertexes[j].Point.x, face.Vertexes[j].Point.y) ctx.close_path() ctx.stroke_preserve() cuenca = cuencas[i] color = (cuenca % 255) / 255.0 # Color basado en el número de cuenca ctx.set_source_rgba(color, 0.0, 0.0, 0.5) # Color de borde ctx.fill() surface.write_to_png('cuencas_hidrologicas.png') def calculate_triangle_slope(triangle): # Calcula la pendiente o pendiente acumulada del triángulo normal = triangle.Normal # return normal.z / (normal.x ** 2 + normal.y ** 2) ** 0.5 # or proyection = FreeCAD.Vector(normal) proyection.z = 0 return normal.z/proyection.Length def calculate_flow_accumulation(points, triangles): num_points = len(points) flow_accumulation = np.zeros(num_points, dtype=np.float64) flow_direction = np.full(num_points, -1, dtype=np.int32) for i in range(num_points): connected_triangles = [] for j, triangle in enumerate(triangles): if i in triangle: connected_triangles.append(j) for j in connected_triangles: triangle = triangles[j] area = triangle.Area slope = calculate_triangle_slope(triangle) # Actualizar la dirección del flujo si la pendiente es mayor que la dirección actual if flow_direction[i] == -1 or slope > flow_direction[i]: flow_direction[i] = slope # Incrementar la acumulación de flujo flow_accumulation[i] += area * slope return flow_accumulation, flow_direction import numpy as np from queue import Queue def delineate_watersheds(points, triangles, flow_direction, outlet_points): num_points = len(points) watersheds = np.full(num_points, -1, dtype=np.int32) visited = np.zeros(num_points, dtype=np.bool) for outlet_point in outlet_points: queue = Queue() queue.put(outlet_point) while not queue.empty(): current_point = queue.get() watersheds[current_point] = outlet_point visited[current_point] = True connected_triangles = [] for j, triangle in enumerate(triangles): if current_point in triangle: connected_triangles.append(j) for j in connected_triangles: triangle = triangles[j] for point in triangle: if not visited[point] and flow_direction[point] <= flow_direction[current_point]: queue.put(point) visited[point] = True return watersheds