# -*- coding: utf-8 -*-
"""
* Copyright (C) 2023-2025 Alexandre Gauvain, Ronan Abhervé, Jean-Raynald de Dreuzy
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
* which is available at https://www.apache.org/licenses/LICENSE-2.0.
*
* SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
"""
#%% LIBRAIRIES
import os
import re
import numbers
import datetime
import matplotlib.pyplot as plt
import matplotlib as mpl
from matplotlib.font_manager import FontProperties
import rasterio as rio
import rasterio.features # necessary to avoid a bug
from rasterio.warp import calculate_default_transform, reproject
import geopandas as gpd
from shapely.geometry import Point
import xarray as xr
xr.set_options(keep_attrs = True)
# import rioxarray as rio #Not necessary, the rio module from xarray is enough
from pyproj import CRS
from pyproj import Transformer
from pyproj.aoi import AreaOfInterest
from pyproj.database import query_utm_crs_info
import pandas as pd
import numpy as np
import whitebox
wbt = whitebox.WhiteboxTools()
wbt.verbose = False
from .log_manager import get_logger
logger = get_logger(__name__)
#%% DIRECTORY MANAGEMENT
[docs]
def create_folder(path):
"""
If not exist, create a new empty folder.
Parameters
----------
path : str
Folder path.
"""
if not os.path.exists(path):
os.makedirs(path)
#%% RASTER PROCESSING
[docs]
def clip_tif(tif_path, shp_path, out_path, maintain_dimensions):
"""
Clip a raster from a shapefile polygon.
Parameters
----------
tif_path : str
Raster path.
shp_path : str
Shapefile path.
out_path : str
Ouput result path.
maintain_dimensions : bool
Maintain the raster dimension or not.
"""
wbt.clip_raster_to_polygon(tif_path, shp_path, out_path, maintain_dimensions=maintain_dimensions)
[docs]
def mask_by_dem(target_data, mask_data, cond_symb, value_masked):
"""
Mask raster from different conditions
Parameters
----------
target_data : 2D matrix
Raster data to mask.
mask_data : 2D matrix
Raster reference for mask.
cond_symb : str
Select the mask consition: '==','!=','<=','>=','>','<'.
value_masked : float
Value to mask.
Returns
-------
masked : 2D matrix
Masked raster.
"""
if cond_symb == '==':
masked = np.ma.masked_array(target_data, mask=mask_data==value_masked)
if cond_symb == '!=':
masked = np.ma.masked_array(target_data, mask=mask_data!=value_masked)
if cond_symb == '<=':
masked = np.ma.masked_array(target_data, mask=mask_data<=value_masked)
if cond_symb == '>=':
masked = np.ma.masked_array(target_data, mask=mask_data>=value_masked)
if cond_symb == '>':
masked = np.ma.masked_array(target_data, mask=mask_data>value_masked)
if cond_symb == '<':
masked = np.ma.masked_array(target_data, mask=mask_data<value_masked)
return masked
def load_to_numpy(file, src_crs=None,
base_path:str=None, dst_crs=None, out_path:str=None):
"""
Generate a numpy array from a source file (vector or raster) and a base
raster. The numpy array profile (shape, resolution, extent...) matches
with the base one.
If the base raster is not specified (base_path), then the generated numpy
array has the same profile as the source file.
When the source CRS is not embeded in the source file, it can be specified
with src_crs.
When the destination CRS is not embeded in the base file, it can also be
specified with dst_crs.
out_path gives the possibility to export the result as a .tif file.
Parameters
----------
file : str or geopandas.GeoDataFrame
Path to the input file to process, or geopandas GoDataFrame.
src_crs : int or str, optional (The default is None)
If the CRS is not embeded in the input file, it is possible to
specify it here, as an integer (EPSG), or a str 'EPSG:<int>'
base_path : str, optional (The default is None)
Path to the file that will serve as the base for dimensions, resolution,
extent...
dst_crs : int or str, optional (The default is None)
If the CRS is not embeded in the base file, it is possible to
specify it here, as an integer (EPSG), or a str 'EPSG:<int>'
out_path : str, optional (The default is None)
If specified, the numpy array will be saved as a .tif file, using the
profile from the base file.
Returns
-------
val : numpy.ndarray
"""
# Initializations:
if base_path:
with rio.open(base_path, 'r') as base:
base_profile = base.profile
base_val = base.read(1) # base.read()[0]
else:
base_profile = None
if isinstance(src_crs, str): src_crs = rio.crs.CRS.from_string(src_crs)
elif isinstance(src_crs, int): src_crs = rio.crs.CRS.from_epsg(src_crs)
if isinstance(dst_crs, str): dst_crs = rio.crs.CRS.from_string(dst_crs)
elif isinstance(dst_crs, int): dst_crs = rio.crs.CRS.from_epsg(dst_crs)
file_vect = None
if isinstance(file, gpd.geodataframe.GeoDataFrame):
file_vect = file
elif os.path.splitext(file)[-1] in ['.shp', '.dbf', '.shx']: # shapefile
file_vect = gpd.read_file(file)
elif os.path.splitext(file)[-1] in ['.txt', '.csv']: # coordinates array
"""
The input file should be formated as:
id;x;y
0;34500;7456125
1;35675;7991500
...
"""
try:
df = pd.read_csv(file, sep = ";")
geometry = [Point(xy) for xy in zip(df.x, df.y)]
df = df.drop(columns = ['x', 'y'])
file_vect = gpd.GeoDataFrame(df, geometry = geometry)
except Exception as exc:
logger.error(
"Failed to read coordinate CSV %s; expected columns 'id;x;y'",
file,
)
logger.debug("Coordinate CSV parsing error: %s", exc)
if file_vect is not None: # shapefile
if base_profile:
# CRS initialization
if not file_vect.crs: # if not file_vect.crs.is_geographic nor file_vect.crs.is_projected:
if src_crs:
file_vect.set_crs(crs = src_crs, inplace = True, allow_override = True)
else:
logger.error("Source CRS (src_crs) required to rasterize vector dataset")
return
if not base_profile['crs'].is_valid:
if dst_crs: base_profile['crs'] = dst_crs
else:
logger.error("Destination CRS (dst_crs) required to rasterize vector dataset")
return
# The vector needs to be in the same CRS as the base raster:
logger.info(f"Before rasterization, the vector will be converted from 'EPSG:{file_vect.crs.to_epsg()}' into 'EPSG:{base_profile['crs'].to_epsg()}'.")
file_vect.to_crs(crs = base_profile['crs'].to_epsg(), inplace = True)
# Rasterize:
val = rio.features.rasterize(
[(val.geometry, 1) for _, val in file_vect.iterrows()],
out_shape = (base_profile['height'], base_profile['width']),
transform = base_profile['transform'],
fill = base_profile['nodata'],
all_touched = False)
# update profile
data_profile = base_profile
else: # if there is no base_profile
logger.error("Raster profile required to rasterize vector dataset; provide base raster or profile")
return
else: # input file is a raster
with rio.open(file, 'r') as data:
data_profile = data.profile
if src_crs and not data_profile['crs'].is_valid:
data_profile['crs'] = src_crs
# print(f"The CRS of input data has been set to 'EPSG:{data_profile['crs'].to_epsg()}'.\n")
# data_crs = data.crs
val = data.read(1) # data.read()[0] # extract the first layer
# Reprojection:
# if (crs_proj and (str(data_crs) != crs_proj)) or (base_profile and (data_profile != base_profile)):
if base_profile:
# CRS initialization
if dst_crs and not base_profile['crs'].is_valid:
base_profile['crs'] = dst_crs
if data_profile != base_profile:
if not data_profile['crs'].is_valid:
logger.error("Source CRS (src_crs) required to reproject raster dataset")
return
if not base_profile['crs'].is_valid:
logger.error("Destination CRS (dst_crs) required to reproject raster dataset")
return
rio.warp.reproject(source = val,
destination = base_val,
src_transform = data_profile['transform'],
src_crs = data_profile['crs'],
src_nodata = data_profile['nodata'],
dst_transform = base_profile['transform'],
dst_crs = base_profile['crs'],
dst_nodata = base_profile['nodata'],
# resampling = rio.enums.Resampling(0),
resampling = rasterio.enums.Resampling(1), # (0), (5)
)
# update_profile
data_profile = base_profile
# update values array
val = base_val
# Ne fonctionne pas encore
# =============================================================================
# # Drop nodata margins:
# J, I = np.where(val == 1)
# imin = I.min()
# imax = I.max()
# jmin = J.min()
# jmax = J.max()
# xmin = data_profile['transform'][2] + imin*data_profile['transform'][0]
# ymax = data_profile['transform'][5] + (data_profile['height']-jmax)*data_profile['transform'][5]
# data_profile['transform'] = Affine(data_profile['transform'][0],
# data_profile['transform'][1],
# xmin,
# data_profile['transform'][3],
# data_profile['transform'][4],
# ymax)
# data_profile['width'] = imax - imin
# data_profile['height'] = jmax - jmin
# =============================================================================
if out_path: # to export as a .tif file (optional)
with rio.open(out_path, 'w', **data_profile) as dst:
dst.write_band(1, val)
if base_profile:
dst_crs = base_profile['crs']
nodata = base_profile['nodata']
else:
nodata = None
if file_vect is not None:
src_crs = file_vect.crs
else:
src_crs = data_profile['crs']
return val, src_crs, dst_crs, nodata
def load_to_xarray(file, src_crs=None, main_var=None,
base_path:str=None, dst_crs=None):
"""
Parameters
----------
file : str (path) or xarray.Dataset
DESCRIPTION.
src_crs : TYPE, optional
DESCRIPTION. The default is None.
main_var : TYPE, optional
DESCRIPTION. The default is None.
base_path : str, optional
DESCRIPTION. The default is None.
dst_crs : TYPE, optional
DESCRIPTION. The default is None.
Returns
-------
TYPE
DESCRIPTION.
src_crs : TYPE
DESCRIPTION.
dst_crs : TYPE
DESCRIPTION.
nodata : TYPE
DESCRIPTION.
"""
# ---- Initialization
if base_path:
with rio.open(base_path, 'r') as base:
base_profile = base.profile
base_val = base.read(1) # base.read()[0]
else:
base_profile = None
if isinstance(src_crs, str): src_crs = rio.crs.CRS.from_string(src_crs)
elif isinstance(src_crs, int): src_crs = rio.crs.CRS.from_epsg(src_crs)
if isinstance(dst_crs, str): dst_crs = rio.crs.CRS.from_string(dst_crs)
elif isinstance(dst_crs, int): dst_crs = rio.crs.CRS.from_epsg(dst_crs)
# ---- Loading netcdf
if isinstance(file, str):
if os.path.splitext(file)[-1].casefold() in ['.tif', '.tiff']:
with xr.open_dataset(file) as ds:
ds.load() # to unlock the resource
ds = ds.squeeze('band')
ds = ds.drop_vars('band')
if main_var:
ds = ds.rename(dict(band_data = main_var))
elif os.path.splitext(file)[-1].casefold() == '.nc':
try:
with xr.open_dataset(file, decode_coords = 'all') as ds:
ds.load() # to unlock the resource
except ValueError:
# Usually this error appears when unable to decode
# time units 'Months since 1901-01-01' with
# "calendar 'proleptic_gregorian'"
logger.warning("Unable to decode NetCDF time units; falling back to manual parsing")
with xr.open_dataset(file, decode_coords = 'all',
decode_times = False) as ds:
ds.load()
try: ds.time.attrs['units']
except:
logger.error("No time unit metadata found in NetCDF dataset")
return
# Build back time scale:
# print(f"Time axis will be inferred from 'time' attributes: \"{ds.time.attrs['units']}\"...")
timeunit = ds.time.attrs['units'].split()[0].casefold()
if timeunit in ['month', 'months', 'mois']:
freq = 'MS'
freq_info = 'monthly'
elif timeunit in ['day', 'days', 'jour', 'jours']:
freq = '1D'
freq_info = 'daily'
logger.info(
"Expecting origin date formatted as YYYY MM DD or DD MM YYYY when rebuilding time axis"
)
# The format of the origin date is expected to be either
# YYYY MM DD or DD MM YYYY (with any separator)
# The american format MM DD YYYY is not considered
initdate_pattern = re.compile(r"\d{2,4}.*\d{2,4}")
initdate = initdate_pattern.search(ds.time.attrs['units']).group()
if initdate[2].isnumeric():
sep = initdate[4]
initdate = datetime.datetime.strptime(initdate, f"%Y{sep}%m{sep}%d")
else:
sep = initdate[2]
initdate = datetime.datetime.strptime(initdate, f"%d{sep}%m{sep}%Y")
start_date = pd.Series(pd.date_range(
initdate, periods = int(ds.time[0]) + 1, freq = freq)).iloc[-1]
date_index = pd.date_range(start = start_date,
periods = len(ds.time), freq = freq)
# print(f"Time axis from {date_index[0]} to {date_index[-1]} ({freq_info}).\n")
ds['time'] = date_index
else:
logger.error("File extension %s not supported for xarray loading", os.path.splitext(file)[-1])
return
elif isinstance(file, xr.core.dataset.Dataset):
ds = file
# ---- Reprojection
# Helper function to check if a CRS is invalid
def _is_crs_invalid(crs):
if crs is None:
return True
crs_str = str(crs)
invalid_patterns = ['EngineeringCRS', 'Unknown engineering datum',
'LOCAL_CS', 'UNIT["unknown"', 'unnamed']
if any(p in crs_str for p in invalid_patterns):
return True
try:
return crs.to_dict().get('type') == 'EngineeringCRS'
except:
return False
# Apply source CRS if provided and needed
if src_crs:
current_crs = ds.rio.crs
if _is_crs_invalid(current_crs) or 'spatial_ref' not in ds.coords:
if 'spatial_ref' in ds.coords:
ds = ds.drop_vars('spatial_ref')
ds.rio.write_crs(src_crs, inplace = True)
data_transform = ds.rio.transform()
# Reproject to match base profile if provided
if base_profile:
# Fix base CRS if invalid
if dst_crs and _is_crs_invalid(base_profile['crs']):
base_profile['crs'] = dst_crs
# Reproject if geometry or CRS differ
if (data_transform != base_profile['transform']) | (ds.rio.crs != base_profile['crs']):
if _is_crs_invalid(ds.rio.crs):
logger.error("Source CRS (src_crs) required to reproject xarray dataset")
return
if _is_crs_invalid(base_profile['crs']):
logger.error("Destination CRS (dst_crs) required to reproject xarray dataset")
return
ds = ds.rio.reproject(dst_crs = base_profile['crs'],
transform = base_profile['transform'],
shape = (base_profile['height'], base_profile['width']),
nodata = np.nan,
resampling = rasterio.enums.Resampling(1))
# Or reproject to dst_crs if specified without base
elif dst_crs is not None:
if _is_crs_invalid(ds.rio.crs):
logger.error("Source CRS (src_crs) required to reproject - current CRS is invalid: %s", ds.rio.crs)
return
ds = ds.rio.reproject(dst_crs = dst_crs)
# ---- Format spatial attributes for compatibility with QGIS
if 'units' in ds.x.attrs.keys() and ds.x.attrs['units'].casefold() in ['m', 'meter', 'meters', 'metre', 'metres']:
ds.x.attrs = {'standard_name': 'projection_x_coordinate',
'long_name': 'x coordinate of projection',
'units': 'Meter'}
ds.y.attrs = {'standard_name': 'projection_y_coordinate',
'long_name': 'y coordinate of projection',
'units': 'Meter'}
elif 'units' in ds.x.attrs.keys() and 'deg' in ds.x.attrs['units']:
ds.longitude.attrs = {'long_name': 'longitude',
'units': 'degrees_east'}
ds.latitude.attrs = {'long_name': 'latitude',
'units': 'degrees_north'}
return ds #, src_crs, dst_crs, nodata
#%% EXTRACTING FEATURES
[docs]
def basin_area(target_data, mask_data, cond_symb, value_masked, resolution):
"""
Calculate the area of a masked raster.
Parameters
----------
target_data : 2D matrix
Raster data to mask.
mask_data : 2D matrix
Raster reference for mask.
cond_symb : str
Select the mask consition: '==','!=','<=','>=','>','<'.
value_masked : float
Value to mask.
resolution : float
Cell resolution of the raster.
Returns
-------
area : float
Area in [km²] if resolution is in [m].
"""
masked = mask_by_dem(target_data, mask_data, cond_symb, value_masked)
cell = masked.count()
area = (cell * resolution**2) / 1000000
return area
def rmse_manual(sim, obs):
"""Root Mean Square Error (RMSE)."""
return np.sqrt(np.mean((sim - obs) ** 2))
def nse_manual(sim, obs, transform=None):
"""Nash–Sutcliffe Efficiency (optionally on log‑transformed Q)."""
if transform == 'log':
eps = 1e-6
sim, obs = np.log(sim + eps), np.log(obs + eps)
num = np.sum((obs - sim) ** 2)
den = np.sum((obs - np.mean(obs)) ** 2)
return 1 - num/den
def mare_manual(sim, obs):
"""Mean Absolute Relative Error (MARE)."""
return np.mean(np.abs(sim - obs) / obs)
def kge_manual(sim, obs):
"""Kling–Gupta Efficiency and its three components (r, α, β)."""
# Pearson r
r = np.corrcoef(sim, obs)[0,1]
# spread ratio α
alpha = np.std(sim) / np.std(obs)
# bias ratio β (sum‑based, same as mean‑based)
beta = np.sum(sim) / np.sum(obs)
kge = 1 - np.sqrt((r - 1)**2 + (alpha - 1)**2 + (beta - 1)**2)
return kge, r, alpha, beta
[docs]
def efficiency_criteria(sim, obs):
"""
Compute [RMSE, nRMSE, NSE, NSElog, BAL, MARE, KGE] on two 1D arrays,
doing pair‑wise deletion of NaNs in obs.
"""
# flatten and mask out any NaN in obs
sim = np.asarray(sim).ravel()
obs = np.asarray(obs).ravel()
mask = ~np.isnan(obs)
sim, obs = sim[mask], obs[mask]
# now all metrics on equal‑length vectors
rmse = rmse_manual(sim, obs)
nrmse = rmse / np.mean(obs)
nse = nse_manual(sim, obs)
nselog= nse_manual(sim, obs, transform='log')
bal = np.sum(sim) / np.sum(obs)
mare = mare_manual(sim, obs)
kge = kge_manual(sim, obs)[0]
return rmse, nrmse, nse, nselog, bal, mare, kge
[docs]
def date_range(start, periods, freq):
"""
Generate timestamp from datetime.
Parameters
----------
start : int
Starting year.
periods : int
Number of periods.
freq : str
Frequency of the datetime: 'D','W','M','Y'.
Returns
-------
time : datetime
Datetime generated.
"""
time = pd.date_range(str(start), periods=periods, freq=freq)
return time
def hydrological_mean(data, accuracy=15):
"""
Compute the mean value on the longest period that meets the following
conditions:
- period should be made of full years (period is a year-multiple)
- period should be larger than one year
- end date of the period should be same day and month as the first date
of the period, more or less the accuracy
Parameters
----------
data : pandas.core.series.Series or pandas.core.frame.DataFrame
DESCRIPTION.
accuracy : number, optional
DESCRIPTION. The default is 15.
Returns
-------
avg : float or pandas.core.series.Series
The average value.
"""
#% Get rid of the first and last value (there are great chances that
# they are irrelevant, especially in resampled data sets)
data = data[1:-1]
#% Format the index to Timestamp, if needed
if isinstance(data.index[0], numbers.Number):
data.index = data['time']
if isinstance(data.index[0], str):
data.index = pd.to_datetime(data.index)
# Safeguard
if not isinstance(data.index[0], datetime.datetime):
logger.error("No recognized datetime index in input series for hydrological_mean")
return
#% Get the most recent date that falls within the accuracy range
idx = data[data.index.month == data.index[0].month][
abs(data[data.index.month == data.index[0].month].index.day - \
data.index[0].day)-3 <= 0].index[-1]
# n_years = np.mean((data.index[-1]-data.index[0])/365.2425)
if (idx - data.index[0]).days < 350:
logger.warning("Time range shorter than one year; using simple mean instead of hydrological mean")
# print(f"Average values are computed from {data.index[0].strftime('%Y-%m-%d')} to {idx.strftime('%Y-%m-%d')}")
avg = data[data.index[0]:idx].mean(numeric_only = False)
return avg
#%% PLOT SETTINGS
[docs]
def plot_params(small,interm,medium,large):
"""
Change options for plots.
Parameters
----------
small : float
Small size.
interm : float
Intermediate size.
medium : float
Medium size.
large : float
Large size.
Returns
-------
fontprop : dict
Properties of font.
"""
small = small
interm = interm
medium = medium
large = large
# mpl.rcParams['backend'] = 'wxAgg'
mpl.style.use('classic')
mpl.rcParams["figure.facecolor"] = 'white'
mpl.rcParams['grid.color'] = 'darkgrey'
mpl.rcParams['grid.linestyle'] = '-'
mpl.rcParams['grid.alpha'] = 0.8
mpl.rcParams['axes.axisbelow'] = True
mpl.rcParams['axes.linewidth'] = 1.5
mpl.rcParams['figure.dpi'] = 300
mpl.rcParams['savefig.dpi'] = 300
mpl.rcParams['patch.force_edgecolor'] = True
mpl.rcParams['image.interpolation'] = 'nearest'
mpl.rcParams['image.resample'] = True
mpl.rcParams['axes.autolimit_mode'] = 'data' # 'round_numbers' #
mpl.rcParams['axes.xmargin'] = 0.05
mpl.rcParams['axes.ymargin'] = 0.05
mpl.rcParams['xtick.direction'] = 'in'
mpl.rcParams['ytick.direction'] = 'in'
mpl.rcParams['xtick.major.size'] = 5
mpl.rcParams['xtick.minor.size'] = 3
mpl.rcParams['xtick.major.width'] = 1.5
mpl.rcParams['xtick.minor.width'] = 1
mpl.rcParams['ytick.major.size'] = 5
mpl.rcParams['ytick.minor.size'] = 1.5
mpl.rcParams['ytick.major.width'] = 1.5
mpl.rcParams['ytick.minor.width'] = 1
mpl.rcParams['xtick.top'] = True
mpl.rcParams['ytick.right'] = True
mpl.rcParams['legend.numpoints'] = 1
mpl.rcParams['legend.scatterpoints'] = 1
mpl.rcParams['legend.edgecolor'] = 'grey'
mpl.rcParams['date.autoformatter.year'] = '%Y'
mpl.rcParams['date.autoformatter.month'] = '%Y-%m'
mpl.rcParams['date.autoformatter.day'] = '%Y-%m-%d'
mpl.rcParams['date.autoformatter.hour'] = '%H:%M'
mpl.rcParams['date.autoformatter.minute'] = '%H:%M:%S'
mpl.rcParams['date.autoformatter.second'] = '%H:%M:%S'
mpl.rcParams.update({'mathtext.default': 'regular' })
plt.rc('font', size=small) # controls default text sizes **font
plt.rc('figure', titlesize=large) # fontsize of the figure title
plt.rc('legend', fontsize=small) # legend fontsize
plt.rc('axes', titlesize=medium, labelpad=10) # fontsize of the axes title
plt.rc('axes', labelsize=medium, labelpad=12) # fontsize of the x and y labels
plt.rc('xtick', labelsize=interm) # fontsize of the tick labels
plt.rc('ytick', labelsize=interm) # fontsize of the tick labels
plt.rc('font', family='sans serif')
fontprop = FontProperties()
fontprop.set_family('sans serif') # for x and y label
fontdic = {'family' : 'sans serif', 'weight' : 'bold'} # for legend
return fontprop
#%% REPROJECT DATA
[docs]
def export_tif(base_dem_path, data_to_tif, data_tif_path,
data_nodata_val=None, data_crs=None):
"""
Export tif from 2D matrix data following raster reference.
Parameters
----------
base_dem_path : str
Path of raster reference.
data_to_tif : 2D matrix
Data to export in raster..
data_tif_path : TYPE
Output path of the exported raster.
data_nodata_val : float, optional
To replace base nodata value.
data_crs : str or int or CRS, optional
To replace base Coordinates Reference System
"""
# Open base dem
with rio.open(base_dem_path) as src:
ras_data = src.read()
ras_nodata = src.nodatavals
ras_dtype = src.dtypes
ras_meta = src.profile
# Type of data
data_dtype = data_to_tif.dtype
# Change base dem from data
ras_meta['dtype'] = data_dtype
if data_nodata_val is not None:
ras_meta['nodata'] = data_nodata_val
if data_crs is not None:
if isinstance(data_crs, str):
ras_meta['crs'] = rio.crs.CRS.from_string(data_crs)
elif isinstance(data_crs, int):
ras_meta['crs'] = rio.crs.CRS.from_epsg(data_crs)
else:
ras_meta['crs'] = data_crs
# Create new data raster with base dem size
with rio.open(data_tif_path, 'w', **ras_meta) as dst:
dst.write(data_to_tif, 1)
[docs]
def reproject_tif(raw_dem_path, wgs_dem_path, utm_dem_path):
"""
Reproject raster from WGS to UTM projection.
"""
with rio.open(raw_dem_path) as src:
dst_crs = rio.crs.CRS.from_epsg(4326)
transform, width, height = calculate_default_transform(
src.crs, dst_crs, src.width, src.height, *src.bounds
)
kwargs = src.meta.copy()
kwargs.update({
'crs': dst_crs,
'transform': transform,
'width': width,
'height': height
})
with rio.open(wgs_dem_path, 'w', **kwargs) as dst:
for band in range(1, src.count + 1):
reproject(
source=rio.band(src, band),
destination=rio.band(dst, band),
src_transform=src.transform,
src_crs=src.crs,
dst_transform=transform,
dst_crs=dst_crs,
resampling=rio.enums.Resampling.bilinear
)
with rio.open(wgs_dem_path) as wgs_dem:
wgs_dem_data = wgs_dem.read(1)
geodata = wgs_dem.transform.to_gdal()
x_pixel = wgs_dem_data.shape[1] # columns
y_pixel = wgs_dem_data.shape[0] # rows
resolution_x = geodata[1] # pixelWidth: positive
resolution_y = geodata[5] # pixelHeight: negative
resolution = resolution_x
xmin = geodata[0] # originX
ymax = geodata[3] # originY
xmax = xmin + x_pixel * resolution_x
ymin = ymax + y_pixel * resolution_y
centroid = [xmin+((xmax-xmin)/2),ymin+((ymax-ymin)/2)]
lon = centroid[0]
lat = centroid[1]
utm_crs_list = query_utm_crs_info(
datum_name="WGS 84",
area_of_interest=AreaOfInterest(
west_lon_degree=lon,
south_lat_degree=lat,
east_lon_degree=lon,
north_lat_degree=lat,
),
)
utm_crs = CRS.from_epsg(utm_crs_list[0].code).srs
dst_crs = rio.crs.CRS.from_string(utm_crs)
transform, width, height = calculate_default_transform(
wgs_dem.crs, dst_crs, wgs_dem.width, wgs_dem.height, *wgs_dem.bounds
)
kwargs = wgs_dem.meta.copy()
kwargs.update({
'crs': dst_crs,
'transform': transform,
'width': width,
'height': height
})
with rio.open(utm_dem_path, 'w', **kwargs) as dst:
for band in range(1, wgs_dem.count + 1):
reproject(
source=rio.band(wgs_dem, band),
destination=rio.band(dst, band),
src_transform=wgs_dem.transform,
src_crs=wgs_dem.crs,
dst_transform=transform,
dst_crs=dst_crs,
resampling=rio.enums.Resampling.bilinear
)
return utm_crs
[docs]
def reproject_coord(x_wgs, y_wgs):
"""
Reproject coordinate points WGS to UTM.
"""
# x_wgs=-2
# y_wgs=48
lon = x_wgs
lat = y_wgs
utm_crs_list = query_utm_crs_info(datum_name="WGS 84",area_of_interest=AreaOfInterest(
west_lon_degree=lon,
south_lat_degree=lat,
east_lon_degree=lon,
north_lat_degree=lat,),)
utm_crs = CRS.from_epsg(utm_crs_list[0].code).srs
transformer = Transformer.from_crs("epsg:4326", utm_crs)
x_utm, y_utm = transformer.transform(lat, lon)
return utm_crs, x_utm, y_utm
[docs]
def reproject_shp(raw_shp_path, out_shp_path, utm_crs):
"""
Reproject shapefile with defined UTM crs.
For example: 'EPSG:2154'
"""
crs_code = utm_crs[5:]
shp = gpd.read_file(raw_shp_path)
shp.set_crs(epsg=crs_code, inplace=True, allow_override=True)
# shp.to_crs(utm_crs)
shp.to_file(out_shp_path)
def select_period(df, first, last):
"""
Clip a timeseries from two boundary years.
Parameters
----------
df : DataFrame or Series
DataFrame or Series with datetime index.
first : int
Starting year.
last : int
Ending year.
Returns
-------
df : DataFrame or Series
Clipped variable.
"""
df = df[(df.index.year>=first) & (df.index.year<=last)]
return df
#%% PYHELP HELPER FUNCTIONS (migrated from pyhelp/helper.py)
def load_csv(file_path: str) -> pd.DataFrame:
"""
Load a CSV file into a dataframe.
"""
try:
return pd.read_csv(file_path)
except Exception as e:
logger.exception("Failed to load CSV file %s", file_path)
return pd.DataFrame()
def load_shapefile(shapefile_path: str) -> gpd.GeoDataFrame:
"""
Load a shapefile into a GeoDataFrame.
Return a geodataframe containing the shapefile geometry
"""
try:
return gpd.read_file(shapefile_path)
except Exception as e:
logger.exception("Failed to load shapefile %s", shapefile_path)
return None
def get_centroid_coordinates(gdf: gpd.GeoDataFrame) -> tuple:
"""
Calculate the centroid of the geometry contained in the given geodataframe file.
Return --> (float, float)
tuple (longitude, latitude) of the centroid
"""
if gdf is None:
logger.error("GeoDataFrame input is None")
return None, None
if gdf.empty:
logger.error("GeoDataFrame contains no features")
return None, None
if gdf.crs is None:
logger.error("GeoDataFrame has no CRS defined")
return None, None
try:
gdf = gdf.to_crs("EPSG:2056")
gdf["geometry"] = gdf.geometry.centroid
gdf = gdf.to_crs("EPSG:4326")
point = gdf.geometry.iloc[0]
return point.x, point.y
except Exception as e:
logger.exception("Failed computing centroid coordinates")
return None, None
def transform_coordinates(dem_file_path: str, from_crs: str, to_crs: str) -> list:
"""
Read a DEM file, iterate through its pixels and
convert the coordinates from a crs to another
return --> list of (float, float)
list of tuples (longitude, latitude)
"""
try:
dem_dataset = rio.open(dem_file_path)
transform = dem_dataset.transform
width, height = dem_dataset.width, dem_dataset.height
transformer = Transformer.from_crs(from_crs, to_crs, always_xy=True)
coordinates = []
for row in range(height):
for col in range(width):
x, y = transform * (col, row)
lon, lat = transformer.transform(x, y)
coordinates.append((lon, lat))
return coordinates
except Exception as e:
logger.exception("Failed processing DEM raster %s", dem_file_path)
return []
def filter_coordinates_by_shape(coordinates: list, shapefile_path: str, target_crs: str) -> list:
"""
Filter the DEM coordinates according to the watershed shapefile polygon.
return --> list of (float, float)
"""
try:
gdf = load_shapefile(shapefile_path)
if gdf is None:
return []
polygon = gdf.to_crs(target_crs).unary_union
filtered = [pt for pt in coordinates if polygon.covers(Point(pt))]
return filtered
except Exception as e:
logger.exception("Failed filtering coordinates with shapefile %s", shapefile_path)
return []
def select_nearest_point(ds: xr.Dataset, lon: float, lat: float) -> xr.Dataset:
"""
select the nearest point in a xr.dataset from the given longitude and latitude.
return --> a cropped dataset corresponding to the nearest point
"""
if lon is not None and lat is not None:
return ds.sel(longitude=lon, latitude=lat, method="nearest")
return None
def select_within_polygon_points(ds: xr.Dataset, gdf: gpd.GeoDataFrame) -> xr.Dataset:
"""
select and filter the points in a xr.dataset which coordinates
are within the perimeter of the given geodataframe.
return --> a cropped dataset corresponding to the filtered points
"""
try:
polygon = gdf.unary_union
lons = ds.longitude.values
lats = ds.latitude.values
LON, LAT = np.meshgrid(lons, lats)
mask = np.zeros(LON.shape, dtype=bool)
for i in range(LON.shape[0]):
for j in range(LON.shape[1]):
pt = Point(LON[i, j], LAT[i, j])
mask[i, j] = polygon.contains(pt)
ds_filtered = ds.where(mask, drop=True)
return ds_filtered
except Exception as e:
logger.exception("Failed selecting dataset points within polygon")
return ds
def convert_units(df: pd.DataFrame, var_key: str) -> pd.DataFrame:
"""
Convert precipitation to mm
temperature to Fahrenheit
and change the unit of the radiation
"""
if var_key == "precipitation":
df = df * 1000.0
pass
elif var_key == "temperature":
df = df - 273.15
elif var_key == "radiation":
df = df * 1e-6
return df
#%% DISPLAY
_banner_printed = False
def print_hydromodpy():
global _banner_printed
if _banner_printed:
return
banner_lines = [
r' __ __ __ __ ____ ________ ',
r' / / / / / / / \/ / / / __ / ',
r' / /_/ /_ ______/ /________ / /___ ____/ / /_/ /_ __ ',
r' / __ / / / / __ / ___/ __ \/ /\,-/ / __ \/ __ / ____/ / / / ',
r' / / / / /_/ / /_/ / / / /_/ / / / / /_/ / /_/ / / / /_/ / ',
r' /_/ /_/\__, /_____/_/ \____/_/ /_/\____/_____/_/____\__, / ',
r' /____/ Hydrological Modelling in Python /_____________/ ',
r' ',
]
for line in banner_lines:
logger.info(line)
_banner_printed = True
#%% NOTES