Source code for hydromodpy.display.visualization_results

# -*- coding: utf-8 -*-
"""
 * Copyright (C) 2023-2025 Alexandre Gauvain, Ronan Abhervé, Jean-Raynald de Dreuzy
 *
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Public License 2.0 which is available at
 * http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
 * which is available at https://www.apache.org/licenses/LICENSE-2.0.
 *
 * SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
"""

#%% LIBRAIRIES

# Python
import numpy as np
from collections.abc import Sequence
from datetime import datetime
import matplotlib.pyplot as plt
from mpl_toolkits.axes_grid1 import make_axes_locatable
from matplotlib_scalebar.scalebar import ScaleBar
from matplotlib.collections import LineCollection
import rasterio
from rasterio.plot import show
import geopandas as gpd
import flopy
import os, sys
import contextily as cx
import matplotlib as mpl
from matplotlib import rcsetup

# HydroModPy
from hydromodpy.tools import toolbox, get_logger

logger = get_logger(__name__)


def _require_vedo():
    """
    Import vedo only when 3D visualizations are requested.

    Raises a clear error when the VTK stack is missing instead of
    crashing on module import.
    """
    try:
        import vedo  # type: ignore
    except ImportError as exc:
        raise ImportError(
            "3D visualization requires the 'vedo' dependency (VTK stack). "
            "Install it with 'pip install vedo' or use the full HydroModPy "
            "install (not the 'light' extra)."
        ) from exc
    except OSError as exc:
        raise ImportError(
            "The VTK runtime used by vedo could not start. Install the "
            "system libraries for VTK/Qt (e.g. libgl1, libx11-6) or run "
            "inside an image that bundles them."
        ) from exc

    return vedo

#%% CLASS

[docs] class Visualization(): """ Class to plot results by default. """ def __init__(self, watershed, modelname): """ Parameters ---------- watershed : object Variable object of the model domain (watershed). Created by geographic. modelname : str Name of the model simulation. """ self.watershed = watershed self.modelname = modelname #%% 2D
[docs] def visual2D(self, object_list: list=['map','grid', 'watertable', 'watertable_depth', 'drain_flow','surface_flow', 'pathlines','residence_times'], color_scale = None, time_step = 0, lines = 100, structure = 'v'): """ Parameters ---------- object_list : list Select the simulation results you wish to plot. color_scale : list, optional Boundary limits for color scale. The default is None. time_step : int, optional Choice the stress period to plot. The default is 0. lines : int, optional Number of randomly selected pathlines to be traced. The default is 100. structure : str, optional Structure of the frame figures 'h':horizontal or 'v': vertical. The default is 'v'. """ logger.info("Plotting 2D map visualizations for model %s", self.modelname) if len(object_list) == len(color_scale): pass elif color_scale is None: color_scale = [(None,None),(None,None), (None,None),(None,None), (None,None),(None,None), (None,None),(None,None)] else: logger.error("object_list and color_scale must have the same length.") sys.exit(1) def trim_axs(axs, N): """Help to manage the axs list in order to have correct lenght/height""" axs = axs.flat for ax in axs[N:]: ax.remove() return axs[:N] modelfolder = os.path.join(self.watershed.simulations_folder, self.modelname) fontprop = toolbox.plot_params(8,15,18,20) path_res = os.path.join(modelfolder,'_postprocess') try: contour = gpd.read_file(self.watershed.geographic.watershed_contour_shp) crs = contour.crs except: pass try: dem = rasterio.open(self.watershed.geographic.watershed_box_buff_dem) except: pass try: streams = gpd.read_file(self.watershed.hydrography.streams) except: pass # open the watertable elevation files try: watertable_file = os.path.join(path_res,'watertable_elevation.npy') watertable_elevation = np.load(watertable_file, allow_pickle=True).item() except: pass # open the watertable depth files try: watertable_depth_file = os.path.join(path_res,'watertable_depth.npy') watertable_depth= np.load(watertable_depth_file, allow_pickle=True).item() except: pass # open the drain flux files try: drain_file = os.path.join(path_res,'outflow_drain.npy') drain_area = np.load(drain_file, allow_pickle=True).item() except: pass # open the surface flux files try: surface_file = os.path.join(path_res,'accumulation_flux.npy') surface_area = np.load(surface_file, allow_pickle=True).item() except: pass N = len(object_list) if structure == 'v': C = int(np.sqrt(N)) R = int(N/C)+1 if structure == 'h': R = int(np.sqrt(N)) C = int(N/R)+1 fig, axs = plt.subplots(nrows=R, ncols=C ,figsize=(5*C,R*(5*dem.height/dem.width)), dpi=300) axs = trim_axs(axs,N) image = [] basemap = [] for i in range (0,len(object_list)): obj = object_list[i] if obj == 'grid': axs[i].set_title('Topographic elevation [m]') image_hidden = axs[i].imshow(np.ma.masked_where(dem.read(1) < -100, dem.read(1)), cmap='terrain', vmin=color_scale[i][0], vmax=color_scale[i][1]) image.append(image_hidden) basemap.append(0) show(np.ma.masked_where(dem.read(1) < -100, dem.read(1)), ax=axs[i], transform=dem.transform, cmap='terrain', alpha=1, zorder=2, aspect="auto", vmin=color_scale[i][0], vmax=color_scale[i][1]) try: streams.plot(ax=axs[i], lw=2, color='b', zorder=4, legend=False, # label='Hydrography' ) except: pass try: contour.plot(ax=axs[i], lw=2, edgecolor='k', facecolor='None', zorder=4, legend=False, # label='Watershed' ) except: pass if obj == 'watertable': axs[i].set_title('Watertable elevation [m]') image_hidden = axs[i].imshow(np.ma.masked_where((watertable_elevation[time_step]< -100), watertable_elevation[time_step]), cmap='Blues_r', vmin=color_scale[i][0], vmax=color_scale[i][1]) image.append(image_hidden) basemap.append(0) show(np.ma.masked_where(watertable_elevation[time_step]< -100, watertable_elevation[time_step]), ax=axs[i], transform=dem.transform, cmap='Blues_r', alpha=1, zorder=2, aspect="auto", vmin=color_scale[i][0], vmax=color_scale[i][1]) try: contour.plot(ax=axs[i], lw=2, edgecolor='k', facecolor='None', zorder=4, legend=False) except: pass if obj == 'watertable_depth': axs[i].set_title('Watertable depth [m]') image_hidden = axs[i].imshow(np.ma.masked_where((watertable_depth[time_step]< -100) | (dem.read(1) < -100), watertable_depth[time_step]), cmap='coolwarm_r', vmin=color_scale[i][0], vmax=color_scale[i][1]) image.append(image_hidden) basemap.append(0) show(np.ma.masked_where((watertable_depth[time_step]< -100) | (dem.read(1) < -100), watertable_depth[time_step]), ax=axs[i], transform=dem.transform, cmap='coolwarm_r', alpha=1, zorder=2, aspect="auto", vmin=color_scale[i][0], vmax=color_scale[i][1]) try: contour.plot(ax=axs[i], lw=2, edgecolor='k', facecolor='None', zorder=4, legend=False) except: pass if obj == 'drain_flow': # axs[i].set_title('Seepage rates, log(Q) [m/d]') axs[i].set_title('Seepage outflow [m$^3$/d]') # axs[i].set_title('Seepage outflow [m3/d]') drain = np.ma.masked_where(self.watershed.geographic.dem_clip<= 0, drain_area[time_step]) # image_hidden = axs[i].imshow(np.ma.masked_where(drain<= 0, np.log10(drain)), # cmap='jet', vmin=color_scale[i][0], vmax=color_scale[i][1]) image_hidden = axs[i].imshow(np.ma.masked_where(drain<= 0, (drain)), cmap='RdYlGn_r', vmin=color_scale[i][0], vmax=color_scale[i][1]) image.append(image_hidden) basemap.append(0) show(np.ma.masked_where(dem.read(1) < -100, dem.read(1)), ax=axs[i], transform=dem.transform, cmap='Greys', alpha=0.3, zorder=2, aspect="auto") # show(np.ma.masked_where(drain<= 0, np.log10(drain)), ax=axs[i], # transform=dem.transform, cmap='jet', alpha=1, zorder=2, aspect="auto", vmin=color_scale[i][0], # vmax=color_scale[i][1]) show(np.ma.masked_where(drain<= 0, (drain)), ax=axs[i], transform=dem.transform, cmap='RdYlGn_r', alpha=1, zorder=2, aspect="auto", vmin=color_scale[i][0], vmax=color_scale[i][1]) try: contour.plot(ax=axs[i], lw=2, edgecolor='k', facecolor='None', zorder=4, legend=False) except: pass if obj == 'surface_flow': # axs[i].set_title('Cumulate seepage rates, log(Q) [m/d]') axs[i].set_title('Accumulated outflow [m$^3$/d]') # axs[i].set_title('Accumulated outflow [m3/d]') surface = np.ma.masked_where(self.watershed.geographic.dem_clip<= 0, surface_area[time_step]) # image_hidden = axs[i].imshow(np.ma.masked_where(surface_area[time_step]<= 0, np.log10(surface)), image_hidden = axs[i].imshow(np.ma.masked_where(surface_area[time_step]<= 0, (surface)), cmap='jet', vmin=color_scale[i][0], vmax=color_scale[i][1]) image.append(image_hidden) basemap.append(0) show(np.ma.masked_where(dem.read(1) < -100, dem.read(1)), ax=axs[i], transform=dem.transform, cmap='Greys', alpha=0.3, zorder=0, aspect="auto") # show(np.ma.masked_where(surface_area[time_step]<= 0, np.log10(surface)), ax=axs[i], # transform=dem.transform, cmap='jet', alpha=1, zorder=2, aspect="auto", vmin=color_scale[i][0], # vmax=color_scale[i][1]) show(np.ma.masked_where(surface_area[time_step]<= 0, (surface)), ax=axs[i], transform=dem.transform, cmap='jet', alpha=1, zorder=2, aspect="auto", vmin=color_scale[i][0], vmax=color_scale[i][1]) try: contour.plot(ax=axs[i], lw=2, edgecolor='k', facecolor='None', zorder=4, legend=False) except: pass if obj == 'pathlines': # show(np.ma.masked_where(dem.read(1) < -100, dem.read(1)), ax=axs[i], # transform=dem.transform, cmap='Greys', alpha=0.3, zorder=0, aspect="auto") # axs[i].set_title('Pathlines, log(t) [d]') axs[i].set_title('Time pathlines [y]') pthobj = flopy.utils.PathlineFile(os.path.join(modelfolder,self.modelname+'.mppth')) pth_data = pthobj.get_alldata() if lines != None: random_indices = np.random.choice(len(pth_data), size=lines) # RANDOM LINES if lines == None: random_indices = np.arange(len(pth_data)) geotx_p = self.watershed.geographic.x_coord geoty_p = self.watershed.geographic.y_coord geot_p = self.watershed.geographic.geodata cols = geotx_p.shape[0] rows = geoty_p.shape[0] ext = [] xarr = [0, cols] yarr = [0, rows] for px in xarr: for py in yarr: x = geotx_p[0] + (px * geot_p[1]) + (py * geot_p[2]) y = geoty_p[0] + (px * geot_p[4]) + (py * geot_p[5]) ext.append([x, y]) max_time = [] min_time = [] for j in random_indices: # max_time.append(np.max(np.log10(pth_data[j].time))) # min_time.append(np.min(np.log10(pth_data[j].time))) max_time.append(np.max((pth_data[j].time))) min_time.append(np.min((pth_data[j].time))) for j in random_indices: x = pth_data[j].x + ext[1][0] y = pth_data[j].y + ext[1][1] points = np.array([x, y]).T.reshape(-1, 1, 2) segments = np.concatenate([points[:-1], points[1:]], axis=1) lc = LineCollection(segments, cmap='plasma_r', alpha=0.8) # lc.set_array(np.log10(pth_data[j].time/365)) # log(t) in days lc.set_array(pth_data[j].time / 365) # t in years lc.set_linewidth(2) if color_scale[i][0] == None: lc.set_clim(1,np.max(max_time)) else: lc.set_clim(color_scale[i][0],color_scale[i][1]) line = axs[i].add_collection(lc) image.append(line) basemap.append(0) try: contour.plot(ax=axs[i], lw=2, edgecolor='k', facecolor='None', zorder=4, legend=False) except: pass if obj == 'residence_times': axs[i].set_title('Residence times [y]') res_time = np.zeros(np.shape(dem)) endobj = flopy.utils.EndpointFile(os.path.join(modelfolder,self.modelname+'.mpend')) e = endobj.get_alldata() for j in range(len(e)): # time_out = pth_data[j].time[0] # explore pathlines # res_time[e[j].i0,e[j].j0] = np.log10(e[j].time) # where infiltrated res_time[e[j].i,e[j].j] = (e[j].time) /365 # where outputed res_time = np.ma.masked_where(res_time <= 0, res_time) image_hidden = axs[i].imshow(np.ma.masked_where(self.watershed.geographic.dem_clip<= 0, res_time), cmap='cool', vmin=color_scale[i][0], vmax=color_scale[i][1]) # show(np.ma.masked_where(dem.read(1) < -100, dem.read(1)), ax=axs[i], # transform=dem.transform, cmap='Greys', alpha=0.3, zorder=0, aspect="auto") image.append(image_hidden) basemap.append(0) show(np.ma.masked_where(self.watershed.geographic.dem_clip<= 0, res_time), ax=axs[i], transform=dem.transform, cmap='cool', alpha=1, zorder=2, aspect="auto", vmin=color_scale[i][0], vmax=color_scale[i][1]) try: contour.plot(ax=axs[i], lw=2, edgecolor='k', facecolor='None', zorder=4, legend=False) except: pass if obj == 'map': axs[i].set_title('Watershed boundary') basemap.append(1) image.append(None) try: contour.plot(ax=axs[i], lw=2, edgecolor='k', facecolor='None', zorder=4, legend=True, label='Watershed') except: pass try: streams.plot(ax=axs[i], lw=2, color='b', zorder=4, legend=True, label='Hydrography') except: pass compt = 0 for ax in axs: ## Rajouter ici if 'conceptal' then do not display watershed boundary # contour.plot(ax=ax, lw=2, color='k', zorder=4,legend=True, label='Watershed') bounds = dem.bounds xlim = ([bounds[0], bounds[2]]) ylim = ([bounds[1], bounds[3]]) ax.set_xlim(xlim) ax.set_ylim(ylim) ax.set_aspect('equal') scalebar = ScaleBar(1,box_alpha=0, scale_loc = 'top', location='lower right', rotation='horizontal-only') ax.add_artist(scalebar) ax.get_xaxis().set_visible(False) ax.get_yaxis().set_visible(False) if image[compt] != None: divider = make_axes_locatable(ax) cax = divider.append_axes(size="4%",position='right', pad=0.05) fig.add_axes(cax) cbar = fig.colorbar(image[compt], cax=cax, orientation="vertical") cbar.ax.get_ymajorticklabels() list(cbar.get_ticks()) cbar.ax.tick_params(labelsize=10) cbar.ax.yaxis.set_ticks_position('right') cbar.ax.tick_params(size=2) if basemap[compt] == 1: try: cx.add_basemap(ax,crs=crs,source='https://{s}.tile.openstreetmap.org/{z}/{x}/{y}.png') except: pass handles, labels = ax.get_legend_handles_labels() if handles: ax.legend(loc='best', framealpha=0.8) compt +=1 name = self.modelname fig.suptitle(name.upper(), fontsize=12, y=1.0) fig.tight_layout() now = datetime.now() #name = now.strftime("%d_%m_%Y_%Hh%M") fig.savefig(os.path.join(modelfolder,'_postprocess','_figures', '2D_' + str(name)+'.png'), dpi=300, bbox_inches='tight', transparent=False) plt.show()
#%% 3D
[docs] def visual3D(self, object_list=['grid', 'watertable'] , view = 'south-west', bg = 'lb', interactive = False, lines=100, z_scale=20, render=1, cscale = 'default', cmin=-1, cmax=1, cloc=(0.65,0.75) , size=(1500,1080)): """ 3Dvisual shows the vtk objects from an interactive windows or a screenshot. Parameters ---------- object_list : list of str, optional list of visualisation. possible options: grid, watertable, watertable_depth, pathlines, flux, acc_flux The default is ['grid', 'watertable']. view : str, optional position of view to see the 3D visual. possible options: north, north-east, east, south-east, south, south-west, west, north-west The default is 'south-west'. interactive : bool, optional activate the interactive window, if True the figure doesn't save. The default is False. lines : int, optional the number of random pathlines displayed cloc : tuple, optional Scalar bar location either as legacy (x, y) anchor or as ((x0, y0), (x1, y1)) bounding box. """ logger.info("Plotting 3D visualization for model %s", self.modelname) vedo = _require_vedo() def _normalize_scalarbar_pos(raw_pos): """Keep backward compatibility with legacy scalarbar coordinates.""" if raw_pos is None or isinstance(raw_pos, str): return raw_pos if not isinstance(raw_pos, Sequence): return raw_pos try: if len(raw_pos) == 4 and not any(isinstance(v, Sequence) and not isinstance(v, str) for v in raw_pos): x0, y0, width, height = map(float, raw_pos) x0 = max(0.0, min(x0, 0.99)) y0 = max(0.0, min(y0, 0.99)) x1 = max(x0 + 0.01, min(x0 + width, 0.999)) y1 = max(y0 + 0.01, min(y0 + height, 0.999)) return ((x0, y0), (x1, y1)) if len(raw_pos) == 2: first, second = raw_pos if isinstance(first, Sequence) and not isinstance(first, str): if isinstance(second, Sequence) and not isinstance(second, str): return ( (float(first[0]), float(first[1])), (float(second[0]), float(second[1])), ) if not isinstance(first, Sequence) and not isinstance(second, Sequence): x0, y0 = float(first), float(second) width, height = 0.1, 0.45 x0 = max(0.0, min(x0, 0.99)) y0 = max(0.0, min(y0, 0.99)) x1 = max(x0 + 0.01, min(x0 + width, 0.999)) y1 = max(y0 + 0.01, min(y0 + height, 0.999)) return ((x0, y0), (x1, y1)) except (TypeError, ValueError, IndexError): return raw_pos return raw_pos scalarbar_pos = _normalize_scalarbar_pos(cloc) vedo.settings.default_backend= 'vtk' #vedo.settings.screeshot_scale = render plt = vedo.Plotter(N=len(object_list), axes=dict(xtitle='m', ytitle='m', ztitle='m', yzGrid=False), size=size) # Load files try: contour = vedo.Mesh(os.path.join(self.watershed.simulations_folder, self.modelname, '_postprocess', '_vtuvtk','watershed_contour.vtk')) contour.scale([1,1,z_scale]) contour.color('k').lw(2) contour.render_lines_as_tubes(value=True) except: pass try: stream = vedo.Mesh(os.path.join(self.watershed.simulations_folder, self.modelname, '_postprocess', '_vtuvtk','streams.vtk')) stream.scale([1,1,z_scale]) stream.color('b').lw(5) stream.render_lines_as_tubes(value=True) except: stream=None pass try: grid = os.path.join(self.watershed.simulations_folder, self.modelname, '_postprocess', '_vtuvtk', 'grid.vtu') grid_mesh = vedo.load(grid) #grid_mesh grid_wireframe = vedo.load(grid).wireframe() #grid_wireframe if bg == 'white': grid_wireframe.color('black') else: grid_wireframe.color('white') grid_wireframe.scale([1,1,z_scale]) grid_wireframe.alpha(0.1) #plt += grid_wireframe.flag() zvals = grid_mesh.points[:, 2] # grid_mesh.add_elevation_scalars(lowPoint=(0,0,min(zvals)),highPoint=(0,0,max(zvals)), vrange=(min(zvals), max(zvals))) grid_mesh.cmap('terrain',zvals, vmin=min(zvals)) # grid_mesh.add_scalarbar(pos=cloc, title='Topographic elevation [m]', # horizontal=False, titleFontSize=20) grid_mesh.add_scalarbar(pos=scalarbar_pos, horizontal=False) grid_mesh.scale([1,1,z_scale]) grid_mesh.alpha(1) #plt += grid_mesh #plt += grid_mesh.isolines(5).lw(1).c('k') except Exception: logger.warning("VTU grid mesh missing for 3D visualization: %s", grid, exc_info=True) #try: watertable = os.path.join(self.watershed.simulations_folder, self.modelname, '_postprocess', '_vtuvtk', 'watertable_0.vtu') watertable_elev = vedo.load(watertable) # 1 Elevation watertable_depth = vedo.load(watertable) # 3 Depth if 'surface_flow' in object_list: surface_flow = vedo.UnstructuredGrid(watertable) # 3 Surface Flow if 'drain_flow' in object_list: drain_flow = vedo.UnstructuredGrid(watertable) # 3 Drain Flow watertable_blue = vedo.load(watertable) # 4 blue zvals = watertable_elev.points[:, 2] watertable_elev.cmap('Blues_r',zvals, vmin=min(zvals)) # watertable_elev.add_scalarbar(pos=cloc, title='Watertable elevation [m]', horizontal=False, titleFontSize=20) watertable_elev.add_scalarbar(pos=scalarbar_pos, horizontal=False) watertable_elev.scale([1,1,z_scale]) #plt += watertable_elev watertable_depth = watertable_depth.map_cells_to_points() watertable_depth.cmap('coolwarm_r',input_array='Drawdown', vmin=0, vmax=10) # watertable_depth.add_scalarbar(pos=cloc, title='Watertable depth [m]', horizontal=False, titleFontSize=20) watertable_depth.add_scalarbar(pos=scalarbar_pos, horizontal=False) watertable_depth.scale([1,1,z_scale]) #plt += watertable_depth watertable_blue.color('b') watertable_blue.alpha(0.2) watertable_blue.scale([1,1,z_scale]) watertable_blue.legend('Watertable') #plt += watertable_blue if 'surface_flow' in object_list: surface_flow = surface_flow.tomesh() nan_loc = ~np.isnan(surface_flow.celldata['Surfaceflow_log']) surface_flow = surface_flow.extract_cells([i for i, x in enumerate(nan_loc) if x]) surface_flow.cmap('jet', 'Surfaceflow_log', on='cells') # surface_flow.add_scalarbar(pos=cloc, title='Flow (log)', horizontal=False, titleFontSize=20) surface_flow.add_scalarbar(pos=scalarbar_pos, horizontal=False) surface_flow.scale([1,1,z_scale]) if 'drain_flow' in object_list: drain_flow = drain_flow.tomesh() nan_loc = ~np.isnan(drain_flow.celldata['Drainflow_log']) drain_flow = drain_flow.extract_cells([i for i, x in enumerate(nan_loc) if x]) # cmin = min(drain_flow.pointdata['Drainflow_log']) # cmax = max(drain_flow.pointdata['Drainflow_log']) try: if cscale == 'custom': mi = 1 ma = 4 drain_flow.cmap('RdYlGn_r', 'Drainflow_log', on='cells', vmin=mi, vmax=ma) else: drain_flow.cmap('RdYlGn_r', 'Drainflow_log', on='cells') # drain_flow.add_scalarbar(pos=cloc, title='Seepage outflow log [m$^3$/d]', horizontal=False, titleFontSize=20) drain_flow.add_scalarbar(pos=scalarbar_pos, horizontal=False) drain_flow.scale([1,1,z_scale]) except Exception: logger.warning("VTK drain_flow mesh missing or failed to process for 3D visualization", exc_info=True) #try: pathlines = os.path.join(self.watershed.simulations_folder, self.modelname, '_postprocess', '_vtuvtk', 'pathlines.vtk') pathlines_mesh = vedo.Mesh(pathlines) #5 #Pathlines if cscale == 'default': cmin = int(min(pathlines_mesh.pointdata['Time_log'])) cmax = int(max(pathlines_mesh.pointdata['Time_log'])) if cscale == 'custom': cmin = cmin cmax = cmax pathlines_mesh.cmap('plasma_r', input_array='Time_log', vmin=cmin, vmax=cmax).lw(5) # pathlines_mesh.add_scalarbar(pos=cloc, title='Residence times log [y]', horizontal=False, titleFontSize=20) pathlines_mesh.add_scalarbar(pos=scalarbar_pos, horizontal=False) pathlines_mesh.scale([1,1,z_scale]) pathlines_mesh.render_lines_as_tubes(value=True) pathlines_mesh.legend('Pathlines') n = lines # try: # x = pathlines_mesh.lines # length = max(map(len, x)) # y=np.array([xi+[None]*(length-len(xi)) for xi in x]) # number_of_rows = y.shape[0] # random_indices = np.random.choice(number_of_rows, size=len(x)-n, replace=False) # y1 = y[random_indices, :].flatten() # pts = y1[y1 != np.array(None)] # pathlines_mesh.delete_cells(pts) # pathlines_mesh = pathlines_mesh.subsample(0.5) # except: # print("VTK pathlines doesn't exist") # pass # #View xs = max(watertable_elev.points[:, 0]) - min(watertable_elev.points[:, 0]) ys = max(watertable_elev.points[:, 1]) - min(watertable_elev.points[:, 1]) zs = max(watertable_elev.points[:, 2]) - min(watertable_elev.points[:, 2]) if view == 'north': pos = (min(watertable_elev.points[:, 0])+ xs ,max(watertable_elev.points[:,1])+ ys,max(watertable_elev.points[:, 2])*10) if view == 'north-east': pos = (max(watertable_elev.points[:, 0])+ xs ,max(watertable_elev.points[:,1])+ ys,max(watertable_elev.points[:, 2])*10) if view == 'east': pos = (max(watertable_elev.points[:, 0])+ xs ,min(watertable_elev.points[:,1])+ ys,max(watertable_elev.points[:, 2])*10) if view == 'south-east': pos = (max(watertable_elev.points[:, 0])+ xs ,max(watertable_elev.points[:,1])- ys,max(watertable_elev.points[:, 2])*10) if view == 'south': pos = (min(watertable_elev.points[:, 0])+ xs ,min(watertable_elev.points[:,1])- ys,max(watertable_elev.points[:, 2])*10) if view == 'south-west': pos = (min(watertable_elev.points[:, 0])- xs ,min(watertable_elev.points[:,1])- ys,max(watertable_elev.points[:, 2])*10) if view == 'west': pos = (min(watertable_elev.points[:, 0])- xs ,min(watertable_elev.points[:,1])+ ys,max(watertable_elev.points[:, 2])*10) if view == 'north-west': pos = (min(watertable_elev.points[:, 0])- xs ,max(watertable_elev.points[:,1])+ ys,max(watertable_elev.points[:, 2])*10) if view == 'custom': pos = (max(watertable_elev.points[:, 0])+ xs ,max(watertable_elev.points[:,1])+ ys,max(watertable_elev.points[:, 2])*4) if view == 'vertical': pos = (np.mean(watertable_elev.points[:, 0]) ,np.mean(watertable_elev.points[:,1]), np.mean(watertable_elev.points[:, 2])*400) focal = (min(watertable_elev.points[:, 0])+(xs/2), min(watertable_elev.points[:, 1])+(ys/2), zs) cam = dict(pos = pos,focalPoint = focal) for i in range(len(object_list)): obj = object_list[i] logger.info("Processing object %s", obj) if obj == 'grid': plt.show(grid_mesh,contour,stream,"Topographic elevation [m]", at=i, camera=cam, viewup='z', axes = 13, bg=bg) if obj == 'watertable': plt.show(grid_wireframe,contour,stream, watertable_elev,"Watertable elevation [m]", camera=cam, viewup ='z', at=i, axes = 13, bg=bg) if obj == 'watertable_depth': plt.show(grid_wireframe,contour,stream, watertable_depth,"Watertable depth [m]", camera=cam, viewup ='z', at=i, axes = 13, bg=bg) if obj == 'pathlines': #plt.show(grid_wireframe,contour,stream, watertable_blue, pathlines_mesh,"Groundwater flow paths",camera=cam, viewup ='z', at=i, axes = 13) plt.show(grid_wireframe,contour,stream, watertable_blue, pathlines_mesh, "Time pathlines log [d]", camera=cam, viewup ='z', at=i, axes = 13, bg=bg) #plt.show(grid_wireframe,contour,stream, watertable_blue, pathlines_mesh,camera=cam, viewup ='z', at=i, axes = 13) #plt.show(grid_mesh, pathlines_mesh,camera=cam, viewup ='z', at=i, axes = 13) if obj == 'surface_flow': plt.show(grid_wireframe,contour, watertable_blue, surface_flow, "Accumulated outflow log [m3/d]", camera=cam, viewup ='z', at=i, axes = 13, bg=bg) if obj == 'drain_flow': #plt.show(grid_wireframe,contour,stream, watertable_blue, drain_flow,"Groundwater seepage",camera=cam, viewup ='z', at=i, axes = 13) plt.show(grid_wireframe,contour,stream, watertable_blue, drain_flow,"Seepage outflow log [m3/d]", camera=cam, viewup ='z', at=i, axes = 13, bg=bg) #plt.show(grid_wireframe,contour,stream, watertable_blue, drain_flow,camera=cam, viewup ='z', at=i, axes = 13) #plt.show(grid_mesh,drain_flow,camera=cam, viewup ='z', at=i, axes = 13) if interactive == True: plt.show(interactive=1) plt.close() else: plt += __doc__ plt.screenshot(os.path.join(self.watershed.simulations_folder, self.modelname, '_postprocess', '_figures', '3D_'+self.modelname+'.png')).close()
#%% CROSS
[docs] def interactive_cross_section(self, dem_data, wt_data, river_data, interactive): logger.info("Plotting 2D cross-section for model %s", self.modelname) # Modules mpl.rcParams.update(mpl.rcParamsDefault) original_backend = plt.get_backend() backend_supports_events = original_backend in rcsetup.interactive_bk backend_switched = False if interactive and not backend_supports_events: try: plt.switch_backend("QtAgg") except Exception: backend_supports_events = False logger.warning( "Unable to activate QtAgg; falling back to static cross-section output at %s", f"_postprocess/_figures/CROSS_{self.modelname}.png", ) else: backend_supports_events = True backend_switched = True logger.info("Matplotlib interactive backend enabled: QtAgg") elif backend_supports_events: logger.info("Matplotlib interactive backend active: %s", original_backend) else: logger.warning( "Current Matplotlib backend is not interactive; saving static cross-section to %s", f"_postprocess/_figures/CROSS_{self.modelname}.png", ) effective_interactive = interactive and backend_supports_events # Figure params fig, main_ax = plt.subplots(figsize=(5, 5)) divider = make_axes_locatable(main_ax) top_ax = divider.append_axes("top",1.1, pad=0.2, sharex=main_ax) right_ax = divider.append_axes("right",1.1, pad=0.2, sharey=main_ax) # Axis names top_ax.xaxis.set_tick_params(labelbottom=False) right_ax.yaxis.set_tick_params(labelleft=False) main_ax.set_xlabel('X [pixel]') main_ax.set_ylabel('Y [pixel]') top_ax.set_ylabel('Z [m]') right_ax.set_xlabel('Z [m]') # Dimensions xvalues = np.linspace(-1,1,dem_data.shape[1]) yvalues = np.linspace(-1,1,dem_data.shape[0]) xx, yy = np.meshgrid(xvalues,yvalues) if interactive == True: cur_x = dem_data.shape[1] - 1 cur_y = dem_data.shape[0] - 1 else: cur_x = dem_data.shape[1] /2 cur_y = dem_data.shape[0] /2 # Data DEM dem_prof = dem_data.astype(float) dem_prof[dem_prof<0] = np.nan main_ax.imshow( np.ma.masked_array(dem_data, mask=(dem_data<0)), origin='lower', cmap='terrain', alpha=0.5 ) # Plot contour try: with rasterio.open(self.watershed.geographic.watershed_contour_tif) as src: cont = src.read(1) main_ax.imshow(np.ma.masked_where(cont<0, cont), cmap=mpl.colors.ListedColormap(['k']), interpolation='none') except Exception: logger.warning("No contour raster available for cross-section overlay") pass try: river_plot = np.ma.masked_array(river_data, mask=(river_data<=0)) main_ax.imshow(river_plot, origin='lower', cmap=mpl.colors.ListedColormap('navy'), interpolation='none') except Exception: logger.warning("No river raster available for cross-section overlay") pass main_ax.invert_yaxis() # Data wt wt_prof = wt_data.astype(float) wt_prof[wt_prof<0] = np.nan # Scaling axis main_ax.autoscale(enable=False) right_ax.autoscale(enable=False) top_ax.autoscale(enable=False) dem_max = np.nanmax(dem_prof) # Calculate the maximum dem value right_ax.set_xlim(np.nanmin(wt_prof), dem_max) top_ax.set_ylim(np.nanmin(wt_prof), dem_max) right_ax.set_ylim(0, dem_data.shape[0]) right_ax.invert_yaxis() top_ax.set_xlim(0, dem_data.shape[1]) v_line = main_ax.axvline(cur_x, color='k', lw=2) h_line = main_ax.axhline(cur_y, color='k', lw=2) # Create initial data lw = 1.5 if interactive else 1 dem_v = dem_prof[:, int(cur_x)]; dem_v[dem_v==0] = np.nan wt_v = wt_prof[:, int(cur_x)]; wt_v[wt_v==0] = np.nan dem_h = dem_prof[int(cur_y), :]; dem_h[dem_h==0] = np.nan wt_h = wt_prof[int(cur_y), :]; wt_h[wt_h==0] = np.nan # Create initial plots dem_v_prof, = right_ax.plot(dem_v, np.arange(xx.shape[0]), c='saddlebrown', lw=lw) wt_v_prof, = right_ax.plot(wt_v, np.arange(xx.shape[0]), c='dodgerblue', lw=lw) dem_h_prof, = top_ax.plot(np.arange(xx.shape[1]), dem_h, c='saddlebrown', lw=lw) wt_h_prof, = top_ax.plot(np.arange(xx.shape[1]), wt_h, c='dodgerblue', lw=lw) # Store references to the fill collections water_fill_v = [right_ax.fill_betweenx(np.arange(xx.shape[0]), 0, wt_v, color='deepskyblue', alpha=0.5, lw=0)] soil_fill_v = [right_ax.fill_betweenx(np.arange(xx.shape[0]), wt_v, dem_v, color='saddlebrown', alpha=0.5, lw=0)] water_fill_h = [top_ax.fill_between(np.arange(xx.shape[1]), 0, wt_h, color='deepskyblue', alpha=0.5, lw=0)] soil_fill_h = [top_ax.fill_between(np.arange(xx.shape[1]), wt_h, dem_h, color='saddlebrown', alpha=0.5, lw=0)] plt.tight_layout() # Animation interactive def on_move(event): if event.inaxes is main_ax: try: x, y = int(event.xdata), int(event.ydata) # Check bounds if x < 0 or x >= dem_prof.shape[1] or y < 0 or y >= dem_prof.shape[0]: return v_line.set_xdata([x, x]) h_line.set_ydata([y, y]) # Extract new profile data new_dem_v = dem_prof[:, x].copy() new_wt_v = wt_prof[:, x].copy() new_dem_h = dem_prof[y, :].copy() new_wt_h = wt_prof[y, :].copy() new_dem_v[new_dem_v==0] = np.nan new_wt_v[new_wt_v==0] = np.nan new_dem_h[new_dem_h==0] = np.nan new_wt_h[new_wt_h==0] = np.nan # Update line data dem_v_prof.set_xdata(new_dem_v) wt_v_prof.set_xdata(new_wt_v) dem_h_prof.set_ydata(new_dem_h) wt_h_prof.set_ydata(new_wt_h) # Remove old fill areas for collection in water_fill_v + soil_fill_v + water_fill_h + soil_fill_h: if collection in right_ax.collections or collection in top_ax.collections: collection.remove() water_fill_v.clear() soil_fill_v.clear() water_fill_h.clear() soil_fill_h.clear() # Create new fill areas water_fill_v.append(right_ax.fill_betweenx(np.arange(xx.shape[0]), 0, new_wt_v, color='deepskyblue', alpha=0.5, lw=0)) soil_fill_v.append(right_ax.fill_betweenx(np.arange(xx.shape[0]), new_wt_v, new_dem_v, color='saddlebrown', alpha=0.5, lw=0)) water_fill_h.append(top_ax.fill_between(np.arange(xx.shape[1]), 0, new_wt_h, color='deepskyblue', alpha=0.5, lw=0)) soil_fill_h.append(top_ax.fill_between(np.arange(xx.shape[1]), new_wt_h, new_dem_h, color='saddlebrown', alpha=0.5, lw=0)) # Force a redraw fig.canvas.draw() fig.canvas.flush_events() except Exception as e: pass if effective_interactive: cid = fig.canvas.mpl_connect('motion_notify_event', on_move) elif interactive: logger.warning("Matplotlib backend lacks interactivity; displaying static cross-section") # Save and display fig.savefig(os.path.join( self.watershed.simulations_folder, self.modelname, '_postprocess','_figures', f'CROSS_{self.modelname}.png' )) plt.show(block=effective_interactive) if backend_switched: plt.close(fig) try: plt.switch_backend(original_backend) except Exception: pass
#%% NOTES