import FreeCAD import FreeCADGui import Mesh import Part import numpy as np import random from concurrent.futures import ThreadPoolExecutor from multiprocessing import Pool, cpu_count from collections import deque import os from PVPlantResources import DirIcons as DirIcons def mesh_to_numpy(mesh_obj): """Convierte la malla a arrays de NumPy con validación robusta""" mesh = mesh_obj.Mesh # Convertir vértices a array NumPy (shape: Nx3) vertices = np.array([(v.x, v.y, v.z) for v in mesh.Points], dtype=np.float32) # Convertir facetas a array NumPy (shape: Mx3) facets = np.array( [f.PointIndices for f in mesh.Facets], dtype=np.uint32) # Verificar integridad de índices max_index = len(mesh.Points) - 1 if facets.size > 0 and (facets > max_index).any(): raise ValueError("Índices de vértices fuera de rango") return vertices, facets def build_adjacency_matrix(facets): """Construye matriz de adyacencia con conversión segura de tipos""" edges = {} adjacency = [[] for _ in range(len(facets))] for idx, facet in enumerate(facets): if len(facet) != 3: continue v0, v1, v2 = facet for edge in [(v0, v1), (v1, v2), (v2, v0)]: sorted_edge = tuple(sorted(edge)) if sorted_edge not in edges: edges[sorted_edge] = [] edges[sorted_edge].append(idx) # Procesar solo aristas con 2 facetas for edge, facet_indices in edges.items(): if len(facet_indices) == 2: f1, f2 = facet_indices adjacency[f1].append(f2) adjacency[f2].append(f1) return adjacency def calculate_incenters_parallel(vertices, facets): """Cálculo paralelizado de incentros usando NumPy""" v0 = vertices[facets[:, 0]] v1 = vertices[facets[:, 1]] v2 = vertices[facets[:, 2]] a = np.linalg.norm(v1 - v2, axis=1) b = np.linalg.norm(v0 - v2, axis=1) c = np.linalg.norm(v0 - v1, axis=1) perimeters = a + b + c return (a[:, None] * v0 + b[:, None] * v1 + c[:, None] * v2) / perimeters[:, None] def find_basins_parallel(args): """Función paralelizable para procesamiento de cuencas""" chunk, adjacency, elevations = args basins = [] visited = np.zeros(len(elevations), dtype=bool) for seed in chunk: if visited[seed]: continue queue = deque([seed]) basin = [] min_elev = elevations[seed] while queue: current = queue.popleft() if visited[current]: continue visited[current] = True basin.append(current) neighbors = [n for n in adjacency[current] if elevations[n] >= min_elev] queue.extend(neighbors) if len(basin) > 0: basins.append(basin) return basins def find_hydrological_basins(mesh_obj, min_area=100): """Identificación de cuencas optimizada""" FreeCAD.Console.PrintMessage(f" -- vertices y facets: ") FreeCADGui.updateGui() vertices, facets = mesh_to_numpy(mesh_obj) FreeCAD.Console.PrintMessage(f" -- Adjacency: ") FreeCADGui.updateGui() adjacency = build_adjacency_matrix(facets) FreeCAD.Console.PrintMessage(f" -- Elevations: ") FreeCADGui.updateGui() elevations = calculate_incenters_parallel(vertices, facets)[:, 2] # Dividir trabajo en chunks chunk_size = len(facets) // (cpu_count() * 2) chunks = [ (chunk_range, adjacency, elevations) # Empaqueta los 3 argumentos for chunk_range in [ range(i, min(i + chunk_size, len(facets))) for i in range(0, len(facets), chunk_size) ] ] # Procesamiento paralelo with ThreadPoolExecutor(max_workers=cpu_count()) as executor: results = list(executor.map(find_basins_parallel, chunks)) # Combinar resultados all_basins = [b for sublist in results for b in sublist] # Filtrar por área mínima valid_basins = [] for basin in all_basins: area = sum(triangle_area(vertices[facets[i]]) for i in basin) if area >= min_area: valid_basins.append({'facets': basin, 'area': area}) return valid_basins def triangle_area(vertices): """Cálculo rápido de área con producto cruz""" return 0.5 * np.linalg.norm( np.cross(vertices[1] - vertices[0], vertices[2] - vertices[0]) ) def validate_facet(facet): """Valida que la faceta sea un triángulo válido""" return hasattr(facet, 'Points') and len(facet.Points) == 3 def calculate_incenter(facet): """Calcula el incentro usando la función nativa de FreeCAD""" try: return facet.InCircle[0] # (x, y, z) except: return None def build_adjacency(mesh): """Construye matriz de adyacencia eficiente en memoria""" edges = {} adjacency = [[] for _ in mesh.Facets] for idx, facet in enumerate(mesh.Facets): if not validate_facet(facet): continue pts = facet.Points for edge in [(min(pts[0], pts[1]), max(pts[0], pts[1])), (min(pts[1], pts[2]), max(pts[1], pts[2])), (min(pts[2], pts[0]), max(pts[2], pts[0]))]: if edge in edges: neighbor = edges[edge] adjacency[idx].append(neighbor) adjacency[neighbor].append(idx) del edges[edge] # Liberar memoria else: edges[edge] = idx return adjacency def find_hydrological_basins_old(mesh_obj, min_area=100): """Identificación de cuencas con validación de datos""" mesh = mesh_obj.Mesh adjacency = build_adjacency(mesh) basin_map = {} current_basin = 0 for seed in range(len(mesh.Facets)): if seed in basin_map or not validate_facet(mesh.Facets[seed]): continue queue = deque([seed]) basin_area = 0.0 basin_facets = [] while queue: facet_idx = queue.popleft() if facet_idx in basin_map: continue facet = mesh.Facets[facet_idx] in_center = calculate_incenter(facet) if not in_center: continue # Verificar mínimo local is_sink = True for neighbor in adjacency[facet_idx]: if neighbor >= len(mesh.Facets) or not validate_facet(mesh.Facets[neighbor]): continue n_center = calculate_incenter(mesh.Facets[neighbor]) if n_center and n_center[2] < in_center[2]: is_sink = False break if is_sink: basin_map[facet_idx] = current_basin basin_facets.append(facet_idx) basin_area += facet.Area # Expansión controlada for neighbor in adjacency[facet_idx]: if neighbor not in basin_map: queue.append(neighbor) if basin_area >= min_area: yield { 'facets': basin_facets, 'area': basin_area, 'depth': calculate_basin_depth(mesh, basin_facets) } current_basin += 1 def calculate_basin_depth(mesh, basin_facets): """Calcula la profundidad máxima de la cuenca""" min_z = float('inf') max_z = -float('inf') for idx in basin_facets: center = calculate_incenter(mesh.Facets[idx]) if center: min_z = min(min_z, center[2]) max_z = max(max_z, center[2]) return max_z - min_z if max_z != min_z else 0.0 def simulate_water_flow(mesh_obj, basins, rainfall=1.0): """ Simulación de flujo con prevención de bucles infinitos """ mesh = mesh_obj.Mesh adjacency = build_adjacency(mesh) flow_paths = [] for basin in basins: start_facets = basin['facets'][:2] # Muestra primeros 10 caminos for start in start_facets: path = [] visited = set() current = start while current is not None and current not in visited: visited.add(current) facet = mesh.Facets[current] center = calculate_incenter(facet) if not center: break path.append(FreeCAD.Vector(*center)) # Buscar vecino más bajo next_facet = None min_elev = float('inf') for neighbor in adjacency[current]: if neighbor >= len(mesh.Facets): continue n_center = calculate_incenter(mesh.Facets[neighbor]) if n_center and n_center[2] < min_elev: min_elev = n_center[2] next_facet = neighbor current = next_facet if min_elev < center[2] else None if len(path) > 1: flow_paths.append(path) return flow_paths def colorize_mesh(mesh_obj, facet_indices, color): """Coloriza facetas específicas de forma compatible""" mesh = mesh_obj.Mesh # Crear nuevo objeto Mesh colored_mesh = Mesh.Mesh() colored_mesh.addMesh(mesh) # Crear nuevo objeto en el documento new_obj = FreeCAD.ActiveDocument.addObject("Mesh::Feature", "ColoredBasin") new_obj.Mesh = colored_mesh # Asignar colores a los vértices vcolors = [] for idx in range(len(mesh.Points)): vcolors.append((0.8, 0.8, 0.8)) # Color base for facet_id in facet_indices: facet = mesh.Facets[facet_id] for vtx in facet.PointIndices: vcolors[vtx] = color # Color de la cuenca new_obj.ViewObject.PointColor = vcolors new_obj.ViewObject.Lighting = "One side" new_obj.ViewObject.Shading = "Flat Lines" def create_polyline(points): """Crea un objeto Polyline en FreeCAD""" if len(points) < 2: return poly = Part.makePolygon(points) obj = FreeCAD.ActiveDocument.addObject("Part::Feature", "FlowPath") obj.Shape = poly obj.ViewObject.LineWidth = 2.0 obj.ViewObject.LineColor = (0.0, 0.0, 1.0) class CommandHydrologicalAnalysis: def GetResources(self): return {'Pixmap': str(os.path.join(DirIcons, "drop.jpg")), 'MenuText': "Hidrological analysis", 'Accel': "H, A", 'ToolTip': "Hidrological analysis"} def IsActive(self): return True def Activated(self): # User input parameters (example values) os.environ['OMP_NUM_THREADS'] = str(cpu_count()) os.environ['MKL_NUM_THREADS'] = str(cpu_count()) os.environ["FREECAD_NO_FORK"] = "1" # Desactiva el fork en sistemas Unix #try: # Parámetros de usuario min_basin_area = 100 # m² rainfall_intensity = 1.0 # Validar selección mesh_obj = FreeCADGui.Selection.getSelection()[0] if not mesh_obj.isDerivedFrom("Mesh::Feature"): raise ValueError("Selecciona un objeto de malla") # Procesamiento principal FreeCAD.Console.PrintMessage(f"buscar basins: ") FreeCADGui.updateGui() basins = list(find_hydrological_basins(mesh_obj, min_basin_area)) FreeCAD.Console.PrintMessage(f" - Cuencas identificadas: {len(basins)}\n") '''FreeCAD.Console.PrintMessage(f"simulate_water_flow: ") FreeCADGui.updateGui() flow_paths = simulate_water_flow(mesh_obj, basins, rainfall_intensity) FreeCAD.Console.PrintMessage(f" - Trayectorias de flujo generadas: {len(flow_paths)}\n") FreeCADGui.updateGui()''' # Visualización for basin in basins: color = (random.random(), random.random(), random.random()) colorize_mesh(mesh_obj, basin['facets'], color) '''for path in flow_paths: create_polyline(path)''' FreeCAD.ActiveDocument.recompute() '''except Exception as e: FreeCAD.Console.PrintError(f"Error: {str(e)}\n") finally: # Limpieza de memoria import gc gc.collect()'''