diff --git a/Tests/dea_tools/test_coastal.py b/Tests/dea_tools/test_coastal.py index f3ece21db..d58e4a99e 100644 --- a/Tests/dea_tools/test_coastal.py +++ b/Tests/dea_tools/test_coastal.py @@ -17,6 +17,7 @@ GAUGE_X = 122.2183 GAUGE_Y = -18.0008 +ENSEMBLE_MODELS = ["FES2014", "HAMTIDE11"] # simplified for tests @pytest.fixture() @@ -301,6 +302,183 @@ def test_model_tides_mode(mode, models, output_format): ) +# Test ensemble modelling functionality +def test_model_tides_ensemble(): + # Input params + x = [122.14, 144.910368] + y = [-17.91, -37.919491] + times = pd.date_range("2020", "2021", periods=2) + + # Default, only ensemble requested + modelled_tides_df = model_tides( + x=x, + y=y, + time=times, + model="ensemble", + ensemble_models=ENSEMBLE_MODELS, + ) + + assert modelled_tides_df.index.names == ["time", "x", "y"] + assert modelled_tides_df.columns.tolist() == ["tide_model", "tide_m"] + assert all(modelled_tides_df.tide_model == "ensemble") + + # Default, ensemble + other models requested + models = ["FES2014", "HAMTIDE11", "ensemble"] + modelled_tides_df = model_tides( + x=x, + y=y, + time=times, + model=models, + ensemble_models=ENSEMBLE_MODELS, + ) + + assert modelled_tides_df.index.names == ["time", "x", "y"] + assert modelled_tides_df.columns.tolist() == ["tide_model", "tide_m"] + assert set(modelled_tides_df.tide_model) == set(models) + assert np.allclose( + modelled_tides_df.tide_m, + [ + -2.819, + -1.850, + -0.215, + 0.037, + -2.623, + -1.803, + 0.073, + -0.069, + -2.721, + -1.826, + -0.071, + -0.0158, + ], + rtol=0.02, + ) + + # One-to-one mode + modelled_tides_df = model_tides( + x=x, + y=y, + time=times, + model=models, + mode="one-to-one", + ensemble_models=ENSEMBLE_MODELS, + ) + + assert modelled_tides_df.index.names == ["time", "x", "y"] + assert modelled_tides_df.columns.tolist() == ["tide_model", "tide_m"] + assert set(modelled_tides_df.tide_model) == set(models) + + # Wide mode, default + modelled_tides_df = model_tides( + x=x, + y=y, + time=times, + model=models, + output_format="wide", + ensemble_models=ENSEMBLE_MODELS, + ) + + # Check that expected models exist, and that ensemble is approx average + # of other two models + assert set(modelled_tides_df.columns) == set(models) + assert np.allclose( + 0.5 * (modelled_tides_df.FES2014 + modelled_tides_df.HAMTIDE11), + modelled_tides_df.ensemble, + ) + + # Wide mode, top n == 1 + modelled_tides_df = model_tides( + x=x, + y=y, + time=times, + model=models, + output_format="wide", + ensemble_top_n=1, + ensemble_models=ENSEMBLE_MODELS, + ) + + # Check that expected models exist, and that ensemble is equal to at + # least one of the other models + assert set(modelled_tides_df.columns) == set(models) + assert all( + (modelled_tides_df.FES2014 == modelled_tides_df.ensemble) + | (modelled_tides_df.HAMTIDE11 == modelled_tides_df.ensemble) + ) + + # Check that correct model is the closest at each row + closer_model = modelled_tides_df.apply( + lambda row: ( + "FES2014" + if abs(row["ensemble"] - row["FES2014"]) + < abs(row["ensemble"] - row["HAMTIDE11"]) + else "HAMTIDE11" + ), + axis=1, + ).tolist() + assert closer_model == ["FES2014", "HAMTIDE11", "FES2014", "HAMTIDE11"] + + # Check values are expected + assert np.allclose( + modelled_tides_df.ensemble, [-2.819, 0.0730, -1.850, -0.069], rtol=0.01 + ) + + # Wide mode, custom functions + ensemble_funcs = { + "ensemble-best": lambda x: x["rank"] == 1, + "ensemble-worst": lambda x: x["rank"] == 2, + "ensemble-mean-top2": lambda x: x["rank"].isin([1, 2]), + "ensemble-mean-weighted": lambda x: 3 - x["rank"], + "ensemble-mean": lambda x: x["rank"] <= 2, + } + modelled_tides_df = model_tides( + x=x, + y=y, + time=times, + model=models, + output_format="wide", + ensemble_func=ensemble_funcs, + ensemble_models=ENSEMBLE_MODELS, + ) + + # Check that expected models exist, and that valid data is produced + assert set(modelled_tides_df.columns) == set( + [ + "FES2014", + "HAMTIDE11", + "ensemble-best", + "ensemble-worst", + "ensemble-mean-top2", + "ensemble-mean-weighted", + "ensemble-mean", + ] + ) + assert all(modelled_tides_df.notnull()) + + # Long mode, custom functions + modelled_tides_df = model_tides( + x=x, + y=y, + time=times, + model=models, + output_format="long", + ensemble_func=ensemble_funcs, + ensemble_models=ENSEMBLE_MODELS, + ) + + # Check that expected models exist in "tide_model" column + assert set(modelled_tides_df.tide_model) == set( + [ + "FES2014", + "HAMTIDE11", + "ensemble-best", + "ensemble-worst", + "ensemble-mean-top2", + "ensemble-mean-weighted", + "ensemble-mean", + ] + ) + + # Run tests for default and custom resolutions @pytest.mark.parametrize("resolution", [None, "custom"]) def test_pixel_tides(satellite_ds, measured_tides_ds, resolution): @@ -488,6 +666,59 @@ def test_pixel_tides_dask(satellite_ds, dask_chunks): assert output_chunks == dask_chunks +# Run test pixel tides and ensemble modelling +def test_pixel_tides_ensemble(satellite_ds): + # Model tides using `pixel_tides` and default ensemble model + modelled_tides_ds, _ = pixel_tides( + satellite_ds, + model="ensemble", + ensemble_models=ENSEMBLE_MODELS, + ) + + assert modelled_tides_ds.tide_model == "ensemble" + + # Model tides using `pixel_tides` and multiple models including + # ensemble and custom IDW params + models = ["FES2014", "HAMTIDE11", "ensemble"] + modelled_tides_ds, _ = pixel_tides( + satellite_ds, + model=models, + ensemble_models=ENSEMBLE_MODELS, + k=10, + max_dist=20000, + ) + + assert "tide_model" in modelled_tides_ds.dims + assert set(modelled_tides_ds.tide_model.values) == set(models) + + # Model tides using `pixel_tides` and custom ensemble funcs + ensemble_funcs = { + "ensemble-best": lambda x: x["rank"] == 1, + "ensemble-worst": lambda x: x["rank"] == 2, + "ensemble-mean-top2": lambda x: x["rank"].isin([1, 2]), + "ensemble-mean-weighted": lambda x: 3 - x["rank"], + "ensemble-mean": lambda x: x["rank"] <= 2, + } + modelled_tides_ds, _ = pixel_tides( + satellite_ds, + model=models, + ensemble_func=ensemble_funcs, + ensemble_models=ENSEMBLE_MODELS, + ) + + assert set(modelled_tides_ds.tide_model.values) == set( + [ + "FES2014", + "HAMTIDE11", + "ensemble-best", + "ensemble-worst", + "ensemble-mean-top2", + "ensemble-mean-weighted", + "ensemble-mean", + ] + ) + + @pytest.mark.parametrize( "ebb_flow, swap_dims, tidepost_lat, tidepost_lon", [ diff --git a/Tools/dea_tools/coastal.py b/Tools/dea_tools/coastal.py index 479f2b56f..e5b7fc9e0 100644 --- a/Tools/dea_tools/coastal.py +++ b/Tools/dea_tools/coastal.py @@ -16,7 +16,7 @@ If you would like to report an issue with this script, you can file one on GitHub (https://github.com/GeoscienceAustralia/dea-notebooks/issues/new). -Last modified: November 2023 +Last modified: June 2024 """ @@ -40,6 +40,7 @@ from datacube.utils.geometry import CRS from dea_tools.datahandling import parallel_apply +from dea_tools.spatial import idw # Fix converters for tidal plot from pandas.plotting import register_matplotlib_converters @@ -118,9 +119,11 @@ def _intersect_dist(transect_gdf, lines_gdf, mode=mode): if mode == "distance": start_point = Point(transect_gdf.geometry.coords[0]) point_df = intersect_points.apply( - lambda x: pd.Series({"start": start_point, "end": x}) - if x.type == "Point" - else pd.Series({"start": None, "end": None}) + lambda x: ( + pd.Series({"start": start_point, "end": x}) + if x.type == "Point" + else pd.Series({"start": None, "end": None}) + ) ) # In width mode, identify transects with multiple intersections, and @@ -128,9 +131,11 @@ def _intersect_dist(transect_gdf, lines_gdf, mode=mode): # intersection for the end point when measuring distances if mode == "width": point_df = intersect_points.apply( - lambda x: pd.Series({"start": x.geoms[0], "end": x.geoms[-1]}) - if x.type == "MultiPoint" - else pd.Series({"start": None, "end": None}) + lambda x: ( + pd.Series({"start": x.geoms[0], "end": x.geoms[-1]}) + if x.type == "MultiPoint" + else pd.Series({"start": None, "end": None}) + ) ) # Calculate distances between valid start and end points @@ -407,6 +412,172 @@ def _model_tides( return tide_df +def _ensemble_model( + x, + y, + crs, + tide_df, + ensemble_models, + ensemble_func=None, + ensemble_top_n=3, + ranking_points="https://dea-public-data-dev.s3-ap-southeast-2.amazonaws.com/derivative/dea_intertidal/supplementary/rankings_ensemble_2017-2019.geojson", + ranking_valid_perc=0.02, + **idw_kwargs, +): + """ + Combine multiple tide models into a single locally optimised + ensemble tide model using external model ranking data (e.g. + satellite altimetry or NDWI-tide correlations along the coastline) + to inform the selection of the best local models. + + This function performs the following steps: + 1. Loads model ranking points from a GeoJSON file, filters them + based on the valid data percentage, and retains relevant columns + 2. Interpolates the model rankings into the requested x and y + coordinates using Inverse Weighted Interpolation (IDW) + 3. Uses rankings to combine multiple tide models into a single + optimised ensemble model (by default, by taking the mean of the + top 3 ranked models) + 4. Returns a DataFrame with the combined ensemble model predictions + + Parameters + ---------- + x : array-like + Array of x-coordinates where the ensemble model predictions are + required. + y : array-like + Array of y-coordinates where the ensemble model predictions are + required. + crs : string + Input coordinate reference system for x and y coordinates. Used + to ensure that interpolations are performed in the correct CRS. + tide_df : pandas.DataFrame + DataFrame containing tide model predictions with columns + `["time", "x", "y", "tide_m", "tide_model"]`. + ensemble_models : list + A list of models to include in the ensemble modelling process. + All values must exist as columns with the prefix "rank_" in + `ranking_points`. + ensemble_func : dict, optional + By default, a simple ensemble model will be calculated by taking + the mean of the `ensemble_top_n` tide models at each location. + However, a dictionary containing more complex ensemble + calculations can also be provided. Dictionary keys are used + to name output ensemble models; functions should take a column + named "rank" and convert it to a weighting, e.g.: + `ensemble_func = {"ensemble-custom": lambda x: x["rank"] <= 3}` + ensemble_top_n : int, optional + If `ensemble_func` is None, this sets the number of top models + to include in the mean ensemble calculation. Defaults to 3. + ranking_points : str, optional + Path to the GeoJSON file containing model ranking points. This + dataset should include columns containing rankings for each tide + model, named with the prefix "rank_". e.g. "rank_FES2014". + Low values should represent high rankings (e.g. 1 = top ranked). + ranking_valid_perc : float, optional + Minimum percentage of valid data required to include a model + rank point in the analysis, as defined in a column named + "valid_perc". Defaults to 0.02. + **idw_kwargs + Optional keyword arguments to pass to the `idw` function used + for interpolation. Useful values include `k` (number of nearest + neighbours to use in interpolation), `max_dist` (maximum + distance to nearest neighbours), and `k_min` (minimum number of + neighbours required after `max_dist` is applied). + + Returns + ------- + pandas.DataFrame + DataFrame containing the ensemble model predictions, matching + the format of the input `tide_df` (e.g. columns `["time", "x", + "y", "tide_m", "tide_model"]`. By default the 'tide_model' + column will be labeled "ensemble" for the combined model + predictions (but if a custom dictionary of ensemble functions is + provided via `ensemble_func`, each ensemble will be named using + the provided dictionary keys). + """ + + # Load model ranks points and reproject to same CRS as x and y + model_ranking_cols = [f"rank_{m}" for m in ensemble_models] + model_ranks_gdf = ( + gpd.read_file(ranking_points) + .to_crs(crs) + .query(f"valid_perc > {ranking_valid_perc}") + .dropna()[model_ranking_cols + ["geometry"]] + ) + + # Use points to interpolate model rankings into requested x and y + id_kwargs_str = "" if idw_kwargs == {} else idw_kwargs + print(f"Interpolating model rankings using IDW interpolation {id_kwargs_str}") + ensemble_ranks_df = ( + # Run IDW interpolation on subset of ranking columns + pd.DataFrame( + idw( + input_z=model_ranks_gdf[model_ranking_cols], + input_x=model_ranks_gdf.geometry.x, + input_y=model_ranks_gdf.geometry.y, + output_x=x, + output_y=y, + **idw_kwargs, + ), + columns=model_ranking_cols, + ) + .assign(x=x, y=y) + # Drop any duplicates then melt columns into long format + .drop_duplicates() + .melt(id_vars=["x", "y"], var_name="tide_model", value_name="rank") + # Remore "rank_" prefix to get plain model names + .replace({"^rank_": ""}, regex=True) + # Set index columns and rank across groups + .set_index(["tide_model", "x", "y"]) + .groupby(["x", "y"]) + .rank() + ) + + # If no custom ensemble funcs are provided, use a default ensemble + # calculation that takes the mean of the top N tide models + if ensemble_func is None: + ensemble_func = {"ensemble": lambda x: x["rank"] <= ensemble_top_n} + + # Create output list to hold computed ensemble model outputs + ensemble_list = [] + + # Loop through all provided ensemble generation functions + for ensemble_n, ensemble_f in ensemble_func.items(): + + print(f"Combining models into single {ensemble_n} model") + + # Join ranks to input tide data, compute weightings and group + grouped = ( + # Add tide model as an index so we can join with model ranks + tide_df.set_index("tide_model", append=True).join(ensemble_ranks_df) + # Add temp columns containing weightings and weighted values + .assign( + weights=ensemble_f, # use custom func to compute weights + weighted=lambda i: i.tide_m * i.weights, + ) + # Groupby is specified in a weird order here as this seems + # to be the easiest way to preserve correct index sorting + .groupby(["x", "y", "time"]) + ) + + # Use weightings to combine multiple models into single ensemble + ensemble_df = ( + # Calculate weighted mean and convert back to dataframe + grouped.weighted.sum() + .div(grouped.weights.sum()) + .to_frame("tide_m") + # Label ensemble model and ensure indexes are in expected order + .assign(tide_model=ensemble_n) + .reorder_levels(["time", "x", "y"], axis=0) + ) + + ensemble_list.append(ensemble_df) + + # Combine all ensemble models and return as a single dataframe + return pd.concat(ensemble_list) + + def model_tides( x, y, @@ -422,15 +593,17 @@ def model_tides( parallel_splits=5, output_units="m", output_format="long", - epsg=None, + ensemble_models=None, + **ensemble_kwargs, ): """ Compute tides at multiple points and times using tidal harmonics. - This function supports any tidal model supported by - `pyTMD`, including the FES2014 Finite Element Solution - tide model, and the TPXO8-atlas and TPXO9-atlas-v5 - TOPEX/POSEIDON global tide models. + This function supports all tidal models supported by `pyTMD`, + including FES Finite Element Solution models, TPXO TOPEX/POSEIDON + models, EOT Empirical Ocean Tide models, GOT Global Ocean Tide + models, and HAMTIDE Hamburg direct data Assimilation Methods for + Tides models. This function requires access to tide model data files. These should be placed in a folder with subfolders matching @@ -474,12 +647,14 @@ def model_tides( model tides in UTC time. model : string, optional The tide model used to model tides. Options include: - - "FES2014" (only pre-configured option on DEA Sandbox) + - "FES2014" (pre-configured on DEA Sandbox) - "TPXO9-atlas-v5" - "TPXO8-atlas" - "EOT20" - "HAMTIDE11" - "GOT4.10" + - "ensemble" (advanced ensemble tide model functionality; + combining multiple models based on external model rankings) directory : string, optional The directory containing tide model data files. If no path is provided, this will default to the environment variable @@ -496,7 +671,7 @@ def model_tides( Input coordinate reference system for x and y coordinates. Defaults to "EPSG:4326" (WGS84; degrees latitude, longitude). method : string, optional - Method used to interpolate tidal contsituents + Method used to interpolate tidal constituents from model files. Options include: - "spline": scipy bivariate spline interpolation (default) - "bilinear": quick bilinear interpolation @@ -542,8 +717,19 @@ def model_tides( results stacked vertically along "tide_model" and "tide_m" columns), or wide format (with a column for each tide model). Defaults to "long". - epsg : int, DEPRECATED - Deprecated; use "crs" instead. + ensemble_models : list, optional + An optional list of models used to generate the ensemble tide + model if "ensemble" tide modelling is requested. Defaults to + ["FES2014", "TPXO9-atlas-v5", "EOT20", "HAMTIDE11", "GOT4.10", + "FES2012", "TPXO8-atlas-v1"]. + **ensemble_kwargs : + Keyword arguments used to customise the generation of optional + ensemble tide models if "ensemble" modelling are requested. + These are passed to the underlying `_ensemble_model` function. + Useful parameters include `ranking_points` (path to model + rankings data), `k` (for controlling how model rankings are + interpolated), and `ensemble_top_n` (how many top models to use + in the ensemble calculation). Returns ------- @@ -551,16 +737,6 @@ def model_tides( combination of time and point coordinates. """ - - # Deprecate `epsg` param - if epsg is not None: - warn( - "The `epsg` parameter is deprecated; please use `crs` to " - "provide CRS information in the form 'EPSG:XXXX'", - DeprecationWarning, - stacklevel=2, - ) - # Set tide modelling files directory. If no custom path is provided, # first try global environmental var, then "/var/share/tide_models" if directory is None: @@ -579,7 +755,7 @@ def model_tides( time = time.to_datetime64() # Turn inputs into arrays for consistent handling - model = np.atleast_1d(model) + models_requested = np.atleast_1d(model) x = np.atleast_1d(x) y = np.atleast_1d(y) time = np.atleast_1d(time) @@ -603,23 +779,47 @@ def model_tides( "you intended to model multiple timesteps at each point." ) - # Verify that all provided models are in list of supported models + # Verify that all provided models are supported valid_models = [ "FES2014", "TPXO9-atlas-v5", - "TPXO8-atlas", "EOT20", "HAMTIDE11", "GOT4.10", - "FES2012", # Requires custom tide model definition file + "TPXO8-atlas", "TPXO8-atlas-v1", # Requires custom tide model definition file + "FES2012", # Requires custom tide model definition file + "ensemble", # Advanced ensemble model functionality ] - if not all(m in valid_models for m in model): + if not all(m in valid_models for m in models_requested): raise ValueError( - f"One or more of the models requested {model} is not valid. " - f"The following models are currently supported: {valid_models}" + f"One or more of the models requested {models_requested} is " + f"not valid. The following models are currently supported: " + f"{valid_models}" + ) + + # If ensemble modelling is requested, use a custom list of models + # for subsequent processing + if "ensemble" in models_requested: + print("Running ensemble tide modelling") + models_to_process = ( + ensemble_models + if ensemble_models is not None + else [ + "FES2014", + "TPXO9-atlas-v5", + "EOT20", + "HAMTIDE11", + "GOT4.10", + "FES2012", + "TPXO8-atlas-v1", + ] ) + # Otherwise, models to process are the same as those requested + else: + models_to_process = models_requested + # Update tide modelling func to add default keyword arguments that # are used for every iteration during parallel processing iter_func = partial( @@ -637,12 +837,12 @@ def model_tides( parallel_splits = min(parallel_splits, len(x)) # Parallelise if either multiple models or multiple splits requested - if parallel & ((len(model) > 1) | (parallel_splits > 1)): + if parallel & ((len(models_to_process) > 1) | (parallel_splits > 1)): from concurrent.futures import ProcessPoolExecutor from tqdm import tqdm with ProcessPoolExecutor() as executor: - print(f"Modelling tides using {', '.join(model)} in parallel") + print(f"Modelling tides using {', '.join(models_to_process)} in parallel") # Optionally split lon/lat points into `splits_n` chunks # that will be applied in parallel @@ -659,7 +859,7 @@ def model_tides( model_iters, x_iters, y_iters = zip( *[ (m, x_split[i], y_split[i]) - for m in model + for m in models_to_process for i in range(parallel_splits) ] ) @@ -669,7 +869,7 @@ def model_tides( model_iters, x_iters, y_iters, time_iters = zip( *[ (m, x_split[i], y_split[i], time_split[i]) - for m in model + for m in models_to_process for i in range(parallel_splits) ] ) @@ -686,7 +886,7 @@ def model_tides( else: model_outputs = [] - for model_i in model: + for model_i in models_to_process: print(f"Modelling tides using {model_i}") tide_df = iter_func(model_i, x, y, time) model_outputs.append(tide_df) @@ -694,6 +894,19 @@ def model_tides( # Combine outputs into a single dataframe tide_df = pd.concat(model_outputs, axis=0) + # Optionally compute ensemble model and add to dataframe + if "ensemble" in models_requested: + ensemble_df = _ensemble_model( + x, y, crs, tide_df, models_to_process, **ensemble_kwargs + ) + + # Update requested models with any custom ensemble models, then + # filter the dataframe to keep only models originally requested + models_requested = np.union1d(models_requested, ensemble_df.tide_model.unique()) + tide_df = pd.concat([tide_df, ensemble_df]).query( + "tide_model in @models_requested" + ) + # Optionally convert to a wide format dataframe with a tide model in # each dataframe column if output_format == "wide": @@ -755,7 +968,7 @@ def _pixel_tides_resample( """ # Determine spatial dimensions y_dim, x_dim = ds.odc.spatial_dims - + # Convert array to Dask, using no chunking along y and x dims, # and a single chunk for each timestep/quantile and tide model tides_lowres_dask = tides_lowres.chunk(