Files
PVPlant/hydro/hydrological.py
2025-04-14 10:05:32 +06:00

390 lines
12 KiB
Python

import FreeCAD
import FreeCADGui
import Mesh
import Part
import numpy as np
import random
from concurrent.futures import ThreadPoolExecutor
from multiprocessing import Pool, cpu_count
from collections import deque
import os
from PVPlantResources import DirIcons as DirIcons
def mesh_to_numpy(mesh_obj):
"""Convierte la malla a arrays de NumPy con validación robusta"""
mesh = mesh_obj.Mesh
# Convertir vértices a array NumPy (shape: Nx3)
vertices = np.array([(v.x, v.y, v.z) for v in mesh.Points], dtype=np.float32)
# Convertir facetas a array NumPy (shape: Mx3)
facets = np.array( [f.PointIndices for f in mesh.Facets], dtype=np.uint32)
# Verificar integridad de índices
max_index = len(mesh.Points) - 1
if facets.size > 0 and (facets > max_index).any():
raise ValueError("Índices de vértices fuera de rango")
return vertices, facets
def build_adjacency_matrix(facets):
"""Construye matriz de adyacencia con conversión segura de tipos"""
edges = {}
adjacency = [[] for _ in range(len(facets))]
for idx, facet in enumerate(facets):
if len(facet) != 3:
continue
v0, v1, v2 = facet
for edge in [(v0, v1), (v1, v2), (v2, v0)]:
sorted_edge = tuple(sorted(edge))
if sorted_edge not in edges:
edges[sorted_edge] = []
edges[sorted_edge].append(idx)
# Procesar solo aristas con 2 facetas
for edge, facet_indices in edges.items():
if len(facet_indices) == 2:
f1, f2 = facet_indices
adjacency[f1].append(f2)
adjacency[f2].append(f1)
return adjacency
def calculate_incenters_parallel(vertices, facets):
"""Cálculo paralelizado de incentros usando NumPy"""
v0 = vertices[facets[:, 0]]
v1 = vertices[facets[:, 1]]
v2 = vertices[facets[:, 2]]
a = np.linalg.norm(v1 - v2, axis=1)
b = np.linalg.norm(v0 - v2, axis=1)
c = np.linalg.norm(v0 - v1, axis=1)
perimeters = a + b + c
return (a[:, None] * v0 + b[:, None] * v1 + c[:, None] * v2) / perimeters[:, None]
def find_basins_parallel(args):
"""Función paralelizable para procesamiento de cuencas"""
chunk, adjacency, elevations = args
basins = []
visited = np.zeros(len(elevations), dtype=bool)
for seed in chunk:
if visited[seed]:
continue
queue = deque([seed])
basin = []
min_elev = elevations[seed]
while queue:
current = queue.popleft()
if visited[current]:
continue
visited[current] = True
basin.append(current)
neighbors = [n for n in adjacency[current] if elevations[n] >= min_elev]
queue.extend(neighbors)
if len(basin) > 0:
basins.append(basin)
return basins
def find_hydrological_basins(mesh_obj, min_area=100):
"""Identificación de cuencas optimizada"""
FreeCAD.Console.PrintMessage(f" -- vertices y facets: ")
FreeCADGui.updateGui()
vertices, facets = mesh_to_numpy(mesh_obj)
FreeCAD.Console.PrintMessage(f" -- Adjacency: ")
FreeCADGui.updateGui()
adjacency = build_adjacency_matrix(facets)
FreeCAD.Console.PrintMessage(f" -- Elevations: ")
FreeCADGui.updateGui()
elevations = calculate_incenters_parallel(vertices, facets)[:, 2]
# Dividir trabajo en chunks
chunk_size = len(facets) // (cpu_count() * 2)
chunks = [
(chunk_range, adjacency, elevations) # Empaqueta los 3 argumentos
for chunk_range in [
range(i, min(i + chunk_size, len(facets)))
for i in range(0, len(facets), chunk_size)
]
]
# Procesamiento paralelo
with ThreadPoolExecutor(max_workers=cpu_count()) as executor:
results = list(executor.map(find_basins_parallel, chunks))
# Combinar resultados
all_basins = [b for sublist in results for b in sublist]
# Filtrar por área mínima
valid_basins = []
for basin in all_basins:
area = sum(triangle_area(vertices[facets[i]]) for i in basin)
if area >= min_area:
valid_basins.append({'facets': basin, 'area': area})
return valid_basins
def triangle_area(vertices):
"""Cálculo rápido de área con producto cruz"""
return 0.5 * np.linalg.norm(
np.cross(vertices[1] - vertices[0], vertices[2] - vertices[0])
)
def validate_facet(facet):
"""Valida que la faceta sea un triángulo válido"""
return hasattr(facet, 'Points') and len(facet.Points) == 3
def calculate_incenter(facet):
"""Calcula el incentro usando la función nativa de FreeCAD"""
try:
return facet.InCircle[0] # (x, y, z)
except:
return None
def build_adjacency(mesh):
"""Construye matriz de adyacencia eficiente en memoria"""
edges = {}
adjacency = [[] for _ in mesh.Facets]
for idx, facet in enumerate(mesh.Facets):
if not validate_facet(facet):
continue
pts = facet.Points
for edge in [(min(pts[0], pts[1]), max(pts[0], pts[1])),
(min(pts[1], pts[2]), max(pts[1], pts[2])),
(min(pts[2], pts[0]), max(pts[2], pts[0]))]:
if edge in edges:
neighbor = edges[edge]
adjacency[idx].append(neighbor)
adjacency[neighbor].append(idx)
del edges[edge] # Liberar memoria
else:
edges[edge] = idx
return adjacency
def find_hydrological_basins_old(mesh_obj, min_area=100):
"""Identificación de cuencas con validación de datos"""
mesh = mesh_obj.Mesh
adjacency = build_adjacency(mesh)
basin_map = {}
current_basin = 0
for seed in range(len(mesh.Facets)):
if seed in basin_map or not validate_facet(mesh.Facets[seed]):
continue
queue = deque([seed])
basin_area = 0.0
basin_facets = []
while queue:
facet_idx = queue.popleft()
if facet_idx in basin_map:
continue
facet = mesh.Facets[facet_idx]
in_center = calculate_incenter(facet)
if not in_center:
continue
# Verificar mínimo local
is_sink = True
for neighbor in adjacency[facet_idx]:
if neighbor >= len(mesh.Facets) or not validate_facet(mesh.Facets[neighbor]):
continue
n_center = calculate_incenter(mesh.Facets[neighbor])
if n_center and n_center[2] < in_center[2]:
is_sink = False
break
if is_sink:
basin_map[facet_idx] = current_basin
basin_facets.append(facet_idx)
basin_area += facet.Area
# Expansión controlada
for neighbor in adjacency[facet_idx]:
if neighbor not in basin_map:
queue.append(neighbor)
if basin_area >= min_area:
yield {
'facets': basin_facets,
'area': basin_area,
'depth': calculate_basin_depth(mesh, basin_facets)
}
current_basin += 1
def calculate_basin_depth(mesh, basin_facets):
"""Calcula la profundidad máxima de la cuenca"""
min_z = float('inf')
max_z = -float('inf')
for idx in basin_facets:
center = calculate_incenter(mesh.Facets[idx])
if center:
min_z = min(min_z, center[2])
max_z = max(max_z, center[2])
return max_z - min_z if max_z != min_z else 0.0
def simulate_water_flow(mesh_obj, basins, rainfall=1.0):
""" Simulación de flujo con prevención de bucles infinitos """
mesh = mesh_obj.Mesh
adjacency = build_adjacency(mesh)
flow_paths = []
for basin in basins:
start_facets = basin['facets'][:2] # Muestra primeros 10 caminos
for start in start_facets:
path = []
visited = set()
current = start
while current is not None and current not in visited:
visited.add(current)
facet = mesh.Facets[current]
center = calculate_incenter(facet)
if not center:
break
path.append(FreeCAD.Vector(*center))
# Buscar vecino más bajo
next_facet = None
min_elev = float('inf')
for neighbor in adjacency[current]:
if neighbor >= len(mesh.Facets):
continue
n_center = calculate_incenter(mesh.Facets[neighbor])
if n_center and n_center[2] < min_elev:
min_elev = n_center[2]
next_facet = neighbor
current = next_facet if min_elev < center[2] else None
if len(path) > 1:
flow_paths.append(path)
return flow_paths
def colorize_mesh(mesh_obj, facet_indices, color):
"""Coloriza facetas específicas de forma compatible"""
mesh = mesh_obj.Mesh
# Crear nuevo objeto Mesh
colored_mesh = Mesh.Mesh()
colored_mesh.addMesh(mesh)
# Crear nuevo objeto en el documento
new_obj = FreeCAD.ActiveDocument.addObject("Mesh::Feature", "ColoredBasin")
new_obj.Mesh = colored_mesh
# Asignar colores a los vértices
vcolors = []
for idx in range(len(mesh.Points)):
vcolors.append((0.8, 0.8, 0.8)) # Color base
for facet_id in facet_indices:
facet = mesh.Facets[facet_id]
for vtx in facet.PointIndices:
vcolors[vtx] = color # Color de la cuenca
new_obj.ViewObject.PointColor = vcolors
new_obj.ViewObject.Lighting = "One side"
new_obj.ViewObject.Shading = "Flat Lines"
def create_polyline(points):
"""Crea un objeto Polyline en FreeCAD"""
if len(points) < 2:
return
poly = Part.makePolygon(points)
obj = FreeCAD.ActiveDocument.addObject("Part::Feature", "FlowPath")
obj.Shape = poly
obj.ViewObject.LineWidth = 2.0
obj.ViewObject.LineColor = (0.0, 0.0, 1.0)
class CommandHydrologicalAnalysis:
def GetResources(self):
return {'Pixmap': str(os.path.join(DirIcons, "drop.jpg")),
'MenuText': "Hidrological analysis",
'Accel': "H, A",
'ToolTip': "Hidrological analysis"}
def IsActive(self):
return True
def Activated(self):
# User input parameters (example values)
os.environ['OMP_NUM_THREADS'] = str(cpu_count())
os.environ['MKL_NUM_THREADS'] = str(cpu_count())
os.environ["FREECAD_NO_FORK"] = "1" # Desactiva el fork en sistemas Unix
#try:
# Parámetros de usuario
min_basin_area = 100 # m²
rainfall_intensity = 1.0
# Validar selección
mesh_obj = FreeCADGui.Selection.getSelection()[0]
if not mesh_obj.isDerivedFrom("Mesh::Feature"):
raise ValueError("Selecciona un objeto de malla")
# Procesamiento principal
FreeCAD.Console.PrintMessage(f"buscar basins: ")
FreeCADGui.updateGui()
basins = list(find_hydrological_basins(mesh_obj, min_basin_area))
FreeCAD.Console.PrintMessage(f" - Cuencas identificadas: {len(basins)}\n")
'''FreeCAD.Console.PrintMessage(f"simulate_water_flow: ")
FreeCADGui.updateGui()
flow_paths = simulate_water_flow(mesh_obj, basins, rainfall_intensity)
FreeCAD.Console.PrintMessage(f" - Trayectorias de flujo generadas: {len(flow_paths)}\n")
FreeCADGui.updateGui()'''
# Visualización
for basin in basins:
color = (random.random(), random.random(), random.random())
colorize_mesh(mesh_obj, basin['facets'], color)
'''for path in flow_paths:
create_polyline(path)'''
FreeCAD.ActiveDocument.recompute()
'''except Exception as e:
FreeCAD.Console.PrintError(f"Error: {str(e)}\n")
finally:
# Limpieza de memoria
import gc
gc.collect()'''