Files

326 lines
12 KiB
Python
Raw Permalink Normal View History

2025-01-28 00:04:13 +01:00
'''
This tool is part of the WhiteboxTools geospatial analysis library.
Authors: Dr. John Lindsay
Created: 01/07/2017
Last Modified: 18/10/2019
License: MIT
'''
'''use whitebox_raster::*
use whitebox_common::structures::Array2D
use crate::tools::*
use std::env
use std::f64
use std::io::Error, ErrorKind
use std::path'''
from crate.tools import *
from whitebox_raster import *
# This tool can be used to delineate all of the drainage basins contained within a local drainage direction,
# or flow pointer raster (`--d8_pntr`), and draining to the edge of the data. The flow pointer raster must be derived using
# the `D8Pointer` tool and should have been extracted from a digital elevation model (DEM) that has been
# hydrologically pre-processed to remove topographic depressions and flat areas, e.g. using the `BreachDepressions`
# tool. By default, the flow pointer raster is assumed to use the clockwise indexing method used by WhiteboxTools:
#
# | . | . | . |
# |:--:|:---:|:--:|
# | 64 | 128 | 1 |
# | 32 | 0 | 2 |
# | 16 | 8 | 4 |
#
# If the pointer file contains ESRI flow direction values instead, the `--esri_pntr` parameter must be specified.
#
# The `Basins` and `Watershed` tools are similar in function but while the `Watershed` tool identifies the upslope areas
# that drain to one or more user-specified outpoints, the `Basins` tool automatically sets outlets to all grid cells
# situated along the edge of the data that do not have a defined flow direction (i.e. they do not have a lower neighbour).
# Notice that these edge outlets need not be situated along the edges of the flow-pointer raster, but rather along the
# edges of the region of valid data. That is, the DEM from which the flow-pointer has been extracted may incompletely
# fill the containing raster, if it is irregular shaped, and NoData regions may occupy the peripherals. Thus, the entire
# region of valid data in the flow pointer raster will be divided into a set of mutually exclusive basins using this tool.
#
# # See Also
# `Watershed`, `D8Pointer`, `BreachDepressions`
class Basins:
def run(self, args: List[str], working_directory: str, verbose: bool) -> None:
d8_file = ''
output_file = ''
esri_style = False
if len(args) == 0:
raise ValueError("Tool run with no parameters.")
i = 0
'''while i < len(args):
arg = args[i].replace('"', '').replace("'", "")
cmd = arg.split("=")
vec = list(cmd)
keyval = False
if len(vec) > 1:
keyval = True
flag_val = vec[0].lower().replace("--", "-")
if flag_val == "-d8_pntr":
d8_file = vec[1] if keyval else args[i + 1]
elif flag_val == "-o" or flag_val == "-output":
output_file = vec[1] if keyval else args[i + 1]
elif flag_val == "-esri_pntr" or flag_val == "-esri_style":
if len(vec) == 1 or not vec[1].lower().contains("false"):
esri_style = True
i += 1
if verbose:
tool_name = self.get_tool_name()
welcome_len = max(len(f"* Welcome to {tool_name} *"), 28) # 28 = length of the 'Powered by' by statement.
print("*" * welcome_len)
print(f"* Welcome to {tool_name} *{' ' * (welcome_len - 15 - len(tool_name))}")
print("* Powered by WhiteboxTools *{' ' * (welcome_len - 28)}")
print("* www.whiteboxgeo.com *{' ' * (welcome_len - 23)}")
print("*" * welcome_len)'''
sep = os.path.sep
progress = 0
old_progress = 1
pntr = Raster(d8_file, mode="r")
start = time.time()
rows = pntr.configs.rows
columns = pntr.configs.columns
nodata = pntr.configs.nodata
dx = [1, 1, 1, 0, -1, -1, -1, 0]
dy = [-1, 0, 1, 1, 1, 0, -1, -1]
import numpy as np
flow_dir = np.full((rows, columns), -2, dtype=np.int8)
output = Raster(output_file, mode="w", raster=pntr)
output.data_type = DataType.FLOAT32
output.palette = "qual.plt"
output.photometric_interp = PhotometricInterpretation.CATEGORICAL
low_value = np.finfo(np.float64).min
output.reinitialize_values(low_value)
# Create a mapping from the pointer values to cell offsets.
# This may seem wasteful, using only 8 of 129 values in the array,
# but the mapping method is far faster than calculating np.log(z) / np.log(2.0).
# It's also a good way of allowing for different point styles.
pntr_matches = np.zeros(129, dtype=np.int8)
if not esri_style:
# This maps Whitebox-style D8 pointer values
# onto the cell offsets in d_x and d_y.
pntr_matches[1] = 0
pntr_matches[2] = 1
pntr_matches[4] = 2
pntr_matches[8] = 3
pntr_matches[16] = 4
pntr_matches[32] = 5
pntr_matches[64] = 6
pntr_matches[128] = 7
else:
# This maps Esri-style D8 pointer values
# onto the cell offsets in d_x and d_y.
pntr_matches[1] = 1
pntr_matches[2] = 2
pntr_matches[4] = 3
pntr_matches[8] = 4
pntr_matches[16] = 5
pntr_matches[32] = 6
pntr_matches[64] = 7
pntr_matches[128] = 0
basin_id = 0.0
for row in range(rows):
for col in range(columns):
z = pntr[row, col]
if z != nodata:
if z > 0.0:
flow_dir[row, col] = pntr_matches[int(z)]
else:
flow_dir[row, col] = -1
basin_id += 1.0
output[row, col] = basin_id
else:
output[row, col] = nodata
flag = False
x = 0
y = 0
dir = 0
outlet_id = nodata
for row in range(rows):
for col in range(columns):
if output[row, col] == low_value:
flag = False
x = col
y = row
outlet_id = nodata
while not flag:
# find its downslope neighbour
dir = flow_dir[y, x]
if dir >= 0:
# move x and y accordingly
x += dx[dir]
y += dy[dir]
# if the new cell already has a value in the output, use that as the outletID
z = output[y, x]
if z != low_value:
outlet_id = z
flag = True
else:
flag = True
flag = False
x = col
y = row
output[y, x] = outlet_id
while not flag:
# find its downslope neighbour
dir = flow_dir[y, x]
if dir >= 0:
# move x and y accordingly
x += dx[dir]
y += dy[dir]
# if the new cell already has a value in the output, use that as the outletID
if output[y, x] != low_value:
flag = True
else:
flag = True
output[y, x] = outlet_id
elapsed_time = get_formatted_elapsed_time(start)
output.add_metadata_entry(f"Created by whitebox_tools' {self.get_tool_name()}")
output.add_metadata_entry(f"D8 pointer file: {d8_file}")
output.add_metadata_entry(f"Elapsed Time (excluding I/O): {elapsed_time}")
try:
_ = output.write()
except Exception as e:
return Err(e)
from whitebox_tools import WhiteboxTools
import os
from whitebox_tools.types import Array2D, DataType, PhotometricInterpretation
class Basins(WhiteboxTools):
def run(self, args, working_directory, verbose):
d8_file = ''
output_file = ''
esri_style = False
sep = os.path.sep
progress = 0
old_progress = 1
# Reading data...
pntr = self.open_raster(d8_file)
pntr_info = self.get_raster_info(d8_file)
rows = pntr_info['rows']
columns = pntr_info['columns']
nodata = pntr_info['nodata']
dx = [1, 1, 1, 0, -1, -1, -1, 0]
dy = [-1, 0, 1, 1, 1, 0, -1, -1]
flow_dir = Array2D(rows, columns, -2)
output = self.create_new_raster(output_file, d8_file, DataType.F32)
output.configs.palette = "qual.plt"
output.configs.photometric_interp = PhotometricInterpretation.Categoricaloutput.reinitialize_values(float('-inf'))
pntr_matches = [0] * 129
if not esri_style:
pntr_matches[1] = 0
pntr_matches[2] = 1
pntr_matches[4] = 2
pntr_matches[8] = 3
pntr_matches[16] = 4
pntr_matches[32] = 5
pntr_matches[64] = 6
pntr_matches[128] = 7
else:
pntr_matches[1] = 1
pntr_matches[2] = 2
pntr_matches[4] = 3
pntr_matches[8] = 4
pntr_matches[16] = 5
pntr_matches[32] = 6
pntr_matches[64] = 7
pntr_matches[128] = 0
basin_id = 0
for row in range(rows):
for col in range(columns):
z = pntr[row, col]
if z != nodata:
if z > 0:
flow_dir[row, col] = pntr_matches[int(z)]
else:
flow_dir[row, col] = -1
basin_id += 1
output[row, col] = basin_id
else:
output[row, col] = nodata
if verbose:
progress = int(100 * (row * columns + col) / (rows * columns - 1))
if progress != old_progress:
print("Initializing: {}%".format(progress))
old_progress = progress
for row in range(rows):
for col in range(columns):
if output[row, col] == float('-inf'):
flag = False
x = col
y = row
outlet_id = nodata
while not flag:
dir = flow_dir[y, x]
if dir >= 0:
x += dx[dir]
y += dy[dir]
z = output[y, x]
if z != float('-inf'):
outlet_id = z
flag = True
else:
flag = True
flag = False
x = col
y = row
output[y, x] = outlet_id
while not flag:
dir = flow_dir[y, x]
if dir >= 0:
x += dx[dir]
y += dy[dir]
if output[y, x] != float('-inf'):
flag = True
else:
flag = True
output[y, x] = outlet_id
if verbose:
progress = int(100 * (row * columns + col) / (rows * columns - 1))
if progress != old_progress:
print("Progress: {}%".format(progress))
old_progress = progress
elapsed_time = self.get_formatted_elapsed_time(start)
output.add_metadata_entry("Created by whitebox_tools' {} tool".format(self.get_tool_name()))
output.add_metadata_entry("D8 pointer file: {}".format(d8_file))
output.add_metadata_entry("ESRI-style output: {}".format(str(esri_style)))
output.add_metadata_entry("Elapsed Time (excluding I/O): {}".format(elapsed_time))
if verbose:
print("Saving data...")
print("Output file written")