Source code for hydromodpy.watershed_root

# -*- coding: utf-8 -*-
"""
 * Copyright (C) 2023-2025 Alexandre Gauvain, Ronan Abhervé, Jean-Raynald de Dreuzy
 *
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Public License 2.0 which is available at
 * http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
 * which is available at https://www.apache.org/licenses/LICENSE-2.0.
 *
 * SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
"""

#%% LIBRARIES

# Python
import sys
import os
import pickle
import pandas as pd
import geopandas as gpd
import rasterio
import whitebox
# Module-level WhiteboxTools handle, created at import time with its
# ``verbose`` attribute switched off.
wbt = whitebox.WhiteboxTools()
wbt.verbose = False

# Root
from os.path import dirname, abspath
# Directory containing this file; it is appended to ``sys.path`` at import time.
root_dir = (dirname(abspath(__file__)))
sys.path.append(root_dir)

# HydroModPy
from hydromodpy.watershed import climatic, driasclimat, driaseau, geographic, geology, hydraulic, hydrography, hydrometry, intermittency, oceanic, piezometry, settings, safransurfex, subbasin, transport
from hydromodpy.modeling import modflow, modpath, mt3dms, timeseries, netcdf
from hydromodpy.tools import toolbox, get_logger
# Result of ``toolbox.plot_params`` called with four size values, kept at module level.
fontprop = toolbox.plot_params(8,15,18,20) # small, medium, interm, large

# Module-level logger named after this module.
logger = get_logger(__name__)


#%% CLASS

class Watershed:
    """
    Hub object used to extract a watershed and its data from a regional DEM.

    It gathers every element, mandatory or optional, needed to build a
    watershed (meaning a catchment) and to run MODFLOW, MODPATH and MT3DMS
    simulations on it.
    """

    # Name of the pickle file written inside ``watershed_folder``.
    _OBJECT_FILE = 'watershed_object'

    # Optional elements restored by ``__load_object`` when they are present
    # on the stored object. The order defines the order in ``elt_def``.
    _OPTIONAL_ELEMENTS = (
        'subbasin',
        # Sub-surface
        'hydraulic', 'geology', 'geometric', 'piezometry',
        # Surface
        'hydrography', 'hydrometry', 'intermittency',
        # Atmospheric
        'safransurfex', 'climatic', 'driasclimat', 'driaseau', 'oceanic',
        # Model parameters
        'settings', 'transport',
    )

    def __init__(self,
                 dem_path: str,
                 out_path: str,
                 load: bool=False,
                 watershed_name: str='Default',
                 from_lib: str=None, # os.path.join(root_dir,'watershed_library.csv')
                 from_dem: list=None, # [path, cell size]
                 from_shp: list=None, # [path, buffer size]
                 from_xyv: list=None, # [x, y, snap distance, buffer size, crs]
                 reg_fold: str=None,
                 bottom_path: str=None, # path
                 save_object: bool=True):
        """
        Create the output folders, then load or build the watershed.

        Parameters
        ----------
        dem_path : str
            Path of the initial Digital Elevation Model (DEM).
        out_path : str
            Path of the HydroModPy outputs to store results.
        load : bool, optional
            Load the existing watershed object. If no usable stored object
            is found, the watershed is rebuilt from the inputs.
            The default is False.
        watershed_name : str, optional
            Name of the watershed (name of folder results).
            The default is 'Default'.
        from_lib : str, optional
            Path of the library (.csv, ';' delimited) with the list of
            watersheds to generate. The row whose 'watershed_name' matches
            ``watershed_name`` is used. The default is None.
        from_dem : list, optional
            List of two parameters: [path, cell_size]
            path: Path of the DEM
            cell_size: Resolution of the DEM. To change the initial resolution.
            The default is None.
        from_shp : list, optional
            List of two parameters: [path, buffer_size]
            path: Path of the polygon shapefile.
            buffer_size: Buffer distance (value in percent)
            The default is None.
        from_xyv : list, optional
            List of five parameters: [x, y, snap_distance, buffer_size, crs_proj]
            x: X coordinate [m] of the watershed outlet
            y: Y coordinate [m] of the watershed outlet
            snap_dist: Maximum distance where the outlet can be moved.
            buffer_size: Buffer added to the generated watershed polygon (value in percent)
            crs_proj: Projection identifier, read from index 4 and passed on as-is.
            The default is None.
        reg_fold : str, optional
            Path of the folder with regional data/results.
            If informed, the regional results will not be created, just loaded from folder.
            The default is None.
        bottom_path : str, optional
            Path of a raster representing the bottom elevation.
            Need to be the same shape of the model domain area (watershed DEM).
            The default is None.
        save_object : bool, optional
            True : To save the watershed object (using pickle) after it has
            been built. The default is True.

        Raises
        ------
        ValueError
            If the watershed has to be built and none of ``from_lib``,
            ``from_dem``, ``from_shp`` or ``from_xyv`` is provided.
        """

        toolbox.print_hydromodpy()

        self.dem_path = dem_path
        self.out_path = out_path
        self.load = load
        self.watershed_name = watershed_name
        self.from_lib = from_lib
        self.from_dem = from_dem
        self.from_shp = from_shp
        self.from_xyv = from_xyv
        self.reg_fold = reg_fold
        self.bottom_path = bottom_path
        self.bin_path = os.path.join(os.path.dirname(root_dir), 'bin')

        self.watershed_folder = os.path.join(out_path, watershed_name)
        toolbox.create_folder(self.watershed_folder)

        # Setup simulation log in watershed folder
        from hydromodpy.tools import setup_simulation_log
        setup_simulation_log(self.watershed_folder)

        self.stable_folder = os.path.join(self.watershed_folder, 'results_stable')
        self.simulations_folder = os.path.join(self.watershed_folder, 'results_simulations')
        self.calibration_folder = os.path.join(self.watershed_folder, 'results_calibration')
        self.add_data_folder = os.path.join(self.stable_folder, 'add_data')
        self.figure_folder = os.path.join(self.stable_folder, '_figures')
        # Parent folders come first so the nested ones can be created.
        for folder in (self.stable_folder,
                       self.simulations_folder,
                       self.calibration_folder,
                       self.add_data_folder,
                       self.figure_folder):
            toolbox.create_folder(folder)

        # Names of the elements currently attached to the watershed.
        self.elt_def = []

        # Load from previously stored (saved) watershed
        if load and self.__load_object():
            logger.info("Python object was successfully loaded as requested; imported from output directory %s", self.watershed_folder)
            return

        if load:
            logger.warning("Stored watershed object not available; rebuilding from inputs")
        else:
            logger.info("Initializing watershed object from scratch as requested")

        # Definition of the watershed
        self.__init_object()
        # Creation of the watershed defined at the previous line
        self.__create_object()
        # Save object
        if save_object:
            self.save_object()

    #%% PYTHON OBJECT

    def __load_object(self):
        """
        Private method to load a previously saved watershed object.

        The stored object must at least carry a ``geographic`` attribute;
        every optional element found on it is copied onto ``self`` and its
        name is appended to ``elt_def``.

        Returns
        -------
        success : bool
            True if the watershed object is loaded successfully.
        """
        object_path = os.path.join(self.watershed_folder, self._OBJECT_FILE)
        if not os.path.exists(object_path):
            logger.warning("watershed_object doesn't exist in %s", self.watershed_folder)
            return False

        # NOTE(security): pickle can execute arbitrary code while loading.
        # Only load watershed objects from output folders you trust.
        with open(object_path, 'rb') as object_file:
            stored = pickle.load(object_file)

        # At least geographic should have been stored
        if not hasattr(stored, 'geographic'):
            logger.warning("geographic doesn't exist in object")
            return False
        self.geographic = stored.geographic
        self.elt_def.append('geographic')

        for element in self._OPTIONAL_ELEMENTS:
            if hasattr(stored, element):
                setattr(self, element, getattr(stored, element))
                self.elt_def.append(element)
        return True

    @staticmethod
    def _format_crs(crs):
        """
        Return a projection string for a CRS object, or None without CRS.

        The result is 'EPSG:<code>' when the CRS resolves to an EPSG code,
        otherwise the value of ``crs.to_string()``.
        """
        if not crs:
            return None
        epsg_code = crs.to_epsg()
        return f"EPSG:{epsg_code}" if epsg_code else crs.to_string()

    def __init_object(self):
        """
        Private method initializing the conditions used to generate the watershed.

        Each provided source (``from_lib``, ``from_dem``, ``from_shp``,
        ``from_xyv``) is applied in that order, so when several are given
        the last one wins.

        Raises
        ------
        ValueError
            If no source is provided, or if ``watershed_name`` is not found
            in the library given by ``from_lib``.

        Returns
        -------
        None.
        """
        if (self.from_lib is None and self.from_dem is None
                and self.from_shp is None and self.from_xyv is None):
            # Without a source the attributes read by __create_object would
            # never be set, so fail here with an explicit message.
            raise ValueError(
                "No watershed source provided: set one of from_lib, "
                "from_dem, from_shp or from_xyv."
            )

        if self.from_lib is not None:
            watershed_list = pd.read_csv(self.from_lib, delimiter=';')
            watershed_info = watershed_list.loc[watershed_list['watershed_name'] == self.watershed_name]
            if watershed_info.empty:
                raise ValueError(
                    f"Watershed '{self.watershed_name}' not found in library {self.from_lib}"
                )
            row = watershed_info.iloc[0]
            self.cell_size = None
            self.x_outlet = row['x_outlet']
            self.y_outlet = row['y_outlet']
            self.snap_dist = row['snap_dist']
            self.buff_percent = row['buff_percent']
            self.crs_proj = row['crs_proj']

        if self.from_dem is not None:
            with rasterio.open(self.from_dem[0]) as dem_src:
                src_crs = dem_src.crs
            self.dem_path = self.from_dem[0]
            self.cell_size = self.from_dem[1]
            self.x_outlet = None
            self.y_outlet = None
            self.snap_dist = None
            self.buff_percent = None
            self.crs_proj = self._format_crs(src_crs)

        if self.from_shp is not None:
            shp_file = gpd.read_file(self.from_shp[0])
            self.cell_size = None
            self.x_outlet = None
            self.y_outlet = None
            self.snap_dist = None
            self.buff_percent = self.from_shp[1]
            # Same formatting as the DEM branch: avoids "EPSG:None" when the
            # shapefile CRS has no EPSG code, and a crash when it has no CRS.
            self.crs_proj = self._format_crs(shp_file.crs)

        if self.from_xyv is not None:
            self.cell_size = None
            self.x_outlet = self.from_xyv[0]
            self.y_outlet = self.from_xyv[1]
            self.snap_dist = self.from_xyv[2]
            self.buff_percent = self.from_xyv[3]
            self.crs_proj = self.from_xyv[4]

    def __create_object(self):
        """
        Private method to create the geographic element of the watershed.

        Returns
        -------
        None.
        """
        # Structure data
        self.geographic = geographic.Geographic(self.dem_path, self.bottom_path, self.cell_size,
                                                self.x_outlet, self.y_outlet, self.snap_dist,
                                                self.buff_percent, self.crs_proj,
                                                self.watershed_folder,
                                                self.stable_folder,
                                                self.simulations_folder,
                                                self.calibration_folder,
                                                self.from_lib, self.from_dem, self.from_shp, self.from_xyv,
                                                self.reg_fold)
        self.elt_def.append('geographic')

    def save_object(self):
        """
        Public method to save the watershed object with pickle.

        The object is first written to a temporary file next to the target
        and then moved over it, so a failure while pickling leaves any
        previously saved object untouched.

        Returns
        -------
        None.
        """
        target = os.path.join(self.watershed_folder, self._OBJECT_FILE)
        tmp_target = target + '.tmp'
        try:
            with open(tmp_target, 'wb') as object_file:
                pickle.dump(self, object_file)
            os.replace(tmp_target, target)
        finally:
            # Only still present when pickling or the move failed.
            if os.path.exists(tmp_target):
                os.remove(tmp_target)

    def display_object(self, dtype: str = 'watershed_dem'):
        """
        Public method to display the watershed.

        Parameters
        ----------
        dtype : str, optional
            Three possibilities:

            - ``'watershed_dem'`` to display the watershed elevation (default).
            - ``'watershed_geology'`` to display the watershed geology.
            - ``'watershed_zones'`` to display the hydraulic zones of the watershed.

            Any other value displays nothing and logs a warning.

        Raises
        ------
        ModuleNotFoundError
            If the display module cannot be imported.
        """
        try:
            from hydromodpy.display import visualization_watershed
        except Exception as exc:
            raise ModuleNotFoundError(
                "Display dependencies are not installed. Install the full stack (contextily, matplotlib, vedo)."
            ) from exc

        # Each supported dtype is also the name of the display function.
        if dtype in ('watershed_dem', 'watershed_geology', 'watershed_zones'):
            getattr(visualization_watershed, dtype)(self)
        else:
            logger.warning("Unknown display type %r; nothing displayed", dtype)

    #%% ADDING DATA

    def _register_element(self, name, save=True):
        """Record ``name`` in ``elt_def`` and, if ``save``, pickle the watershed."""
        self.elt_def.append(name)
        if save:
            self.save_object()

    def add_climatic(self):
        """
        Public method to add climatic data.

        Returns
        -------
        None.
        """
        self.climatic = climatic.Climatic(out_path=self.watershed_folder)
        self._register_element('climatic')

    def add_driasclimat(self, driasclimat_path, list_models='all', list_vars='all'):
        """
        Public method to add drias climat data.
        Link: https://www.drias-climat.fr/

        Parameters
        ----------
        driasclimat_path : str
            Path of the drias climat data, forwarded to ``Driasclimat``.
        list_models : optional
            Forwarded to ``Driasclimat``. The default is 'all'.
        list_vars : optional
            Forwarded to ``Driasclimat``. The default is 'all'.

        Returns
        -------
        None.
        """
        self.driasclimat_path = driasclimat_path
        self.driasclimat = driasclimat.Driasclimat(out_path=self.watershed_folder,
                                                   driasclimat_path=self.driasclimat_path,
                                                   watershed_shp=self.geographic.watershed_shp,
                                                   list_models=list_models,
                                                   list_vars=list_vars)
        # NOTE(review): unlike most add_* methods, the watershed is not saved
        # here - confirm whether this is intentional.
        self._register_element('driasclimat', save=False)

    def add_driaseau(self, driaseau_path, list_models='all', list_vars='all'):
        """
        Public method to add drias eau data.
        Link: https://www.drias-eau.fr/

        Parameters
        ----------
        driaseau_path : str
            Path of the drias eau data, forwarded to ``Driaseau``.
        list_models : optional
            Forwarded to ``Driaseau``. The default is 'all'.
        list_vars : optional
            Forwarded to ``Driaseau``. The default is 'all'.

        Returns
        -------
        None.
        """
        self.driaseau_path = driaseau_path
        self.driaseau = driaseau.Driaseau(out_path=self.watershed_folder,
                                          driaseau_path=self.driaseau_path,
                                          watershed_shp=self.geographic.watershed_shp,
                                          list_models=list_models,
                                          list_vars=list_vars)
        # NOTE(review): unlike most add_* methods, the watershed is not saved
        # here - confirm whether this is intentional.
        self._register_element('driaseau', save=False)

    def add_geology(self, geology_path: str, types_obs: str='GEO1M.shp', fields_obs: str='CODE_LEG'):
        """
        Public method to add geology data.

        Parameters
        ----------
        geology_path : str
            Path where the polygon shapefile is located.
            To date, work for BRGM geological map data:
            http://infoterre.brgm.fr/page/telechargement-cartes-geologiques
        types_obs : str, optional
            Name of the geology shapefile. The default is 'GEO1M.shp'.
        fields_obs : str, optional
            Field data of the polygons. The default is 'CODE_LEG'.
        """
        self.geology_path = geology_path
        self.geology = geology.Geology(out_path=self.watershed_folder,
                                       geographic=self.geographic,
                                       geo_path=self.geology_path,
                                       landsea=None,
                                       types_obs=types_obs,
                                       fields_obs=fields_obs)
        self._register_element('geology')

    def add_hydraulic(self):
        """
        Public method to add hydraulic data.

        Returns
        -------
        None.
        """
        self.hydraulic = hydraulic.Hydraulic(nrow=self.geographic.y_pixel,
                                             ncol=self.geographic.x_pixel,
                                             box_dem=self.geographic.watershed_box_buff_dem)
        self._register_element('hydraulic')

    def add_hydrography(self, hydrography_path: str, types_obs: list=None, fields_obs: list=None, streams_file=None):
        """
        Public method to add hydrography data.

        Parameters
        ----------
        hydrography_path : str
            Path where the hydrography shapefiles are located.
        types_obs : list, optional
            List of shapefile names. When None, ['streams'] is used.
        fields_obs : list, optional
            List of field names. When None, ['FID'] is used.
        streams_file : optional
            Forwarded unchanged to ``Hydrography``. The default is None.
        """
        # Fresh lists on every call: the values are stored on the instance,
        # so a shared default list would leak changes between watersheds.
        if types_obs is None:
            types_obs = ['streams']
        if fields_obs is None:
            fields_obs = ['FID']

        self.hydrography_path = hydrography_path
        self.types_obs = types_obs
        self.fields_obs = fields_obs
        self.hydrography = hydrography.Hydrography(out_path=self.watershed_folder,
                                                   types_obs=self.types_obs,
                                                   fields_obs=self.fields_obs,
                                                   geographic=self.geographic,
                                                   hydro_path=self.hydrography_path,
                                                   streams_file=streams_file)
        self._register_element('hydrography')

    def add_hydrometry(self, hydrometry_path: str, file_name: str):
        """
        Public method to add watershed hydrometry.

        Parameters
        ----------
        hydrometry_path : str
            Path where the hydrometry files are located.
        file_name : str
            Name of the file.
        """
        self.hydrometry_path = hydrometry_path
        self.hydrometry = hydrometry.Hydrometry(out_path=self.watershed_folder,
                                                hydrometry_path=self.hydrometry_path,
                                                file_name=file_name,
                                                geographic=self.geographic)
        self._register_element('hydrometry')

    def add_intermittency(self, intermittency_path: str, file_name: str):
        """
        Public method to add hydraulic intermittency.

        Parameters
        ----------
        intermittency_path : str
            Path where the intermittency files are located.
        file_name : str
            Name of the file.
        """
        self.intermittency_path = intermittency_path
        self.intermittency = intermittency.Intermittency(out_path=self.watershed_folder,
                                                         intermittency_path=self.intermittency_path,
                                                         file_name=file_name,
                                                         geographic=self.geographic)
        self._register_element('intermittency')

    def add_oceanic(self, oceanic_path: str):
        """
        Public method to add oceanic/sea data.

        Parameters
        ----------
        oceanic_path : str
            Path where the oceanic data are located.
        """
        self.oceanic = oceanic.Oceanic()
        self.oceanic_path = oceanic_path
        self.oceanic.extract_data(out_path=self.watershed_folder,
                                  oceanic_path=self.oceanic_path,
                                  geographic=self.geographic)
        self._register_element('oceanic')

    def add_piezometry(self):
        """
        Public method to add piezometric data.

        Returns
        -------
        None.
        """
        self.piezometry = piezometry.Piezometry(out_path=self.watershed_folder,
                                                geographic=self.geographic)
        self._register_element('piezometry')

    def add_settings(self):
        """
        Public method to add specific model settings.

        Returns
        -------
        None.
        """
        self.settings = settings.Settings()
        self._register_element('settings')

    def add_safransurfex(self, safransurfex_path):
        """
        Public method to add safran-surfex (historical reanalysis) climate data.

        Parameters
        ----------
        safransurfex_path : str
            Path of the safran-surfex data, forwarded to ``SafranSurfex``.

        Returns
        -------
        None.
        """
        self.safransurfex_path = safransurfex_path
        self.safransurfex = safransurfex.SafranSurfex(out_path=self.watershed_folder,
                                                      safransurfex_path=self.safransurfex_path,
                                                      watershed_shp=self.geographic.box_buff)
        safransurfex.Merge(out_path=self.watershed_folder)
        self._register_element('safransurfex')

    def add_subbasin(self, add_path: str = None, sub_snap_dist: int = 200):
        """
        Public method to add subbasins.

        ``hydrometry`` and ``intermittency`` are set to None on the
        watershed when they have not been added beforehand.

        Parameters
        ----------
        add_path : str, optional
            Path of the folder where the data are located. Default is None.
        sub_snap_dist : int
            Maximum distance where the subbasin outlet can be moved.
        """
        for optional in ('hydrometry', 'intermittency'):
            if not hasattr(self, optional):
                setattr(self, optional, None)

        self.subbasin = subbasin.Subbasin(
            geographic=self.geographic,
            hydrometry=self.hydrometry,
            intermittency=self.intermittency,
            add_path=add_path,
            out_path=self.watershed_folder,
            sub_snap_dist=sub_snap_dist
        )
        self._register_element('subbasin')

    def add_transport(self):
        """
        Public method to add specific model transport parameters.

        Returns
        -------
        None.
        """
        self.transport = transport.Transport()
        self._register_element('transport')

    #%% MODFLOW MODEL

    def _model_folder(self, for_calib):
        """Return the calibration folder if ``for_calib`` is truthy, else the simulations folder."""
        return self.calibration_folder if for_calib else self.simulations_folder

    def preprocessing_modflow(self, for_calib: bool=False):
        """
        Public method to build the hydrogeological model.

        Reads the ``settings``, ``oceanic``, ``climatic`` and ``hydraulic``
        elements, which therefore have to be added beforehand.

        Parameters
        ----------
        for_calib : bool, False
            If False, the simulation results are stored in folder results_simulations.
            If True, the simulation results are stored in folder results_calibration.

        Returns
        -------
        model_modflow : object
            Python object of the created MODFLOW model.
        """
        # Type of run: classical simulation or calibration
        model_folder = self._model_folder(for_calib)

        model_modflow = modflow.Modflow(self.geographic,
                                        # Workflow settings
                                        model_folder=model_folder,
                                        model_name=self.settings.model_name,
                                        bin_path=self.bin_path,
                                        # Model settings
                                        box=self.settings.box,
                                        sink_fill=self.settings.sink_fill,
                                        sim_state=self.settings.sim_state,
                                        dis_perlen=self.settings.dis_perlen,
                                        # Well settings
                                        well_coords=self.settings.well_coords,
                                        well_fluxes=self.settings.well_fluxes,
                                        # Output settings
                                        plot_cross=self.settings.plot_cross,
                                        cross_ylim=self.settings.cross_ylim,
                                        check_grid=self.settings.check_grid,
                                        # Boundary settings
                                        sea_level=self.oceanic.MSL,
                                        bc_left=self.settings.bc_left,
                                        bc_right=self.settings.bc_right,
                                        # Climatic settings
                                        recharge=self.climatic.recharge,
                                        runoff=self.climatic.runoff,
                                        first_clim=self.climatic.first_clim,
                                        # Hydraulic settings
                                        bottom=self.hydraulic.bottom,
                                        thick=self.hydraulic.thick,
                                        nlay=self.hydraulic.nlay,
                                        lay_decay=self.hydraulic.lay_decay,
                                        hk_value=self.hydraulic.hk_value,
                                        sy_value=self.hydraulic.sy_value,
                                        ss_value=self.hydraulic.ss_value,
                                        hk_decay=self.hydraulic.hk_decay,
                                        sy_decay=self.hydraulic.sy_decay,
                                        ss_decay=self.hydraulic.ss_decay,
                                        verti_hk=self.hydraulic.verti_hk,
                                        verti_sy=self.hydraulic.verti_sy,
                                        verti_ss=self.hydraulic.verti_ss,
                                        cond_drain=self.hydraulic.cond_drain,
                                        vka=self.hydraulic.vka,
                                        exdp=self.hydraulic.exdp)
        # Preprocessing Modflow
        model_modflow.pre_processing()
        return model_modflow

    def processing_modflow(self, model_modflow: object, write_model: bool=True, run_model: bool=False, link_mt3dms: bool=False):
        """
        Public method to run the MODFLOW model.

        Parameters
        ----------
        model_modflow : object
            MODFLOW model in a Python object.
        write_model : bool, True
            If True, write input files before running the simulation.
        run_model : bool, False
            Run simulation. The default is False.
        link_mt3dms : bool, False
            Forwarded unchanged to the model ``processing`` call.
            The default is False.

        Returns
        -------
        success_model : bool
            Boolean to know if the simulation ran successfully.
        """
        # Processing Modflow
        return model_modflow.processing(write_model=write_model,
                                        run_model=run_model,
                                        link_mt3dms=link_mt3dms)

    def postprocessing_modflow(self, model_modflow: object,
                               watertable_elevation: bool=True,
                               watertable_depth: bool=True,
                               seepage_areas: bool=True,
                               outflow_drain: bool=True,
                               groundwater_flux: bool=True,
                               groundwater_storage: bool=True,
                               accumulation_flux: bool=True,
                               persistency_index: bool=False,
                               intermittency_yearly: bool=False,
                               intermittency_monthly: bool=False,
                               intermittency_weekly: bool=False,
                               intermittency_daily: bool=False,
                               export_all_tif: bool=False):
        """
        Public method to post-process the simulation of the model.

        Parameters
        ----------
        model_modflow : object
            MODFLOW model in a Python object.
        watertable_elevation : bool, optional
            Build watertable elevation outputs. The default is True.
        watertable_depth : bool, optional
            Build watertable depth outputs. The default is True.
        seepage_areas : bool, optional
            Build seepage area outputs. The default is True.
        outflow_drain : bool, optional
            Build outflow drain outputs. The default is True.
        groundwater_flux : bool, optional
            Build groundwater flux outputs. The default is True.
        groundwater_storage : bool, optional
            Build groundwater storage outputs. The default is True.
        accumulation_flux : bool, optional
            Build accumulation flux outputs. The default is True.
        persistency_index : bool, optional
            Build persistency index outputs. The default is False.
        intermittency_yearly : bool, optional
            Build intermittency yearly. The default is False.
        intermittency_monthly : bool, optional
            Build intermittency monthly. The default is False.
        intermittency_weekly : bool, optional
            Build intermittency weekly. The default is False.
        intermittency_daily : bool, optional
            Build intermittency daily. The default is False.
        export_all_tif : bool, optional
            Build tif files for all time steps. The default is False.
        """
        # Postprocessing Modflow (the model is also passed as first argument)
        model_modflow.post_processing(model_modflow,
                                      watertable_elevation=watertable_elevation,
                                      watertable_depth=watertable_depth,
                                      seepage_areas=seepage_areas,
                                      outflow_drain=outflow_drain,
                                      groundwater_flux=groundwater_flux,
                                      groundwater_storage=groundwater_storage,
                                      accumulation_flux=accumulation_flux,
                                      persistency_index=persistency_index,
                                      intermittency_yearly=intermittency_yearly,
                                      intermittency_monthly=intermittency_monthly,
                                      intermittency_weekly=intermittency_weekly,
                                      intermittency_daily=intermittency_daily,
                                      export_all_tif=export_all_tif)

    #%% MODPATH MODEL

    def preprocessing_modpath(self, model_modflow: object, for_calib: bool=False):
        """
        Public method to set up the particle tracking (MODPATH) model.

        Reads the ``settings`` element, which has to be added beforehand.

        Parameters
        ----------
        model_modflow : object
            MODFLOW model in a Python object.
        for_calib : bool, False
            If False, the simulation results are stored in folder results_simulations.
            If True, the simulation results are stored in folder results_calibration.

        Returns
        -------
        model_modpath : object
            Python object of the created MODPATH model.
        """
        model_folder = self._model_folder(for_calib)

        model_modpath = modpath.Modpath(self.geographic,
                                        model_modflow,
                                        # Frame settings
                                        model_folder=model_folder,
                                        model_name=model_modflow.model_name,
                                        bin_path=self.bin_path,
                                        # Specific settings
                                        zone_partic=self.settings.zone_partic,
                                        cell_div=self.settings.cell_div,
                                        zloc_div=self.settings.zloc_div,
                                        bore_depth=self.settings.bore_depth,
                                        track_dir=self.settings.track_dir,
                                        sel_random=self.settings.sel_random,
                                        sel_slice=self.settings.sel_slice)
        # Preprocessing Modpath
        model_modpath.pre_processing()
        return model_modpath

    def processing_modpath(self, model_modpath: object, write_model: bool=True, run_model: bool=False):
        """
        Public method to run the particle tracking.

        Parameters
        ----------
        model_modpath : object
            MODPATH model in a Python object.
        write_model : bool, True
            If True, write input files before running the simulation.
        run_model : bool, False
            Run simulation. The default is False.

        Returns
        -------
        success_model : bool
            Boolean to know if the simulation ran successfully.
        """
        # Processing Modpath
        return model_modpath.processing(write_model=write_model,
                                        run_model=run_model)

    def postprocessing_modpath(self, model_modpath: object,
                               ending_point: bool=True,
                               starting_point: bool=True,
                               pathlines_shp: bool=True,
                               particles_shp: bool=True,
                               random_id: int=None,
                               norm_flux: bool=False):
        """
        Public method to post-process the simulation of the particle tracking.

        Parameters
        ----------
        model_modpath : object
            MODPATH model in a Python object.
        ending_point : bool, optional
            Save ending point shapefile of particles. The default is True.
        starting_point : bool, optional
            Save starting point shapefile of particles. The default is True.
        pathlines_shp : bool, optional
            Save pathlines shapefile (one line per particle). The default is True.
        particles_shp : bool, optional
            Save particles shapefile (some lines per particle). The default is True.
        random_id : int, optional
            Number of particles to save randomly. The default is None.
        norm_flux : bool, optional
            Accepted for interface compatibility; it is currently not
            forwarded to the model. The default is False.
        """
        # NOTE(review): norm_flux is not passed on - confirm whether the
        # MODPATH post-processing is expected to receive it.
        model_modpath.post_processing(model_modpath,
                                      ending_point=ending_point,
                                      starting_point=starting_point,
                                      pathlines_shp=pathlines_shp,
                                      particles_shp=particles_shp,
                                      random_id=random_id)

    def filtprocessing_modpath(self, model_modpath: object,
                               norm_flux: bool=False,
                               filt_time: bool=True,
                               filt_seep: bool=True,
                               filt_inout: bool=True,
                               calc_rtd: bool=True,
                               random_id: int=None):
        """
        Public method to filter output shapefiles of particles.

        Parameters
        ----------
        model_modpath : object
            MODPATH model in a Python object.
        norm_flux : bool, optional
            Normalization of time by input fluxes (recharge). The default is False.
        filt_time : bool, optional
            Divide the output column "time" by 365 to convert days in years
            (considering recharge in days). Delete particles with time at 0.
            The default is True.
        filt_seep : bool, optional
            Only if 'track_dir' is 'forward'.
            Keep only particles finishing in zone1 (seepage) and in the
            first layer. The default is True.
        filt_inout : bool, optional
            Delete particles in and out in the same cell (first layer).
            The default is True.
        calc_rtd : bool, optional
            Compute and plot the PDF of residence times. The default is True.
        random_id : int, optional
            Select randomly the number of particles to keep. The default is None.
        """
        # All options are forwarded positionally, in this exact order.
        model_modpath.filt_processing(model_modpath,
                                      norm_flux,
                                      filt_time,
                                      filt_seep,
                                      filt_inout,
                                      calc_rtd,
                                      random_id)

    #%% MT3DMS MODEL

    def preprocessing_mt3dms(self, model_modflow: object, for_calib: bool=False, suffix_name: str='_mt'):
        """
        Public method to set up the transport (MT3DMS) model.

        Reads the ``transport`` element, which has to be added beforehand.

        Parameters
        ----------
        model_modflow : object
            MODFLOW model in a Python object.
        for_calib : bool, False
            If False, the simulation results are stored in folder results_simulations.
            If True, the simulation results are stored in folder results_calibration.
        suffix_name : str, optional
            Forwarded unchanged to ``Mt3dms``. The default is '_mt'.

        Returns
        -------
        model_mt3dms : object
            Python object of the created MT3DMS model.
        """
        model_folder = self._model_folder(for_calib)

        model_mt3dms = mt3dms.Mt3dms(self.geographic,
                                     model_modflow,
                                     # Frame settings
                                     model_folder=model_folder,
                                     model_name=model_modflow.model_name,
                                     suffix_name=suffix_name,
                                     bin_path=self.bin_path,
                                     # Specific settings
                                     spc_name=self.transport.spc_name,
                                     sconc_init=self.transport.sconc_init,
                                     sconc_input=self.transport.sconc_input,
                                     disp_long=self.transport.disp_long,
                                     disp_transh=self.transport.disp_transh,
                                     disp_transv=self.transport.disp_transv,
                                     diffu_coeff=self.transport.diffu_coeff,
                                     react_order=self.transport.react_order,
                                     rate_decay=self.transport.rate_decay,
                                     plot_conc=self.transport.plot_conc,
                                     )
        # Preprocessing MT3DMS
        model_mt3dms.pre_processing()
        return model_mt3dms

    def processing_mt3dms(self, model_mt3dms: object, write_model: bool=True, run_model: bool=False, verbose: bool=True):
        """
        Public method to run the transport (MT3DMS) model.

        Parameters
        ----------
        model_mt3dms : object
            MT3DMS model in a Python object.
        write_model : bool, True
            If True, write input files before running the simulation.
        run_model : bool, False
            Run simulation. The default is False.
        verbose : bool, True
            Forwarded unchanged to the model ``processing`` call.
            The default is True.

        Returns
        -------
        success_model : bool
            Boolean to know if the simulation ran successfully.
        """
        # Processing MT3DMS
        return model_mt3dms.processing(write_model=write_model,
                                       run_model=run_model,
                                       verbose=verbose)

    def postprocessing_mt3dms(self, model_mt3dms: object,
                              concentration_seepage: bool=True,
                              mass_seepage: bool=True,
                              mass_accumulated: bool=False,
                              export_all_tif: bool=False):
        """
        Public method to post-process the simulation of the transport model.

        Parameters
        ----------
        model_mt3dms : object
            MT3DMS model in a Python object.
        concentration_seepage : bool, optional
            Forwarded to the model post-processing. The default is True.
        mass_seepage : bool, optional
            Forwarded to the model post-processing. The default is True.
        mass_accumulated : bool, optional
            Forwarded to the model post-processing. The default is False.
        export_all_tif : bool, optional
            Forwarded to the model post-processing. The default is False.

        Returns
        -------
        model_mt3dms : object
            The MT3DMS model received as input.
        """
        model_mt3dms.post_processing(model_mt3dms,
                                     concentration_seepage=concentration_seepage,
                                     mass_seepage=mass_seepage,
                                     mass_accumulated=mass_accumulated,
                                     export_all_tif=export_all_tif)
        return model_mt3dms

    #%% EXTRACT TIMESERIES

    def postprocessing_timeseries(self, model_modflow: object,
                                  model_modpath: object=None,
                                  model_mt3dms: object=None,
                                  suffix_name: str=None,
                                  datetime_format: bool=True,
                                  subbasin_results: bool=True,
                                  intermittency_yearly: bool=False,
                                  intermittency_monthly: bool=False,
                                  intermittency_weekly: bool=False,
                                  intermittency_daily: bool=False,
                                  residence_times: bool=False,
                                  concentration_seepage: bool=False,
                                  mass_accumulated: bool=False
                                  ):
        """
        Public method to postprocess the watershed timeseries.

        Parameters
        ----------
        model_modflow : object
            MODFLOW model in a Python object. When None, nothing is
            computed and None is returned.
        model_modpath : object, optional
            MODPATH model in a Python object. Optional if only flow outputs are required.
        model_mt3dms : object, optional
            MT3DMS model used when extracting concentration/mass indicators.
        suffix_name : str, optional
            Forwarded unchanged to ``Timeseries``. The default is None.
        datetime_format : bool, optional
            True if the index is in datetime format (e.g. 1995-10-17 00:00:00). The default is True.
        subbasin_results : bool, optional
            Extract and clip results for each subbasin previously generated and stored. The default is True.
        intermittency_yearly : bool, optional
            Compute yearly intermittency metrics for the hydrographic network.
        intermittency_monthly : bool
            If True, the intermittent and perennial part of hydrographic network is calculated on a monthly basis.
        intermittency_weekly : bool
            If True, the intermittent and perennial part of hydrographic network is calculated on a weekly basis.
        intermittency_daily : bool
            If True, the intermittent and perennial part of hydrographic network is calculated on a daily basis.
        residence_times : bool, optional
            Export residence-time diagnostics if MODPATH results are available.
        concentration_seepage : bool, optional
            When True the MT3DMS seepage concentrations are summarised.
        mass_accumulated : bool, optional
            Aggregate the accumulated mass time series from MT3DMS outputs.

        Returns
        -------
        timeseries_results : hydromodpy.modeling.timeseries.Timeseries or None
            Python object with results stored.
            The variable 'mfdata' inside corresponds to the .csv file results.
            None when ``model_modflow`` is None.
        """
        # Without a MODFLOW model there is nothing to extract; returning
        # explicitly keeps the result name from being read while unbound.
        if model_modflow is None:
            return None

        return timeseries.Timeseries(self.geographic,
                                     model_modflow=model_modflow,
                                     model_modpath=model_modpath,
                                     model_mt3dms=model_mt3dms,
                                     suffix_name=suffix_name,
                                     datetime_format=datetime_format,
                                     subbasin_results=subbasin_results,
                                     intermittency_yearly=intermittency_yearly,
                                     intermittency_monthly=intermittency_monthly,
                                     intermittency_weekly=intermittency_weekly,
                                     intermittency_daily=intermittency_daily,
                                     residence_times=residence_times,
                                     concentration_seepage=concentration_seepage,
                                     mass_accumulated=mass_accumulated
                                     )

    #%% EXTRACT NETCDF

    def postprocessing_netcdf(self, model_modflow: object,
                              datetime_format: bool=True):
        """
        Public method to postprocess the watershed netCDF.

        Parameters
        ----------
        model_modflow : object
            MODFLOW model in a Python object. When None, nothing is
            computed and None is returned.
        datetime_format : bool, optional
            True if the index is in datetime format (e.g. 1995-10-17 00:00:00). The default is True.

        Returns
        -------
        netcdf_results : object or None
            Python object with results stored, or None when
            ``model_modflow`` is None.
        """
        # Same guard as for the timeseries extraction.
        if model_modflow is None:
            return None

        return netcdf.Netcdf(self.geographic,
                             model_modflow=model_modflow,
                             datetime_format=datetime_format)

    #%% PYHELP

    def preprocessing_pyhelp(
            self,
            *,
            grid_csv=None,                # "official" name
            grid_base=None,               # backward-compatible alias
            workdir: str,
            ready_csvs,                   # [precip, tair, solrad]
            grid_patch,                   # e.g. {"dem": dem_path, "CN": 75}
            compress_level: int = 4,
    ):
        """
        Public method forwarding PyHELP inputs to the netCDF preprocessing.

        All arguments are keyword-only.

        Parameters
        ----------
        grid_csv : optional
            Grid input, forwarded as ``grid_csv``. The default is None.
        grid_base : optional
            Former name of ``grid_csv``; used only when ``grid_csv`` is
            None. The default is None.
        workdir : str
            Forwarded unchanged as ``workdir``.
        ready_csvs : sequence
            Exactly three items: [precip_csv, tair_csv, solrad_csv].
        grid_patch :
            Forwarded unchanged as ``grid_patch``.
        compress_level : int, optional
            Forwarded unchanged as ``compress_level``. The default is 4.

        Raises
        ------
        ValueError
            If neither ``grid_csv`` nor ``grid_base`` is given, or if
            ``ready_csvs`` does not unpack into exactly three items.

        Returns
        -------
        The value returned by ``pyhelp_netcdf.preprocessing_pyhelp_netcdf``.
        """
        # 1) backward compatibility with the old argument name
        if grid_csv is None:
            grid_csv = grid_base
        if grid_csv is None:
            raise ValueError("Vous devez fournir grid_csv ou grid_base.")

        # 2) unpack the weather file list
        try:
            precip_csv, tair_csv, solrad_csv = ready_csvs
        except ValueError as err:
            raise ValueError(
                "`ready_csvs` doit contenir [precip_csv, tair_csv, solrad_csv]"
            ) from err

        # 3) import only once the inputs are valid, so argument errors are
        #    reported even when the PyHELP stack is not installed
        from hydromodpy.pyhelp import pyhelp_netcdf

        return pyhelp_netcdf.preprocessing_pyhelp_netcdf(
            workdir=workdir,
            grid_csv=grid_csv,
            precip_csv=precip_csv,
            tair_csv=tair_csv,
            solrad_csv=solrad_csv,
            grid_patch=grid_patch,
            compress_level=compress_level,
        )
#%% NOTES