390 lines
12 KiB
Python
390 lines
12 KiB
Python
import FreeCAD
|
|
import FreeCADGui
|
|
import Mesh
|
|
import Part
|
|
import numpy as np
|
|
import random
|
|
from concurrent.futures import ThreadPoolExecutor
|
|
from multiprocessing import Pool, cpu_count
|
|
from collections import deque
|
|
|
|
|
|
import os
|
|
from PVPlantResources import DirIcons as DirIcons
|
|
|
|
|
|
def mesh_to_numpy(mesh_obj):
|
|
"""Convierte la malla a arrays de NumPy con validación robusta"""
|
|
mesh = mesh_obj.Mesh
|
|
|
|
# Convertir vértices a array NumPy (shape: Nx3)
|
|
vertices = np.array([(v.x, v.y, v.z) for v in mesh.Points], dtype=np.float32)
|
|
|
|
# Convertir facetas a array NumPy (shape: Mx3)
|
|
facets = np.array( [f.PointIndices for f in mesh.Facets], dtype=np.uint32)
|
|
|
|
# Verificar integridad de índices
|
|
max_index = len(mesh.Points) - 1
|
|
if facets.size > 0 and (facets > max_index).any():
|
|
raise ValueError("Índices de vértices fuera de rango")
|
|
|
|
return vertices, facets
|
|
|
|
|
|
def build_adjacency_matrix(facets):
|
|
"""Construye matriz de adyacencia con conversión segura de tipos"""
|
|
edges = {}
|
|
adjacency = [[] for _ in range(len(facets))]
|
|
|
|
for idx, facet in enumerate(facets):
|
|
if len(facet) != 3:
|
|
continue
|
|
|
|
v0, v1, v2 = facet
|
|
|
|
for edge in [(v0, v1), (v1, v2), (v2, v0)]:
|
|
sorted_edge = tuple(sorted(edge))
|
|
|
|
if sorted_edge not in edges:
|
|
edges[sorted_edge] = []
|
|
edges[sorted_edge].append(idx)
|
|
|
|
# Procesar solo aristas con 2 facetas
|
|
for edge, facet_indices in edges.items():
|
|
if len(facet_indices) == 2:
|
|
f1, f2 = facet_indices
|
|
adjacency[f1].append(f2)
|
|
adjacency[f2].append(f1)
|
|
|
|
return adjacency
|
|
|
|
|
|
def calculate_incenters_parallel(vertices, facets):
|
|
"""Cálculo paralelizado de incentros usando NumPy"""
|
|
v0 = vertices[facets[:, 0]]
|
|
v1 = vertices[facets[:, 1]]
|
|
v2 = vertices[facets[:, 2]]
|
|
|
|
a = np.linalg.norm(v1 - v2, axis=1)
|
|
b = np.linalg.norm(v0 - v2, axis=1)
|
|
c = np.linalg.norm(v0 - v1, axis=1)
|
|
|
|
perimeters = a + b + c
|
|
return (a[:, None] * v0 + b[:, None] * v1 + c[:, None] * v2) / perimeters[:, None]
|
|
|
|
|
|
def find_basins_parallel(args):
|
|
"""Función paralelizable para procesamiento de cuencas"""
|
|
chunk, adjacency, elevations = args
|
|
basins = []
|
|
visited = np.zeros(len(elevations), dtype=bool)
|
|
|
|
for seed in chunk:
|
|
if visited[seed]:
|
|
continue
|
|
|
|
queue = deque([seed])
|
|
basin = []
|
|
min_elev = elevations[seed]
|
|
|
|
while queue:
|
|
current = queue.popleft()
|
|
if visited[current]:
|
|
continue
|
|
|
|
visited[current] = True
|
|
basin.append(current)
|
|
|
|
neighbors = [n for n in adjacency[current] if elevations[n] >= min_elev]
|
|
queue.extend(neighbors)
|
|
|
|
if len(basin) > 0:
|
|
basins.append(basin)
|
|
|
|
return basins
|
|
|
|
|
|
def find_hydrological_basins(mesh_obj, min_area=100):
|
|
"""Identificación de cuencas optimizada"""
|
|
FreeCAD.Console.PrintMessage(f" -- vertices y facets: ")
|
|
FreeCADGui.updateGui()
|
|
vertices, facets = mesh_to_numpy(mesh_obj)
|
|
FreeCAD.Console.PrintMessage(f" -- Adjacency: ")
|
|
FreeCADGui.updateGui()
|
|
adjacency = build_adjacency_matrix(facets)
|
|
FreeCAD.Console.PrintMessage(f" -- Elevations: ")
|
|
FreeCADGui.updateGui()
|
|
elevations = calculate_incenters_parallel(vertices, facets)[:, 2]
|
|
|
|
# Dividir trabajo en chunks
|
|
chunk_size = len(facets) // (cpu_count() * 2)
|
|
chunks = [
|
|
(chunk_range, adjacency, elevations) # Empaqueta los 3 argumentos
|
|
for chunk_range in [
|
|
range(i, min(i + chunk_size, len(facets)))
|
|
for i in range(0, len(facets), chunk_size)
|
|
]
|
|
]
|
|
|
|
# Procesamiento paralelo
|
|
with ThreadPoolExecutor(max_workers=cpu_count()) as executor:
|
|
results = list(executor.map(find_basins_parallel, chunks))
|
|
|
|
# Combinar resultados
|
|
all_basins = [b for sublist in results for b in sublist]
|
|
|
|
# Filtrar por área mínima
|
|
valid_basins = []
|
|
for basin in all_basins:
|
|
area = sum(triangle_area(vertices[facets[i]]) for i in basin)
|
|
if area >= min_area:
|
|
valid_basins.append({'facets': basin, 'area': area})
|
|
|
|
return valid_basins
|
|
|
|
|
|
def triangle_area(vertices):
|
|
"""Cálculo rápido de área con producto cruz"""
|
|
return 0.5 * np.linalg.norm(
|
|
np.cross(vertices[1] - vertices[0], vertices[2] - vertices[0])
|
|
)
|
|
|
|
|
|
def validate_facet(facet):
|
|
"""Valida que la faceta sea un triángulo válido"""
|
|
return hasattr(facet, 'Points') and len(facet.Points) == 3
|
|
|
|
|
|
def calculate_incenter(facet):
|
|
"""Calcula el incentro usando la función nativa de FreeCAD"""
|
|
try:
|
|
return facet.InCircle[0] # (x, y, z)
|
|
except:
|
|
return None
|
|
|
|
|
|
def build_adjacency(mesh):
|
|
"""Construye matriz de adyacencia eficiente en memoria"""
|
|
edges = {}
|
|
adjacency = [[] for _ in mesh.Facets]
|
|
|
|
for idx, facet in enumerate(mesh.Facets):
|
|
if not validate_facet(facet):
|
|
continue
|
|
|
|
pts = facet.Points
|
|
for edge in [(min(pts[0], pts[1]), max(pts[0], pts[1])),
|
|
(min(pts[1], pts[2]), max(pts[1], pts[2])),
|
|
(min(pts[2], pts[0]), max(pts[2], pts[0]))]:
|
|
if edge in edges:
|
|
neighbor = edges[edge]
|
|
adjacency[idx].append(neighbor)
|
|
adjacency[neighbor].append(idx)
|
|
del edges[edge] # Liberar memoria
|
|
else:
|
|
edges[edge] = idx
|
|
return adjacency
|
|
|
|
|
|
def find_hydrological_basins_old(mesh_obj, min_area=100):
|
|
"""Identificación de cuencas con validación de datos"""
|
|
mesh = mesh_obj.Mesh
|
|
adjacency = build_adjacency(mesh)
|
|
basin_map = {}
|
|
current_basin = 0
|
|
|
|
for seed in range(len(mesh.Facets)):
|
|
if seed in basin_map or not validate_facet(mesh.Facets[seed]):
|
|
continue
|
|
|
|
queue = deque([seed])
|
|
basin_area = 0.0
|
|
basin_facets = []
|
|
|
|
while queue:
|
|
facet_idx = queue.popleft()
|
|
if facet_idx in basin_map:
|
|
continue
|
|
|
|
facet = mesh.Facets[facet_idx]
|
|
in_center = calculate_incenter(facet)
|
|
if not in_center:
|
|
continue
|
|
|
|
# Verificar mínimo local
|
|
is_sink = True
|
|
for neighbor in adjacency[facet_idx]:
|
|
if neighbor >= len(mesh.Facets) or not validate_facet(mesh.Facets[neighbor]):
|
|
continue
|
|
|
|
n_center = calculate_incenter(mesh.Facets[neighbor])
|
|
if n_center and n_center[2] < in_center[2]:
|
|
is_sink = False
|
|
break
|
|
|
|
if is_sink:
|
|
basin_map[facet_idx] = current_basin
|
|
basin_facets.append(facet_idx)
|
|
basin_area += facet.Area
|
|
|
|
# Expansión controlada
|
|
for neighbor in adjacency[facet_idx]:
|
|
if neighbor not in basin_map:
|
|
queue.append(neighbor)
|
|
|
|
if basin_area >= min_area:
|
|
yield {
|
|
'facets': basin_facets,
|
|
'area': basin_area,
|
|
'depth': calculate_basin_depth(mesh, basin_facets)
|
|
}
|
|
current_basin += 1
|
|
|
|
|
|
def calculate_basin_depth(mesh, basin_facets):
|
|
"""Calcula la profundidad máxima de la cuenca"""
|
|
min_z = float('inf')
|
|
max_z = -float('inf')
|
|
for idx in basin_facets:
|
|
center = calculate_incenter(mesh.Facets[idx])
|
|
if center:
|
|
min_z = min(min_z, center[2])
|
|
max_z = max(max_z, center[2])
|
|
return max_z - min_z if max_z != min_z else 0.0
|
|
|
|
|
|
def simulate_water_flow(mesh_obj, basins, rainfall=1.0):
|
|
""" Simulación de flujo con prevención de bucles infinitos """
|
|
mesh = mesh_obj.Mesh
|
|
adjacency = build_adjacency(mesh)
|
|
flow_paths = []
|
|
|
|
for basin in basins:
|
|
start_facets = basin['facets'][:2] # Muestra primeros 10 caminos
|
|
for start in start_facets:
|
|
path = []
|
|
visited = set()
|
|
current = start
|
|
|
|
while current is not None and current not in visited:
|
|
visited.add(current)
|
|
facet = mesh.Facets[current]
|
|
center = calculate_incenter(facet)
|
|
if not center:
|
|
break
|
|
|
|
path.append(FreeCAD.Vector(*center))
|
|
|
|
# Buscar vecino más bajo
|
|
next_facet = None
|
|
min_elev = float('inf')
|
|
for neighbor in adjacency[current]:
|
|
if neighbor >= len(mesh.Facets):
|
|
continue
|
|
|
|
n_center = calculate_incenter(mesh.Facets[neighbor])
|
|
if n_center and n_center[2] < min_elev:
|
|
min_elev = n_center[2]
|
|
next_facet = neighbor
|
|
|
|
current = next_facet if min_elev < center[2] else None
|
|
|
|
if len(path) > 1:
|
|
flow_paths.append(path)
|
|
|
|
return flow_paths
|
|
|
|
|
|
def colorize_mesh(mesh_obj, facet_indices, color):
|
|
"""Coloriza facetas específicas de forma compatible"""
|
|
mesh = mesh_obj.Mesh
|
|
|
|
# Crear nuevo objeto Mesh
|
|
colored_mesh = Mesh.Mesh()
|
|
colored_mesh.addMesh(mesh)
|
|
|
|
# Crear nuevo objeto en el documento
|
|
new_obj = FreeCAD.ActiveDocument.addObject("Mesh::Feature", "ColoredBasin")
|
|
new_obj.Mesh = colored_mesh
|
|
|
|
# Asignar colores a los vértices
|
|
vcolors = []
|
|
for idx in range(len(mesh.Points)):
|
|
vcolors.append((0.8, 0.8, 0.8)) # Color base
|
|
|
|
for facet_id in facet_indices:
|
|
facet = mesh.Facets[facet_id]
|
|
for vtx in facet.PointIndices:
|
|
vcolors[vtx] = color # Color de la cuenca
|
|
|
|
new_obj.ViewObject.PointColor = vcolors
|
|
new_obj.ViewObject.Lighting = "One side"
|
|
new_obj.ViewObject.Shading = "Flat Lines"
|
|
|
|
|
|
def create_polyline(points):
|
|
"""Crea un objeto Polyline en FreeCAD"""
|
|
if len(points) < 2:
|
|
return
|
|
|
|
poly = Part.makePolygon(points)
|
|
obj = FreeCAD.ActiveDocument.addObject("Part::Feature", "FlowPath")
|
|
obj.Shape = poly
|
|
obj.ViewObject.LineWidth = 2.0
|
|
obj.ViewObject.LineColor = (0.0, 0.0, 1.0)
|
|
|
|
|
|
class CommandHydrologicalAnalysis:
|
|
|
|
def GetResources(self):
|
|
return {'Pixmap': str(os.path.join(DirIcons, "drop.jpg")),
|
|
'MenuText': "Hidrological analysis",
|
|
'Accel': "H, A",
|
|
'ToolTip': "Hidrological analysis"}
|
|
|
|
def IsActive(self):
|
|
return True
|
|
|
|
def Activated(self):
|
|
# User input parameters (example values)
|
|
os.environ['OMP_NUM_THREADS'] = str(cpu_count())
|
|
os.environ['MKL_NUM_THREADS'] = str(cpu_count())
|
|
os.environ["FREECAD_NO_FORK"] = "1" # Desactiva el fork en sistemas Unix
|
|
#try:
|
|
# Parámetros de usuario
|
|
min_basin_area = 100 # m²
|
|
rainfall_intensity = 1.0
|
|
|
|
# Validar selección
|
|
mesh_obj = FreeCADGui.Selection.getSelection()[0]
|
|
if not mesh_obj.isDerivedFrom("Mesh::Feature"):
|
|
raise ValueError("Selecciona un objeto de malla")
|
|
|
|
# Procesamiento principal
|
|
FreeCAD.Console.PrintMessage(f"buscar basins: ")
|
|
FreeCADGui.updateGui()
|
|
basins = list(find_hydrological_basins(mesh_obj, min_basin_area))
|
|
FreeCAD.Console.PrintMessage(f" - Cuencas identificadas: {len(basins)}\n")
|
|
'''FreeCAD.Console.PrintMessage(f"simulate_water_flow: ")
|
|
FreeCADGui.updateGui()
|
|
flow_paths = simulate_water_flow(mesh_obj, basins, rainfall_intensity)
|
|
FreeCAD.Console.PrintMessage(f" - Trayectorias de flujo generadas: {len(flow_paths)}\n")
|
|
FreeCADGui.updateGui()'''
|
|
|
|
# Visualización
|
|
for basin in basins:
|
|
color = (random.random(), random.random(), random.random())
|
|
colorize_mesh(mesh_obj, basin['facets'], color)
|
|
|
|
'''for path in flow_paths:
|
|
create_polyline(path)'''
|
|
|
|
FreeCAD.ActiveDocument.recompute()
|
|
|
|
'''except Exception as e:
|
|
FreeCAD.Console.PrintError(f"Error: {str(e)}\n")
|
|
finally:
|
|
# Limpieza de memoria
|
|
import gc
|
|
gc.collect()'''
|