Source code for hydromodpy.modeling.timeseries

# -*- coding: utf-8 -*-
"""
 * Copyright (C) 2023-2025 Alexandre Gauvain, Ronan Abhervé, Jean-Raynald de Dreuzy
 *
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Public License 2.0 which is available at
 * http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
 * which is available at https://www.apache.org/licenses/LICENSE-2.0.
 *
 * SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
"""

#%% LIBRAIRIES

# Python
import numpy as np
import os
import pandas as pd
import sys
import rasterio
from os.path import dirname, abspath
import geopandas as gpd
import warnings

# Root
df = dirname(dirname(abspath(__file__)))
sys.path.append(df)

# HydroModPy
from hydromodpy.tools import toolbox, get_logger

#%% CLASS

logger = get_logger(__name__)
# Silence pandas masked-to-nan spam when handling masked arrays
warnings.filterwarnings("ignore", message=".*converting a masked element to nan.*")


[docs] class Timeseries: """ Extract timeseries results from rasters and shapefiles created. """ def __init__(self, geographic: object, model_modflow: object, model_modpath: int=None, model_mt3dms: int=None, suffix_name: int=None, datetime_format: bool=True, subbasin_results: bool=True, intermittency_yearly:bool=False, intermittency_monthly: bool=False, intermittency_weekly: bool=False, intermittency_daily: bool=False, residence_times: bool=False, concentration_seepage: bool=False, mass_accumulated: bool=False): """ Parameters ---------- geographic : object Variable object of the model domain (watershed). model_modflow : object MODFLOW model object. model_modpath : object MODPATH model object. datetime_format : bool, optional Indicate if the model is referenced with datetime format. The default is True. subbasin_results : bool, optional Indicated if simulation results need to be created at subassins scale. The default is True. intermittency_monthly : bool If True, the intermittent and perennial part of hydrographic network is calculated on a monthly basis. intermittency_weekly : bool If True, the intermittent and perennial part of hydrographic network is calculated on a weekly basis. intermittency_daily : bool If True, the intermittent and perennial part of hydrographic network is calculated on a daily basis. """ # Init parameters self.suffix_name = suffix_name self.geographic = geographic self.stable_folder = geographic.stable_folder self.simulations = geographic.simulations_folder self.model_name = model_modflow.model_name self.model_folder = model_modflow.model_folder self.full_path = os.path.join(self.model_folder, self.model_name) self.tifs_file = os.path.join(self.full_path, '_postprocess', '_rasters') self.save_file = os.path.join(self.full_path, '_postprocess') if not os.path.exists(self.save_file): toolbox.create_folder(self.save_file) self.timeseries_file = os.path.join(self.save_file, '_timeseries') if not os.path.exists(self.timeseries_file): toolbox.create_folder(self.timeseries_file) self.recharge = model_modflow.recharge self.runoff = model_modflow.runoff self.intermittency_yearly = intermittency_yearly self.intermittency_monthly = intermittency_monthly self.intermittency_weekly = intermittency_weekly self.intermittency_daily = intermittency_daily self.residence_times = residence_times self.concentration_seepage = concentration_seepage self.mass_acumulated = mass_accumulated self.datetime_format = datetime_format ### Recharge management to initiate the .csv file results if isinstance(self.recharge,(int,float)) == True: time=[0] recharge = self.recharge if isinstance(self.recharge,(pd.Series)) == True: time = self.recharge.index recharge = self.recharge.values if isinstance(self.recharge,(dict)) == True: time = range(len(self.recharge)) recharge = pd.Series(np.array(list(({k:np.nanmean(v) for k,v in self.recharge.items()}).values())), index=range(len(self.recharge))) ### Runoff management to fill the .csv file results if self.runoff is not None and (not isinstance(self.runoff, pd.DataFrame) or not self.runoff.empty): if isinstance(self.runoff, (int, float)): time = [0] runoff = self.runoff elif isinstance(self.runoff, pd.Series): time = self.runoff.index runoff = self.runoff.values elif isinstance(self.runoff, dict): time = range(len(self.runoff)) runoff = pd.Series( np.array([np.nanmean(v) for v in self.runoff.values()]), index=range(len(self.runoff)) ) else: runoff = recharge * np.nan # npy_list = [] # for f in os.listdir(self.save_file): # name, ext = os.path.splitext(f) # if ext == '.npy': # npy_list.append(name) ### Open .npy files if they exist try: self.watertable_elevation = np.load(os.path.join(self.save_file, 'watertable_elevation'+'.npy'), allow_pickle=True).item() except: pass try: self.watertable_depth = np.load(os.path.join(self.save_file, 'watertable_depth'+'.npy'), allow_pickle=True).item() except: pass try: self.seepage_areas = np.load(os.path.join(self.save_file, 'seepage_areas'+'.npy'), allow_pickle=True).item() except: pass try: self.outflow_drain = np.load(os.path.join(self.save_file, 'outflow_drain'+'.npy'), allow_pickle=True).item() except: pass try: self.groundwater_flux = np.load(os.path.join(self.save_file, 'groundwater_flux'+'.npy'), allow_pickle=True).item() except: pass try: self.groundwater_storage = np.load(os.path.join(self.save_file, 'groundwater_storage'+'.npy'), allow_pickle=True).item() except: pass try: self.accumulation_flux = np.load(os.path.join(self.save_file, 'accumulation_flux'+'.npy'), allow_pickle=True).item() except: pass if model_modpath != None: if model_modpath.track_dir == 'forward': type_dir = 'ending' else: type_dir = 'starting' try: self.shp_particles = gpd.read_file(os.path.join(self.save_file, '_particles', type_dir+'_weighted'+'.shp')) except: self.shp_particles = gpd.read_file(os.path.join(self.save_file, '_particles', type_dir+'.shp')) pass if model_mt3dms != None: try: self.concentration_seepage = np.load(os.path.join(self.save_file, 'concentration_seepage'+'.npy'), allow_pickle=True).item() except: pass try: self.mass_accumulated = np.load(os.path.join(self.save_file, 'mass_accumulated'+'.npy'), allow_pickle=True).item() except: pass ### For total catchment with rasterio.open(self.geographic.watershed_dem) as src: dem_clip = src.read(1) self.cell = np.ma.masked_array(dem_clip, mask=(dem_clip<0)).count() self.resolution = model_modflow.resolution self.extract_results(dem_clip, time, recharge, runoff, self.timeseries_file) logger.info("Exported catchment time series to %s", self.timeseries_file) ### For sub-catchments if subbasin_results == True: try: self.zones_folder = os.path.join(self.stable_folder, 'subbasin') self.zones_list = os.listdir(self.zones_folder) for zi, zone_name in enumerate(self.zones_list): sub_file = os.path.join(self.full_path, '_subbasins', zone_name) if not os.path.exists(sub_file): toolbox.create_folder(sub_file) try: with rasterio.open(os.path.join(self.zones_folder, zone_name, 'watershed_dem.tif')) as src: dem_clip = src.read(1) self.cell = np.ma.masked_array(dem_clip, mask=(dem_clip<0)).count() self.extract_results(dem_clip, time, recharge, runoff, sub_file) logger.info("Exported time series for subbasin %s to %s", zi + 1, sub_file) except: pass except: pass #%% EXTRACT DATA AT THE CATCHMENT SCLAE IN CSV
[docs] def extract_results(self, dem_clip, time, recharge, runoff, timeseries_file): """ Calculate catchment-scale values and save them in a data frame (.csv).. Parameters ---------- dem_clip : 2D matrix Masked raster data of the model domain (watershed). time : DatetimeIndex or list Index for time. recharge : Series or list Values of recharge input. timeseries_file : str Path folder to save .csv file results. """ def calc_max(key, data_process, target_data, mask_data, cond_symb, value_masked): masked = toolbox.mask_by_dem(target_data[key], mask_data, cond_symb, value_masked) calc = np.nanmax(masked) return calc def calc_mean(key, data_process, target_data, mask_data, cond_symb, value_masked): masked = toolbox.mask_by_dem(target_data[key], mask_data, cond_symb, value_masked) masked[masked<0] = 0 ### ATTENTION masked[masked<-1] = np.nan ### ATTENTION calc = np.nanmean(masked) return calc def calc_sum(key, data_process, target_data, mask_data, cond_symb, value_masked, resolution): masked = toolbox.mask_by_dem(target_data[key], mask_data, cond_symb, value_masked) calc = (np.nansum(masked)) return calc def calc_qspe(key, data_process, target_data, mask_data, cond_symb, value_masked, resolution): masked = toolbox.mask_by_dem(target_data[key], mask_data, cond_symb, value_masked) cell = masked.count() calc = (np.nansum(masked) / (cell * resolution**2)) return calc def calc_percent(key, data_process, target_data, mask_data, cond_symb, value_masked): masked = toolbox.mask_by_dem(target_data[key], mask_data, cond_symb, value_masked) cell = masked.count() count = (masked > 0).sum() calc = (count/cell) * 100 return calc ### Create the initial dataframe file self.mfdata = pd.DataFrame({"date": time, "recharge": recharge}, index=range(len(time))) try: self.mfdata['runoff'] = runoff except: pass ### watertable_elevation try: for key in self.watertable_elevation: calc = calc_mean(key, 'watertable_elevation', self.watertable_elevation, dem_clip, '==', self.geographic.nodata) self.mfdata.loc[key,'watertable_elevation'] = calc except: pass ### watertable_depth try: for key in self.watertable_depth: calc = calc_mean(key, 'watertable_depth', self.watertable_depth, dem_clip, '==', self.geographic.nodata) self.mfdata.loc[key,'watertable_depth'] = calc except: pass ### seepage_areas try: for key in self.seepage_areas: calc = calc_percent(key, 'seepage_areas', self.seepage_areas, dem_clip, '==', self.geographic.nodata) self.mfdata.loc[key,'seepage_areas'] = calc except: pass ### outflow_drain try: for key in self.outflow_drain: calc = calc_qspe(key, 'outflow_drain', self.outflow_drain, dem_clip, '==', self.geographic.nodata, self.resolution) self.mfdata.loc[key,'outflow_drain'] = calc except: pass ### groundwater_flux try: for key in self.groundwater_flux: calc = calc_mean(key, 'groundwater_flux', self.groundwater_flux, dem_clip, '==', self.geographic.nodata) self.mfdata.loc[key,'groundwater_flux'] = calc except: pass ### groundwater_storage try: for key in self.groundwater_storage: calc = calc_sum(key, 'groundwater_storage', self.groundwater_storage, dem_clip, '==', self.geographic.nodata, self.resolution) self.mfdata.loc[key,'groundwater_storage'] = calc except: pass ### accumulation_flux try: for key in self.accumulation_flux: calc = calc_max(key, 'accumulation_flux', self.accumulation_flux, dem_clip, '==', self.geographic.nodata) self.mfdata.loc[key,'accumulation_flux'] = calc except: pass ### concentration_seepage try: for key in self.concentration_seepage: calc = calc_mean(key, 'concentration_seepage', self.concentration_seepage, dem_clip, '==', self.geographic.nodata) self.mfdata.loc[key,'concentration_seepage'] = calc except: pass ### mass_accumualted try: for key in self.mass_accumulated: calc = calc_max(key, 'mass_accumulated', self.mass_accumulated, dem_clip, '==', self.geographic.nodata) self.mfdata.loc[key,'mass_accumulated'] = calc except: pass ### intermittency_saturation if self.intermittency_yearly == True: try: if len(self.accumulation_flux)>=1: inf = 0 sup = 1 step = int(round(len(self.accumulation_flux)/1)) compt=0 for i in range(step): logger.debug('Computing yearly intermittency: %d / %d', i, step) interv = list(self.accumulation_flux.items())[inf:sup] for key in range(len(interv)): mask = dem_clip.copy() interv[key] = np.ma.masked_array(interv[key][1], mask=(mask<0)) zero = self.accumulation_flux[0] * 0 for j in range(len(interv)): tempo = interv[j].copy() tempo[tempo>0] = 1 zero = zero + tempo days_flux = zero.copy() days_flux = np.ma.masked_array(days_flux, mask=(mask<0)) days_flux = np.ma.masked_array(days_flux, mask=(days_flux<=0)) for k in range(len(interv)): tempo = np.ma.masked_where(interv[k]<=0, interv[k]) tempo[days_flux<1] = 0 tempo[days_flux==1] = 1 tempo = np.ma.masked_where(interv[k]<=0, tempo) surflow = (((tempo >= 0).sum()) / self.cell) * 100 perenn = (((tempo == 1).sum()) / self.cell) * 100 intermit = (((tempo == 0).sum()) / self.cell) * 100 self.mfdata.loc[compt,'total_areas'] = surflow self.mfdata.loc[compt,'perenn_areas'] = perenn self.mfdata.loc[compt,'intermit_areas'] = intermit compt+=1 inf+=1 sup+=1 except: pass ### intermittency_saturation if self.intermittency_monthly == True: try: if len(self.accumulation_flux)>=12: inf = 0 sup = 12 step = int(round(len(self.accumulation_flux)/12)) compt=0 for i in range(step): logger.debug('Computing monthly intermittency: %d / %d', i, step) interv = list(self.accumulation_flux.items())[inf:sup] for key in range(len(interv)): mask = dem_clip.copy() interv[key] = np.ma.masked_array(interv[key][1], mask=(mask<0)) zero = self.accumulation_flux[0] * 0 for j in range(len(interv)): tempo = interv[j].copy() tempo[tempo>0] = 1 zero = zero + tempo days_flux = zero.copy() days_flux = np.ma.masked_array(days_flux, mask=(mask<0)) days_flux = np.ma.masked_array(days_flux, mask=(days_flux<=0)) for k in range(len(interv)): tempo = np.ma.masked_where(interv[k]<=0, interv[k]) tempo[days_flux<12] = 0 tempo[days_flux==12] = 1 tempo = np.ma.masked_where(interv[k]<=0, tempo) surflow = (((tempo >= 0).sum()) / self.cell) * 100 perenn = (((tempo == 1).sum()) / self.cell) * 100 intermit = (((tempo == 0).sum()) / self.cell) * 100 self.mfdata.loc[compt,'total_areas'] = surflow self.mfdata.loc[compt,'perenn_areas'] = perenn self.mfdata.loc[compt,'intermit_areas'] = intermit compt+=1 inf+=12 sup+=12 except: pass if self.intermittency_weekly == True: try: if len(self.accumulation_flux)>=52: inf = 0 sup = 52 step = int(round(len(self.accumulation_flux)/52)) compt=0 for i in range(step): logger.debug('Computing weekly intermittency: %d / %d', i, step) interv = list(self.accumulation_flux.items())[inf:sup] for key in range(len(interv)): mask = dem_clip.copy() interv[key] = np.ma.masked_array(interv[key][1], mask=(mask<0)) zero = self.accumulation_flux[0] * 0 for j in range(len(interv)): tempo = interv[j].copy() tempo[tempo>0] = 1 zero = zero + tempo days_flux = zero.copy() days_flux = np.ma.masked_array(days_flux, mask=(mask<0)) days_flux = np.ma.masked_array(days_flux, mask=(days_flux<=0)) for k in range(len(interv)): tempo = np.ma.masked_where(interv[k]<=0, interv[k]) tempo[days_flux<52] = 0 tempo[days_flux==52] = 1 tempo = np.ma.masked_where(interv[k]<=0, tempo) surflow = (((tempo >= 0).sum()) / self.cell) * 100 perenn = (((tempo == 1).sum()) / self.cell) * 100 intermit = (((tempo == 0).sum()) / self.cell) * 100 self.mfdata.loc[compt,'total_areas'] = surflow self.mfdata.loc[compt,'perenn_areas'] = perenn self.mfdata.loc[compt,'intermit_areas'] = intermit compt+=1 inf+=52 sup+=52 except: pass if self.intermittency_daily == True: try: if len(self.accumulation_flux)>=365: inf = 0 sup = 365 step = int(round(len(self.accumulation_flux)/365)) compt=0 for i in range(step): logger.debug('Computing daily intermittency: %d / %d', i, step) interv = list(self.accumulation_flux.items())[inf:sup] for key in range(len(interv)): mask = dem_clip.copy() interv[key] = np.ma.masked_array(interv[key][1], mask=(mask<0)) zero = self.accumulation_flux[0] * 0 for j in range(len(interv)): tempo = interv[j].copy() tempo[tempo>0] = 1 zero = zero + tempo days_flux = zero.copy() days_flux = np.ma.masked_array(days_flux, mask=(mask<0)) days_flux = np.ma.masked_array(days_flux, mask=(days_flux<=0)) for k in range(len(interv)): tempo = np.ma.masked_where(interv[k]<=0, interv[k]) tempo[days_flux<365] = 0 tempo[days_flux==365] = 1 tempo = np.ma.masked_where(interv[k]<=0, tempo) surflow = (((tempo >= 0).sum()) / self.cell) * 100 perenn = (((tempo == 1).sum()) / self.cell) * 100 intermit = (((tempo == 0).sum()) / self.cell) * 100 self.mfdata.loc[compt,'total_areas'] = surflow self.mfdata.loc[compt,'perenn_areas'] = perenn self.mfdata.loc[compt,'intermit_areas'] = intermit compt+=1 inf+=365 sup+=365 except: pass ### residence_times if self.residence_times == True: try: for key in [0]: try: shp_frame = gpd.read_file(self.geographic.watershed_shp) self.shp_particles = self.shp_particles.clip(shp_frame) except: pass try: calc = np.nanmean(self.shp_particles['time_win']) except: calc = np.nanmean(self.shp_particles['time']) pass self.mfdata.loc[key,'residence_times'] = calc except: pass ### save files if self.datetime_format==True: try: self.mfdata['date'] = pd.to_datetime(time, format='%Y-%m-%d') except: self.mfdata['date'] = np.arange(0,len(self.mfdata),1) pass self.mfdata = self.mfdata.set_index(['date']) if self.suffix_name == None: self.mfdata.to_csv(timeseries_file + '/_simulated_timeseries.csv', sep=';') else: self.mfdata.to_csv(timeseries_file + '/_simulated_timeseries'+'_'+self.suffix_name+'.csv', sep=';') if timeseries_file == self.timeseries_file: return self.mfdata
#%% NOTES