primera subida
This commit is contained in:
439
hydro/D8Pointer.py
Normal file
439
hydro/D8Pointer.py
Normal file
@@ -0,0 +1,439 @@
|
||||
from whitebox_tools import WhiteboxTools
|
||||
|
||||
class D8Pointer(WhiteboxTools):
|
||||
def run(self, args, working_directory, verbose = False):
|
||||
import os
|
||||
import multiprocessing
|
||||
from whitebox_tools import Raster
|
||||
|
||||
def get_formatted_elapsed_time(start):
|
||||
elapsed_time = round((time.time() - start), 4)
|
||||
if elapsed_time < 60.0:
|
||||
return "{} seconds".format(elapsed_time)
|
||||
elif elapsed_time < 3600.0:
|
||||
mins = int(elapsed_time / 60.0)
|
||||
secs = elapsed_time - (mins * 60.0)
|
||||
return "{} minutes {} seconds".format(mins, round(secs, 1))
|
||||
else:
|
||||
hours = int(elapsed_time / 3600.0)
|
||||
mins = int((elapsed_time - (hours * 3600.0)) / 60.0)
|
||||
secs = elapsed_time - (hours * 3600.0) - (mins * 60.0)
|
||||
return "{} hours {} minutes {} seconds".format(hours, mins, round(secs, 1))
|
||||
|
||||
input_file = ""
|
||||
output_file = ""
|
||||
esri_style = False
|
||||
|
||||
if len(args) == 0:
|
||||
raise ValueError("Tool run with no parameters.")
|
||||
|
||||
sep = os.path.sep
|
||||
|
||||
progress = 0
|
||||
old_progress = 1
|
||||
input = Raster(input_file)
|
||||
|
||||
start = time.time()
|
||||
cell_size_x = input.header.cell_size_x
|
||||
cell_size_y = input.header.cell_size_y
|
||||
diag_cell_size = (cell_size_x * cell_size_x + cell_size_y * cell_size_y) ** 0.5
|
||||
|
||||
rows = input.header.rows
|
||||
nodata = input.header.nodata
|
||||
out_nodata = -32768
|
||||
columns = input.header.columns
|
||||
|
||||
num_procs = multiprocessing.cpu_count()
|
||||
def process_rows(tid):
|
||||
d_x = [1, 1, 1, 0, -1, -1, -1, 0]
|
||||
d_y = [-1, 0, 1, 1, 1, 0, -1, -1]
|
||||
grid_lengths = [diag_cell_size,
|
||||
cell_size_x,
|
||||
diag_cell_size,
|
||||
cell_size_y,
|
||||
diag_cell_size,
|
||||
cell_size_x,
|
||||
diag_cell_size,
|
||||
cell_size_y]
|
||||
out_vals = [0, 1, 2, 4, 8, 16, 32, 64] if esri_style else [1, 2, 4, 8, 16, 32, 64, 128]
|
||||
|
||||
for row in range(rows):
|
||||
if row % num_procs == tid:
|
||||
data = [out_nodata] * columns
|
||||
for col in range(columns):
|
||||
z = input.data[row][col]
|
||||
if z != nodata:
|
||||
dir = 0
|
||||
max_slope = float("-inf")
|
||||
for i in range(8):
|
||||
z_n = input.data[row + d_y[i]][col + d_x[i]]
|
||||
if z_n != nodata:
|
||||
slope = (z - z_n) / grid_lengths[i]
|
||||
if slope > max_slope and slope > 0:
|
||||
max_slope = slope
|
||||
dir = i
|
||||
if max_slope >= 0:
|
||||
data[col] = out_vals[dir]
|
||||
else:
|
||||
data[col] = 0
|
||||
tx1.send((row, data))
|
||||
|
||||
output = [[out_nodata] * columns for _ in range(rows)]
|
||||
tx, rx = multiprocessing.Queue(), multiprocessing.Queue()
|
||||
procs = []
|
||||
for tid in range(num_procs):
|
||||
p = multiprocessing.Process(target=process_rows, args=(tid,))
|
||||
procs.append(p)
|
||||
p.start()
|
||||
|
||||
for _ in range(rows):
|
||||
data = rx.get()
|
||||
row = data[0]
|
||||
output[row] = data[1]
|
||||
|
||||
input = None
|
||||
|
||||
output_raster = Raster(output_file, "w")
|
||||
output_raster.header = input.header
|
||||
output_raster.data = output
|
||||
elapsed_time = get_formatted_elapsed_time(start)
|
||||
output_raster.header.nodata = out_nodata
|
||||
output_raster.header.data_type = "i16"
|
||||
output_raster.header.palette = "qual.plt"
|
||||
output_raster.header.photometric_interp = "categorical"
|
||||
output_raster.metadata.append(
|
||||
"Created by whitebox_tools' {} tool".format(self.get_tool_name())
|
||||
)
|
||||
output_raster.metadata.append("Input file: {}".format(input_file))
|
||||
output_raster.metadata.append(
|
||||
"ESRI-style output: {}".format("true" if esri_style else "false")
|
||||
)
|
||||
output_raster.metadata.append(
|
||||
"Elapsed Time (excluding I/O): {}".format(elapsed_time)
|
||||
)
|
||||
output_raster.write_raster()
|
||||
return None
|
||||
|
||||
class Basin:
|
||||
# Asignar valores a las direcciones D8
|
||||
directions = {
|
||||
'N': 1,
|
||||
'NE': 2,
|
||||
'E': 3,
|
||||
'SE': 4,
|
||||
'S': 5,
|
||||
'SW': 6,
|
||||
'W': 7,
|
||||
'NW': 8
|
||||
}
|
||||
def __init__(self, mesh):
|
||||
''' initialize '''
|
||||
self.mesh = mesh
|
||||
|
||||
|
||||
def calculate_flow_direction(mesh):
|
||||
# Crear una lista para almacenar las direcciones del flujo
|
||||
flow_directions = []
|
||||
|
||||
# Iterar sobre cada faceta
|
||||
for facet in mesh.Facets:
|
||||
# Calcular la dirección del flujo basándose en la inclinación
|
||||
flow_direction = calculate_flow_direction_from_inclination(facet.Normal)
|
||||
|
||||
# Agregar la dirección del flujo a la lista
|
||||
flow_directions.append(flow_direction)
|
||||
|
||||
return flow_directions
|
||||
|
||||
def calculate_flow_direction(mesh):
|
||||
import time
|
||||
start = time.time()
|
||||
data = []
|
||||
total_facets = mesh.CountFacets
|
||||
for i, facet in enumerate(mesh.Facets):
|
||||
c = facet.CircumCircle[0]
|
||||
dir = 0
|
||||
max_slope = 0.0 #float("-inf")
|
||||
for ind in facet.NeighbourIndices:
|
||||
if ind >= total_facets:
|
||||
continue
|
||||
c_n = mesh.Facets[ind].CircumCircle[0]
|
||||
l = c.sub(c_n)
|
||||
l.z = 0
|
||||
slope = (c.z - c_n.z) / l.Length
|
||||
if slope > max_slope and slope > 0:
|
||||
max_slope = slope
|
||||
dir = ind
|
||||
if max_slope >= 0:
|
||||
data.append(dir)
|
||||
else:
|
||||
data.append(-1)
|
||||
if i == 100:
|
||||
break
|
||||
print(time.time() - start)
|
||||
return data
|
||||
|
||||
def calculate_flow_direction_from_inclination(inclination):
|
||||
# Obtener los componentes de inclinación en los ejes X, Y
|
||||
inclination_x = inclination.x
|
||||
inclination_y = inclination.y
|
||||
|
||||
#TODO: ver si se pude devolver un vector
|
||||
|
||||
# Asignar valores a las direcciones D8
|
||||
directions = {
|
||||
'N': 1,
|
||||
'NE': 2,
|
||||
'E': 3,
|
||||
'SE': 4,
|
||||
'S': 5,
|
||||
'SW': 6,
|
||||
'W': 7,
|
||||
'NW': 8
|
||||
}
|
||||
|
||||
# Calcular la dirección del flujo basándose en los componentes de inclinación
|
||||
if inclination_x > 0:
|
||||
if inclination_y > 0:
|
||||
return directions['NE']
|
||||
elif inclination_y < 0:
|
||||
return directions['SE']
|
||||
else:
|
||||
return directions['E']
|
||||
elif inclination_x < 0:
|
||||
if inclination_y > 0:
|
||||
return directions['NW']
|
||||
elif inclination_y < 0:
|
||||
return directions['SW']
|
||||
else:
|
||||
return directions['W']
|
||||
else:
|
||||
if inclination_y > 0:
|
||||
return directions['N']
|
||||
elif inclination_y < 0:
|
||||
return directions['S']
|
||||
else:
|
||||
return None
|
||||
|
||||
|
||||
# Calcular la dirección del flujo utilizando la función
|
||||
flow_directions = calculate_flow_direction(land)
|
||||
|
||||
|
||||
def delineate_watersheds(mesh, flow_direction):
|
||||
# Obtener el número de caras (celdas) en la malla
|
||||
num_faces = mesh.CountFacets
|
||||
|
||||
# Crear una lista para almacenar las etiquetas de las cuencas
|
||||
watersheds = [0] * num_faces
|
||||
|
||||
# Inicializar el contador de etiquetas
|
||||
label = 1
|
||||
|
||||
# Recorrer cada cara de la malla
|
||||
for face_index in range(num_faces):
|
||||
# Si la cara aún no tiene etiqueta asignada
|
||||
if watersheds[face_index] == 0:
|
||||
# Iniciar el seguimiento aguas arriba
|
||||
label = delineate_upstream(mesh, face_index, label, flow_direction, watersheds)
|
||||
return watersheds
|
||||
|
||||
def delineate_upstream(mesh, face_index, label, flow_direction, watersheds):
|
||||
# Obtener la dirección del flujo para la cara actual
|
||||
direction = flow_direction[face_index]
|
||||
|
||||
# Si la cara no tiene dirección de flujo o ya tiene una etiqueta asignada
|
||||
if direction == 0 or watersheds[face_index] != 0:
|
||||
return label
|
||||
|
||||
# Asignar la etiqueta actual a la cara
|
||||
watersheds[face_index] = label
|
||||
|
||||
# Obtener los índices de las caras vecinas en la dirección del flujo
|
||||
neighbor_face_indices = mesh.Facets[face_index].NeighbourIndices
|
||||
|
||||
# Realizar el seguimiento aguas arriba recursivamente para cada cara vecina
|
||||
for neighbor_index in neighbor_face_indices:
|
||||
delineate_upstream(mesh, neighbor_index, label, flow_direction, watersheds)
|
||||
label +=1
|
||||
return label
|
||||
|
||||
# Supongamos que ya has obtenido la malla (mesh) y la matriz de dirección del flujo (flow_direction)
|
||||
# ...
|
||||
|
||||
# Delimitar las cuencas hidrográficas
|
||||
watersheds = delineate_watersheds(mesh, flow_directions)
|
||||
|
||||
# Imprimir las etiquetas de las cuencas hidrográficas
|
||||
print(watersheds)
|
||||
|
||||
import FreeCAD as App
|
||||
import Part
|
||||
|
||||
def create_contour_wire(mesh, watershed_label):
|
||||
# Obtener las caras de la malla pertenecientes a la cuenca hidrográfica
|
||||
faces = [i for i, label in enumerate(watersheds) if label == watershed_label]
|
||||
|
||||
# Obtener los vértices de las caras seleccionadas
|
||||
vertices = set()
|
||||
for face_index in faces:
|
||||
vertices.update(mesh.GetFace(face_index))
|
||||
|
||||
# Crear una lista de puntos para la wire
|
||||
points = [App.Vector(*vertex) for vertex in vertices]
|
||||
|
||||
# Crear la wire a partir de la lista de puntos
|
||||
wire = Part.makePolygon(points)
|
||||
|
||||
return wire
|
||||
|
||||
# Supongamos que ya has obtenido la malla (mesh) y la lista de etiquetas de las cuencas hidrográficas (watersheds)
|
||||
# ...
|
||||
|
||||
# Definir el número de la cuenca hidrográfica para la que deseas generar la wire
|
||||
cuenca_numero = 1
|
||||
|
||||
# Crear la wire para el contorno externo de la cuenca hidrográfica seleccionada
|
||||
wire_cuenca = create_contour_wire(mesh, cuenca_numero)
|
||||
|
||||
# Agregar la wire al documento de FreeCAD
|
||||
App.ActiveDocument.addObject("Part::Feature", "ContornoCuenca").Shape = wire_cuenca
|
||||
|
||||
import FreeCAD
|
||||
|
||||
def obtener_coordenadas(malla):
|
||||
num_vertices = len(malla)
|
||||
coordenadas = np.zeros((num_vertices, 3))
|
||||
|
||||
for i, vertice in enumerate(malla):
|
||||
coordenadas[i] = vertice.Point
|
||||
|
||||
return coordenadas
|
||||
|
||||
def delimitar_cuencas(malla):
|
||||
coordenadas = obtener_coordenadas(malla)
|
||||
num_vertices = coordenadas.shape[0]
|
||||
cuencas = np.zeros(num_vertices, dtype=int)
|
||||
altura = np.max(coordenadas[:, 2]) + 1 # Altura inicial para iniciar el flujo
|
||||
|
||||
while True:
|
||||
cambios = False
|
||||
|
||||
for i in range(num_vertices):
|
||||
vecinos = np.where((coordenadas[:, 0] == coordenadas[i, 0]) & (coordenadas[:, 1] == coordenadas[i, 1]))[0]
|
||||
|
||||
for v in vecinos:
|
||||
if coordenadas[v, 2] < altura and cuencas[v] != cuencas[i]:
|
||||
cuencas[i] = cuencas[v]
|
||||
cambios = True
|
||||
|
||||
altura -= 1
|
||||
|
||||
if not cambios:
|
||||
break
|
||||
|
||||
return cuencas
|
||||
|
||||
# Obtener la malla de FreeCAD (ejemplo)
|
||||
malla_fcad = FreeCAD.ActiveDocument.Objects[0].Shape.Faces[0].Mesh
|
||||
malla = malla_fcad.Topology
|
||||
|
||||
# Delimitar cuencas hidrológicas
|
||||
cuencas = delimitar_cuencas(malla)
|
||||
|
||||
# Visualizar las cuencas en un gráfico con PyCairo (ejemplo)
|
||||
import cairo
|
||||
|
||||
width, height = 800, 600
|
||||
surface = cairo.ImageSurface(cairo.FORMAT_ARGB32, width, height)
|
||||
ctx = cairo.Context(surface)
|
||||
ctx.scale(width, height)
|
||||
|
||||
# Dibujar las cuencas en el gráfico
|
||||
for i, face in enumerate(malla_fcad.Facets):
|
||||
ctx.set_source_rgba(0.5, 0.5, 0.5, 0.5) # Color de relleno
|
||||
ctx.set_line_width(0.01) # Ancho de línea
|
||||
ctx.move_to(face.Vertexes[0].Point.x, face.Vertexes[0].Point.y)
|
||||
|
||||
for j in range(1, len(face.Vertexes)):
|
||||
ctx.line_to(face.Vertexes[j].Point.x, face.Vertexes[j].Point.y)
|
||||
|
||||
ctx.close_path()
|
||||
ctx.stroke_preserve()
|
||||
|
||||
cuenca = cuencas[i]
|
||||
color = (cuenca % 255) / 255.0 # Color basado en el número de cuenca
|
||||
ctx.set_source_rgba(color, 0.0, 0.0, 0.5) # Color de borde
|
||||
ctx.fill()
|
||||
|
||||
surface.write_to_png('cuencas_hidrologicas.png')
|
||||
|
||||
|
||||
def calculate_triangle_slope(triangle):
|
||||
# Calcula la pendiente o pendiente acumulada del triángulo
|
||||
normal = triangle.Normal
|
||||
# return normal.z / (normal.x ** 2 + normal.y ** 2) ** 0.5
|
||||
# or
|
||||
proyection = FreeCAD.Vector(normal)
|
||||
proyection.z = 0
|
||||
return normal.z/proyection.Length
|
||||
|
||||
def calculate_flow_accumulation(points, triangles):
|
||||
num_points = len(points)
|
||||
flow_accumulation = np.zeros(num_points, dtype=np.float64)
|
||||
flow_direction = np.full(num_points, -1, dtype=np.int32)
|
||||
|
||||
for i in range(num_points):
|
||||
connected_triangles = []
|
||||
|
||||
for j, triangle in enumerate(triangles):
|
||||
if i in triangle:
|
||||
connected_triangles.append(j)
|
||||
|
||||
for j in connected_triangles:
|
||||
triangle = triangles[j]
|
||||
area = triangle.Area
|
||||
slope = calculate_triangle_slope(triangle)
|
||||
|
||||
# Actualizar la dirección del flujo si la pendiente es mayor que la dirección actual
|
||||
if flow_direction[i] == -1 or slope > flow_direction[i]:
|
||||
flow_direction[i] = slope
|
||||
|
||||
# Incrementar la acumulación de flujo
|
||||
flow_accumulation[i] += area * slope
|
||||
|
||||
return flow_accumulation, flow_direction
|
||||
|
||||
import numpy as np
|
||||
from queue import Queue
|
||||
|
||||
def delineate_watersheds(points, triangles, flow_direction, outlet_points):
|
||||
num_points = len(points)
|
||||
watersheds = np.full(num_points, -1, dtype=np.int32)
|
||||
visited = np.zeros(num_points, dtype=np.bool)
|
||||
|
||||
for outlet_point in outlet_points:
|
||||
queue = Queue()
|
||||
queue.put(outlet_point)
|
||||
|
||||
while not queue.empty():
|
||||
current_point = queue.get()
|
||||
watersheds[current_point] = outlet_point
|
||||
visited[current_point] = True
|
||||
|
||||
connected_triangles = []
|
||||
|
||||
for j, triangle in enumerate(triangles):
|
||||
if current_point in triangle:
|
||||
connected_triangles.append(j)
|
||||
|
||||
for j in connected_triangles:
|
||||
triangle = triangles[j]
|
||||
for point in triangle:
|
||||
if not visited[point] and flow_direction[point] <= flow_direction[current_point]:
|
||||
queue.put(point)
|
||||
visited[point] = True
|
||||
|
||||
return watersheds
|
||||
|
||||
|
||||
325
hydro/basin.py
Normal file
325
hydro/basin.py
Normal file
@@ -0,0 +1,325 @@
|
||||
'''
|
||||
This tool is part of the WhiteboxTools geospatial analysis library.
|
||||
Authors: Dr. John Lindsay
|
||||
Created: 01/07/2017
|
||||
Last Modified: 18/10/2019
|
||||
License: MIT
|
||||
'''
|
||||
|
||||
'''use whitebox_raster::*
|
||||
use whitebox_common::structures::Array2D
|
||||
use crate::tools::*
|
||||
use std::env
|
||||
use std::f64
|
||||
use std::io::Error, ErrorKind
|
||||
use std::path'''
|
||||
|
||||
from crate.tools import *
|
||||
from whitebox_raster import *
|
||||
|
||||
|
||||
# This tool can be used to delineate all of the drainage basins contained within a local drainage direction,
|
||||
# or flow pointer raster (`--d8_pntr`), and draining to the edge of the data. The flow pointer raster must be derived using
|
||||
# the `D8Pointer` tool and should have been extracted from a digital elevation model (DEM) that has been
|
||||
# hydrologically pre-processed to remove topographic depressions and flat areas, e.g. using the `BreachDepressions`
|
||||
# tool. By default, the flow pointer raster is assumed to use the clockwise indexing method used by WhiteboxTools:
|
||||
#
|
||||
# | . | . | . |
|
||||
# |:--:|:---:|:--:|
|
||||
# | 64 | 128 | 1 |
|
||||
# | 32 | 0 | 2 |
|
||||
# | 16 | 8 | 4 |
|
||||
#
|
||||
# If the pointer file contains ESRI flow direction values instead, the `--esri_pntr` parameter must be specified.
|
||||
#
|
||||
# The `Basins` and `Watershed` tools are similar in function but while the `Watershed` tool identifies the upslope areas
|
||||
# that drain to one or more user-specified outpoints, the `Basins` tool automatically sets outlets to all grid cells
|
||||
# situated along the edge of the data that do not have a defined flow direction (i.e. they do not have a lower neighbour).
|
||||
# Notice that these edge outlets need not be situated along the edges of the flow-pointer raster, but rather along the
|
||||
# edges of the region of valid data. That is, the DEM from which the flow-pointer has been extracted may incompletely
|
||||
# fill the containing raster, if it is irregular shaped, and NoData regions may occupy the peripherals. Thus, the entire
|
||||
# region of valid data in the flow pointer raster will be divided into a set of mutually exclusive basins using this tool.
|
||||
#
|
||||
# # See Also
|
||||
# `Watershed`, `D8Pointer`, `BreachDepressions`
|
||||
|
||||
class Basins:
|
||||
def run(self, args: List[str], working_directory: str, verbose: bool) -> None:
|
||||
d8_file = ''
|
||||
output_file = ''
|
||||
esri_style = False
|
||||
|
||||
if len(args) == 0:
|
||||
raise ValueError("Tool run with no parameters.")
|
||||
|
||||
i = 0
|
||||
'''while i < len(args):
|
||||
arg = args[i].replace('"', '').replace("'", "")
|
||||
cmd = arg.split("=")
|
||||
vec = list(cmd)
|
||||
keyval = False
|
||||
if len(vec) > 1:
|
||||
keyval = True
|
||||
flag_val = vec[0].lower().replace("--", "-")
|
||||
if flag_val == "-d8_pntr":
|
||||
d8_file = vec[1] if keyval else args[i + 1]
|
||||
elif flag_val == "-o" or flag_val == "-output":
|
||||
output_file = vec[1] if keyval else args[i + 1]
|
||||
elif flag_val == "-esri_pntr" or flag_val == "-esri_style":
|
||||
if len(vec) == 1 or not vec[1].lower().contains("false"):
|
||||
esri_style = True
|
||||
i += 1
|
||||
|
||||
if verbose:
|
||||
tool_name = self.get_tool_name()
|
||||
welcome_len = max(len(f"* Welcome to {tool_name} *"), 28) # 28 = length of the 'Powered by' by statement.
|
||||
print("*" * welcome_len)
|
||||
print(f"* Welcome to {tool_name} *{' ' * (welcome_len - 15 - len(tool_name))}")
|
||||
print("* Powered by WhiteboxTools *{' ' * (welcome_len - 28)}")
|
||||
print("* www.whiteboxgeo.com *{' ' * (welcome_len - 23)}")
|
||||
print("*" * welcome_len)'''
|
||||
|
||||
sep = os.path.sep
|
||||
|
||||
progress = 0
|
||||
old_progress = 1
|
||||
|
||||
pntr = Raster(d8_file, mode="r")
|
||||
start = time.time()
|
||||
rows = pntr.configs.rows
|
||||
columns = pntr.configs.columns
|
||||
nodata = pntr.configs.nodata
|
||||
|
||||
dx = [1, 1, 1, 0, -1, -1, -1, 0]
|
||||
dy = [-1, 0, 1, 1, 1, 0, -1, -1]
|
||||
|
||||
import numpy as np
|
||||
|
||||
flow_dir = np.full((rows, columns), -2, dtype=np.int8)
|
||||
output = Raster(output_file, mode="w", raster=pntr)
|
||||
output.data_type = DataType.FLOAT32
|
||||
output.palette = "qual.plt"
|
||||
output.photometric_interp = PhotometricInterpretation.CATEGORICAL
|
||||
low_value = np.finfo(np.float64).min
|
||||
output.reinitialize_values(low_value)
|
||||
|
||||
# Create a mapping from the pointer values to cell offsets.
|
||||
# This may seem wasteful, using only 8 of 129 values in the array,
|
||||
# but the mapping method is far faster than calculating np.log(z) / np.log(2.0).
|
||||
# It's also a good way of allowing for different point styles.
|
||||
pntr_matches = np.zeros(129, dtype=np.int8)
|
||||
if not esri_style:
|
||||
# This maps Whitebox-style D8 pointer values
|
||||
# onto the cell offsets in d_x and d_y.
|
||||
pntr_matches[1] = 0
|
||||
pntr_matches[2] = 1
|
||||
pntr_matches[4] = 2
|
||||
pntr_matches[8] = 3
|
||||
pntr_matches[16] = 4
|
||||
pntr_matches[32] = 5
|
||||
pntr_matches[64] = 6
|
||||
pntr_matches[128] = 7
|
||||
else:
|
||||
# This maps Esri-style D8 pointer values
|
||||
# onto the cell offsets in d_x and d_y.
|
||||
pntr_matches[1] = 1
|
||||
pntr_matches[2] = 2
|
||||
pntr_matches[4] = 3
|
||||
pntr_matches[8] = 4
|
||||
pntr_matches[16] = 5
|
||||
pntr_matches[32] = 6
|
||||
pntr_matches[64] = 7
|
||||
pntr_matches[128] = 0
|
||||
|
||||
basin_id = 0.0
|
||||
for row in range(rows):
|
||||
for col in range(columns):
|
||||
z = pntr[row, col]
|
||||
if z != nodata:
|
||||
if z > 0.0:
|
||||
flow_dir[row, col] = pntr_matches[int(z)]
|
||||
else:
|
||||
flow_dir[row, col] = -1
|
||||
basin_id += 1.0
|
||||
output[row, col] = basin_id
|
||||
else:
|
||||
output[row, col] = nodata
|
||||
|
||||
flag = False
|
||||
x = 0
|
||||
y = 0
|
||||
dir = 0
|
||||
outlet_id = nodata
|
||||
for row in range(rows):
|
||||
for col in range(columns):
|
||||
if output[row, col] == low_value:
|
||||
flag = False
|
||||
x = col
|
||||
y = row
|
||||
outlet_id = nodata
|
||||
while not flag:
|
||||
# find its downslope neighbour
|
||||
dir = flow_dir[y, x]
|
||||
if dir >= 0:
|
||||
# move x and y accordingly
|
||||
x += dx[dir]
|
||||
y += dy[dir]
|
||||
|
||||
# if the new cell already has a value in the output, use that as the outletID
|
||||
z = output[y, x]
|
||||
if z != low_value:
|
||||
outlet_id = z
|
||||
flag = True
|
||||
else:
|
||||
flag = True
|
||||
|
||||
flag = False
|
||||
x = col
|
||||
y = row
|
||||
output[y, x] = outlet_id
|
||||
while not flag:
|
||||
# find its downslope neighbour
|
||||
dir = flow_dir[y, x]
|
||||
if dir >= 0:
|
||||
# move x and y accordingly
|
||||
x += dx[dir]
|
||||
y += dy[dir]
|
||||
|
||||
# if the new cell already has a value in the output, use that as the outletID
|
||||
if output[y, x] != low_value:
|
||||
flag = True
|
||||
else:
|
||||
flag = True
|
||||
|
||||
output[y, x] = outlet_id
|
||||
|
||||
elapsed_time = get_formatted_elapsed_time(start)
|
||||
output.add_metadata_entry(f"Created by whitebox_tools' {self.get_tool_name()}")
|
||||
output.add_metadata_entry(f"D8 pointer file: {d8_file}")
|
||||
output.add_metadata_entry(f"Elapsed Time (excluding I/O): {elapsed_time}")
|
||||
|
||||
|
||||
try:
|
||||
_ = output.write()
|
||||
except Exception as e:
|
||||
return Err(e)
|
||||
|
||||
|
||||
from whitebox_tools import WhiteboxTools
|
||||
import os
|
||||
from whitebox_tools.types import Array2D, DataType, PhotometricInterpretation
|
||||
|
||||
class Basins(WhiteboxTools):
|
||||
def run(self, args, working_directory, verbose):
|
||||
d8_file = ''
|
||||
output_file = ''
|
||||
esri_style = False
|
||||
sep = os.path.sep
|
||||
|
||||
progress = 0
|
||||
old_progress = 1
|
||||
|
||||
# Reading data...
|
||||
pntr = self.open_raster(d8_file)
|
||||
pntr_info = self.get_raster_info(d8_file)
|
||||
|
||||
rows = pntr_info['rows']
|
||||
columns = pntr_info['columns']
|
||||
nodata = pntr_info['nodata']
|
||||
|
||||
dx = [1, 1, 1, 0, -1, -1, -1, 0]
|
||||
dy = [-1, 0, 1, 1, 1, 0, -1, -1]
|
||||
|
||||
flow_dir = Array2D(rows, columns, -2)
|
||||
output = self.create_new_raster(output_file, d8_file, DataType.F32)
|
||||
output.configs.palette = "qual.plt"
|
||||
output.configs.photometric_interp = PhotometricInterpretation.Categoricaloutput.reinitialize_values(float('-inf'))
|
||||
|
||||
pntr_matches = [0] * 129
|
||||
if not esri_style:
|
||||
pntr_matches[1] = 0
|
||||
pntr_matches[2] = 1
|
||||
pntr_matches[4] = 2
|
||||
pntr_matches[8] = 3
|
||||
pntr_matches[16] = 4
|
||||
pntr_matches[32] = 5
|
||||
pntr_matches[64] = 6
|
||||
pntr_matches[128] = 7
|
||||
else:
|
||||
pntr_matches[1] = 1
|
||||
pntr_matches[2] = 2
|
||||
pntr_matches[4] = 3
|
||||
pntr_matches[8] = 4
|
||||
pntr_matches[16] = 5
|
||||
pntr_matches[32] = 6
|
||||
pntr_matches[64] = 7
|
||||
pntr_matches[128] = 0
|
||||
|
||||
basin_id = 0
|
||||
for row in range(rows):
|
||||
for col in range(columns):
|
||||
z = pntr[row, col]
|
||||
if z != nodata:
|
||||
if z > 0:
|
||||
flow_dir[row, col] = pntr_matches[int(z)]
|
||||
else:
|
||||
flow_dir[row, col] = -1
|
||||
basin_id += 1
|
||||
output[row, col] = basin_id
|
||||
else:
|
||||
output[row, col] = nodata
|
||||
|
||||
if verbose:
|
||||
progress = int(100 * (row * columns + col) / (rows * columns - 1))
|
||||
if progress != old_progress:
|
||||
print("Initializing: {}%".format(progress))
|
||||
old_progress = progress
|
||||
|
||||
for row in range(rows):
|
||||
for col in range(columns):
|
||||
if output[row, col] == float('-inf'):
|
||||
flag = False
|
||||
x = col
|
||||
y = row
|
||||
outlet_id = nodata
|
||||
while not flag:
|
||||
dir = flow_dir[y, x]
|
||||
if dir >= 0:
|
||||
x += dx[dir]
|
||||
y += dy[dir]
|
||||
z = output[y, x]
|
||||
if z != float('-inf'):
|
||||
outlet_id = z
|
||||
flag = True
|
||||
else:
|
||||
flag = True
|
||||
flag = False
|
||||
x = col
|
||||
y = row
|
||||
output[y, x] = outlet_id
|
||||
while not flag:
|
||||
dir = flow_dir[y, x]
|
||||
if dir >= 0:
|
||||
x += dx[dir]
|
||||
y += dy[dir]
|
||||
if output[y, x] != float('-inf'):
|
||||
flag = True
|
||||
else:
|
||||
flag = True
|
||||
output[y, x] = outlet_id
|
||||
|
||||
if verbose:
|
||||
progress = int(100 * (row * columns + col) / (rows * columns - 1))
|
||||
if progress != old_progress:
|
||||
print("Progress: {}%".format(progress))
|
||||
old_progress = progress
|
||||
|
||||
elapsed_time = self.get_formatted_elapsed_time(start)
|
||||
output.add_metadata_entry("Created by whitebox_tools' {} tool".format(self.get_tool_name()))
|
||||
output.add_metadata_entry("D8 pointer file: {}".format(d8_file))
|
||||
output.add_metadata_entry("ESRI-style output: {}".format(str(esri_style)))
|
||||
output.add_metadata_entry("Elapsed Time (excluding I/O): {}".format(elapsed_time))
|
||||
|
||||
if verbose:
|
||||
print("Saving data...")
|
||||
print("Output file written")
|
||||
117
hydro/myBasin.py
Normal file
117
hydro/myBasin.py
Normal file
@@ -0,0 +1,117 @@
|
||||
import FreeCAD
|
||||
import Mesh
|
||||
import numpy as np
|
||||
|
||||
# Cargar la malla
|
||||
mesh = Mesh.Mesh("ruta/a/tu/malla.stl")
|
||||
|
||||
# Extraer vértices y facetas
|
||||
vertices = [vertex.Point for vertex in mesh.Points]
|
||||
facets = [facet.PointIndices for facet in mesh.Facets]
|
||||
|
||||
# Crear una representación de la malla
|
||||
mesh_representation = {
|
||||
"vertices": vertices,
|
||||
"facets": facets
|
||||
}
|
||||
|
||||
|
||||
# Calcular la dirección del flujo de agua
|
||||
def calculate_slope_direction(facet):
|
||||
v1 = np.array(FreeCAD.Vector(facet.Points[0]))
|
||||
v2 = np.array(FreeCAD.Vector(facet.Points[1]))
|
||||
v3 = np.array(FreeCAD.Vector(facet.Points[2]))
|
||||
|
||||
edge1 = v2 - v1
|
||||
edge2 = v3 - v1
|
||||
|
||||
normal = np.cross(edge1, edge2)
|
||||
normal[2] = abs(normal[2])
|
||||
normal = normal / np.linalg.norm(normal)
|
||||
|
||||
slope_direction = np.array([0.0, 0.0, 0.0])
|
||||
slope_direction[:2] = np.mean([edge1[:2], edge2[:2]], axis=0)
|
||||
slope_direction[2] = -(normal[:2].dot(slope_direction[:2]) / normal[2])
|
||||
slope_direction = slope_direction / np.linalg.norm(slope_direction)
|
||||
|
||||
return slope_direction
|
||||
|
||||
|
||||
flow_directions = []
|
||||
for ind, facet in enumerate(facets):
|
||||
direction = calculate_slope_direction(facet)
|
||||
flow_directions.append(direction)
|
||||
if ind == 1000:
|
||||
break
|
||||
|
||||
# Determinar cuencas hidrológicas
|
||||
basins = [None] * len(vertices)
|
||||
|
||||
|
||||
def find_basin(vertex_index, flow_directions, facets):
|
||||
if basins[vertex_index] is not None:
|
||||
return basins[vertex_index]
|
||||
|
||||
for facet, direction in zip(facets, flow_directions):
|
||||
if vertex_index in facet:
|
||||
v_indices = [idx for idx in facet if idx != vertex_index]
|
||||
min_vertex_index = v_indices[0]
|
||||
min_height = vertices[min_vertex_index][2]
|
||||
|
||||
for vi in v_indices[1:]:
|
||||
if vertices[vi][2] < min_height:
|
||||
min_height = vertices[vi][2]
|
||||
min_vertex_index = vi
|
||||
|
||||
if vertices[vertex_index][2] > min_height:
|
||||
basins[vertex_index] = find_basin(min_vertex_index, flow_directions, facets)
|
||||
return basins[vertex_index]
|
||||
|
||||
basins[vertex_index] = vertex_index
|
||||
return vertex_index
|
||||
|
||||
|
||||
for i in range(len(vertices)):
|
||||
find_basin(i, flow_directions, facets)
|
||||
|
||||
# Calcular acumulación de agua
|
||||
water_accumulation = [0] * len(vertices)
|
||||
|
||||
|
||||
def accumulate_water(vertex_index, flow_directions, facets, water_accumulation):
|
||||
if water_accumulation[vertex_index] > 0:
|
||||
return water_accumulation[vertex_index]
|
||||
|
||||
for facet, direction in zip(facets, flow_directions):
|
||||
if vertex_index in facet:
|
||||
destination_vertex = None
|
||||
min_height = float('inf')
|
||||
for vi in facet:
|
||||
if vertices[vi][2] < min_height:
|
||||
min_height = vertices[vi][2]
|
||||
destination_vertex = vi
|
||||
if destination_vertex is not None:
|
||||
water_accumulation[vertex_index] += accumulate_water(destination_vertex, flow_directions, facets,
|
||||
water_accumulation) + 1
|
||||
|
||||
return water_accumulation[vertex_index]
|
||||
|
||||
|
||||
for i in range(len(vertices)):
|
||||
accumulate_water(i, flow_directions, facets, water_accumulation)
|
||||
|
||||
# Identificar puntos de drenaje
|
||||
drainage_points = []
|
||||
threshold = 10
|
||||
|
||||
|
||||
def identify_drainage_points(vertices, water_accumulation, threshold):
|
||||
for i, water in enumerate(water_accumulation):
|
||||
if water >= threshold:
|
||||
drainage_points.append(vertices[i])
|
||||
|
||||
|
||||
identify_drainage_points(vertices, water_accumulation, threshold)
|
||||
|
||||
for point in drainage_points:
|
||||
print("Punto de drenaje en:", point)
|
||||
Reference in New Issue
Block a user