''' This tool is part of the WhiteboxTools geospatial analysis library. Authors: Dr. John Lindsay Created: 01/07/2017 Last Modified: 18/10/2019 License: MIT ''' '''use whitebox_raster::* use whitebox_common::structures::Array2D use crate::tools::* use std::env use std::f64 use std::io::Error, ErrorKind use std::path''' from crate.tools import * from whitebox_raster import * # This tool can be used to delineate all of the drainage basins contained within a local drainage direction, # or flow pointer raster (`--d8_pntr`), and draining to the edge of the data. The flow pointer raster must be derived using # the `D8Pointer` tool and should have been extracted from a digital elevation model (DEM) that has been # hydrologically pre-processed to remove topographic depressions and flat areas, e.g. using the `BreachDepressions` # tool. By default, the flow pointer raster is assumed to use the clockwise indexing method used by WhiteboxTools: # # | . | . | . | # |:--:|:---:|:--:| # | 64 | 128 | 1 | # | 32 | 0 | 2 | # | 16 | 8 | 4 | # # If the pointer file contains ESRI flow direction values instead, the `--esri_pntr` parameter must be specified. # # The `Basins` and `Watershed` tools are similar in function but while the `Watershed` tool identifies the upslope areas # that drain to one or more user-specified outpoints, the `Basins` tool automatically sets outlets to all grid cells # situated along the edge of the data that do not have a defined flow direction (i.e. they do not have a lower neighbour). # Notice that these edge outlets need not be situated along the edges of the flow-pointer raster, but rather along the # edges of the region of valid data. That is, the DEM from which the flow-pointer has been extracted may incompletely # fill the containing raster, if it is irregular shaped, and NoData regions may occupy the peripherals. Thus, the entire # region of valid data in the flow pointer raster will be divided into a set of mutually exclusive basins using this tool. # # # See Also # `Watershed`, `D8Pointer`, `BreachDepressions` class Basins: def run(self, args: List[str], working_directory: str, verbose: bool) -> None: d8_file = '' output_file = '' esri_style = False if len(args) == 0: raise ValueError("Tool run with no parameters.") i = 0 '''while i < len(args): arg = args[i].replace('"', '').replace("'", "") cmd = arg.split("=") vec = list(cmd) keyval = False if len(vec) > 1: keyval = True flag_val = vec[0].lower().replace("--", "-") if flag_val == "-d8_pntr": d8_file = vec[1] if keyval else args[i + 1] elif flag_val == "-o" or flag_val == "-output": output_file = vec[1] if keyval else args[i + 1] elif flag_val == "-esri_pntr" or flag_val == "-esri_style": if len(vec) == 1 or not vec[1].lower().contains("false"): esri_style = True i += 1 if verbose: tool_name = self.get_tool_name() welcome_len = max(len(f"* Welcome to {tool_name} *"), 28) # 28 = length of the 'Powered by' by statement. print("*" * welcome_len) print(f"* Welcome to {tool_name} *{' ' * (welcome_len - 15 - len(tool_name))}") print("* Powered by WhiteboxTools *{' ' * (welcome_len - 28)}") print("* www.whiteboxgeo.com *{' ' * (welcome_len - 23)}") print("*" * welcome_len)''' sep = os.path.sep progress = 0 old_progress = 1 pntr = Raster(d8_file, mode="r") start = time.time() rows = pntr.configs.rows columns = pntr.configs.columns nodata = pntr.configs.nodata dx = [1, 1, 1, 0, -1, -1, -1, 0] dy = [-1, 0, 1, 1, 1, 0, -1, -1] import numpy as np flow_dir = np.full((rows, columns), -2, dtype=np.int8) output = Raster(output_file, mode="w", raster=pntr) output.data_type = DataType.FLOAT32 output.palette = "qual.plt" output.photometric_interp = PhotometricInterpretation.CATEGORICAL low_value = np.finfo(np.float64).min output.reinitialize_values(low_value) # Create a mapping from the pointer values to cell offsets. # This may seem wasteful, using only 8 of 129 values in the array, # but the mapping method is far faster than calculating np.log(z) / np.log(2.0). # It's also a good way of allowing for different point styles. pntr_matches = np.zeros(129, dtype=np.int8) if not esri_style: # This maps Whitebox-style D8 pointer values # onto the cell offsets in d_x and d_y. pntr_matches[1] = 0 pntr_matches[2] = 1 pntr_matches[4] = 2 pntr_matches[8] = 3 pntr_matches[16] = 4 pntr_matches[32] = 5 pntr_matches[64] = 6 pntr_matches[128] = 7 else: # This maps Esri-style D8 pointer values # onto the cell offsets in d_x and d_y. pntr_matches[1] = 1 pntr_matches[2] = 2 pntr_matches[4] = 3 pntr_matches[8] = 4 pntr_matches[16] = 5 pntr_matches[32] = 6 pntr_matches[64] = 7 pntr_matches[128] = 0 basin_id = 0.0 for row in range(rows): for col in range(columns): z = pntr[row, col] if z != nodata: if z > 0.0: flow_dir[row, col] = pntr_matches[int(z)] else: flow_dir[row, col] = -1 basin_id += 1.0 output[row, col] = basin_id else: output[row, col] = nodata flag = False x = 0 y = 0 dir = 0 outlet_id = nodata for row in range(rows): for col in range(columns): if output[row, col] == low_value: flag = False x = col y = row outlet_id = nodata while not flag: # find its downslope neighbour dir = flow_dir[y, x] if dir >= 0: # move x and y accordingly x += dx[dir] y += dy[dir] # if the new cell already has a value in the output, use that as the outletID z = output[y, x] if z != low_value: outlet_id = z flag = True else: flag = True flag = False x = col y = row output[y, x] = outlet_id while not flag: # find its downslope neighbour dir = flow_dir[y, x] if dir >= 0: # move x and y accordingly x += dx[dir] y += dy[dir] # if the new cell already has a value in the output, use that as the outletID if output[y, x] != low_value: flag = True else: flag = True output[y, x] = outlet_id elapsed_time = get_formatted_elapsed_time(start) output.add_metadata_entry(f"Created by whitebox_tools' {self.get_tool_name()}") output.add_metadata_entry(f"D8 pointer file: {d8_file}") output.add_metadata_entry(f"Elapsed Time (excluding I/O): {elapsed_time}") try: _ = output.write() except Exception as e: return Err(e) from whitebox_tools import WhiteboxTools import os from whitebox_tools.types import Array2D, DataType, PhotometricInterpretation class Basins(WhiteboxTools): def run(self, args, working_directory, verbose): d8_file = '' output_file = '' esri_style = False sep = os.path.sep progress = 0 old_progress = 1 # Reading data... pntr = self.open_raster(d8_file) pntr_info = self.get_raster_info(d8_file) rows = pntr_info['rows'] columns = pntr_info['columns'] nodata = pntr_info['nodata'] dx = [1, 1, 1, 0, -1, -1, -1, 0] dy = [-1, 0, 1, 1, 1, 0, -1, -1] flow_dir = Array2D(rows, columns, -2) output = self.create_new_raster(output_file, d8_file, DataType.F32) output.configs.palette = "qual.plt" output.configs.photometric_interp = PhotometricInterpretation.Categoricaloutput.reinitialize_values(float('-inf')) pntr_matches = [0] * 129 if not esri_style: pntr_matches[1] = 0 pntr_matches[2] = 1 pntr_matches[4] = 2 pntr_matches[8] = 3 pntr_matches[16] = 4 pntr_matches[32] = 5 pntr_matches[64] = 6 pntr_matches[128] = 7 else: pntr_matches[1] = 1 pntr_matches[2] = 2 pntr_matches[4] = 3 pntr_matches[8] = 4 pntr_matches[16] = 5 pntr_matches[32] = 6 pntr_matches[64] = 7 pntr_matches[128] = 0 basin_id = 0 for row in range(rows): for col in range(columns): z = pntr[row, col] if z != nodata: if z > 0: flow_dir[row, col] = pntr_matches[int(z)] else: flow_dir[row, col] = -1 basin_id += 1 output[row, col] = basin_id else: output[row, col] = nodata if verbose: progress = int(100 * (row * columns + col) / (rows * columns - 1)) if progress != old_progress: print("Initializing: {}%".format(progress)) old_progress = progress for row in range(rows): for col in range(columns): if output[row, col] == float('-inf'): flag = False x = col y = row outlet_id = nodata while not flag: dir = flow_dir[y, x] if dir >= 0: x += dx[dir] y += dy[dir] z = output[y, x] if z != float('-inf'): outlet_id = z flag = True else: flag = True flag = False x = col y = row output[y, x] = outlet_id while not flag: dir = flow_dir[y, x] if dir >= 0: x += dx[dir] y += dy[dir] if output[y, x] != float('-inf'): flag = True else: flag = True output[y, x] = outlet_id if verbose: progress = int(100 * (row * columns + col) / (rows * columns - 1)) if progress != old_progress: print("Progress: {}%".format(progress)) old_progress = progress elapsed_time = self.get_formatted_elapsed_time(start) output.add_metadata_entry("Created by whitebox_tools' {} tool".format(self.get_tool_name())) output.add_metadata_entry("D8 pointer file: {}".format(d8_file)) output.add_metadata_entry("ESRI-style output: {}".format(str(esri_style))) output.add_metadata_entry("Elapsed Time (excluding I/O): {}".format(elapsed_time)) if verbose: print("Saving data...") print("Output file written")