From a3e3553ce7ba36fba03332443060d556a155a28d Mon Sep 17 00:00:00 2001 From: Romain Hugonnet Date: Thu, 31 Aug 2023 18:04:05 -0800 Subject: [PATCH 1/7] Rename pipelines in workflows and fix bias_vars bug --- tests/test_coreg.py | 384 ++++++++++++---------- xdem/coreg/__init__.py | 2 +- xdem/coreg/base.py | 5 +- xdem/coreg/biascorr.py | 4 +- xdem/coreg/{pipelines.py => workflows.py} | 0 5 files changed, 221 insertions(+), 174 deletions(-) rename xdem/coreg/{pipelines.py => workflows.py} (100%) diff --git a/tests/test_coreg.py b/tests/test_coreg.py index c106814e..ae9f58fe 100644 --- a/tests/test_coreg.py +++ b/tests/test_coreg.py @@ -91,20 +91,6 @@ def test_copy(self, coreg_class: Callable[[], AffineCoreg]) -> None: assert corr_copy._meta != corr._meta assert not hasattr(corr_copy, "vshift") - # Create a pipeline, add some metadata, and copy it - pipeline = coreg_class() + coreg_class() - pipeline.pipeline[0]._meta["vshift"] = 1 - - pipeline_copy = pipeline.copy() - - # Add some more metadata after copying (this should not be transferred) - pipeline._meta["resolution"] = 30 - pipeline_copy.pipeline[0]._meta["offset_north_px"] = 0.5 - - assert pipeline._meta != pipeline_copy._meta - assert pipeline.pipeline[0]._meta != pipeline_copy.pipeline[0]._meta - assert pipeline_copy.pipeline[0]._meta["vshift"] - def test_vertical_shift(self) -> None: warnings.simplefilter("error") @@ -371,71 +357,6 @@ def test_icp_opencv(self) -> None: assert aligned_dem.shape == self.ref.data.squeeze().shape - def test_pipeline(self) -> None: - warnings.simplefilter("error") - - # Create a pipeline from two coreg methods. - pipeline = coreg.CoregPipeline([coreg.VerticalShift(), coreg.NuthKaab()]) - pipeline.fit(**self.fit_params) - - aligned_dem, _ = pipeline.apply(self.tba.data, self.ref.transform, self.ref.crs) - - assert aligned_dem.shape == self.ref.data.squeeze().shape - - # Make a new pipeline with two vertical shift correction approaches. 
- pipeline2 = coreg.CoregPipeline([coreg.VerticalShift(), coreg.VerticalShift()]) - # Set both "estimated" vertical shifts to be 1 - pipeline2.pipeline[0]._meta["vshift"] = 1 - pipeline2.pipeline[1]._meta["vshift"] = 1 - - # Assert that the combined vertical shift is 2 - assert pipeline2.to_matrix()[2, 3] == 2.0 - - def test_pipeline_pts(self) -> None: - warnings.simplefilter("ignore") - - pipeline = coreg.NuthKaab() + coreg.GradientDescending() - ref_points = self.ref.to_points(as_array=False, subset=5000, pixel_offset="center").ds - ref_points["E"] = ref_points.geometry.x - ref_points["N"] = ref_points.geometry.y - ref_points.rename(columns={"b1": "z"}, inplace=True) - - # Check that this runs without error - pipeline.fit_pts(reference_dem=ref_points, dem_to_be_aligned=self.tba) - - for part in pipeline.pipeline: - assert np.abs(part._meta["offset_east_px"]) > 0 - - assert pipeline.pipeline[0]._meta["offset_east_px"] != pipeline.pipeline[1]._meta["offset_east_px"] - - def test_coreg_add(self) -> None: - warnings.simplefilter("error") - # Test with a vertical shift of 4 - vshift = 4 - - vshift1 = coreg.VerticalShift() - vshift2 = coreg.VerticalShift() - - # Set the vertical shift attribute - for vshift_corr in (vshift1, vshift2): - vshift_corr._meta["vshift"] = vshift - - # Add the two coregs and check that the resulting vertical shift is 2* vertical shift - vshift3 = vshift1 + vshift2 - assert vshift3.to_matrix()[2, 3] == vshift * 2 - - # Make sure the correct exception is raised on incorrect additions - with pytest.raises(ValueError, match="Incompatible add type"): - vshift1 + 1 # type: ignore - - # Try to add a Coreg step to an already existing CoregPipeline - vshift4 = vshift3 + vshift1 - assert vshift4.to_matrix()[2, 3] == vshift * 3 - - # Try to add two CoregPipelines - vshift5 = vshift3 + vshift3 - assert vshift5.to_matrix()[2, 3] == vshift * 4 - def test_subsample(self) -> None: warnings.simplefilter("error") @@ -497,96 +418,6 @@ def test_subsample(self) -> None: # Check that the estimated biases are similar assert deramp_sub._meta["coefficients"] == pytest.approx(deramp_full._meta["coefficients"], rel=1e-1) - @pytest.mark.parametrize( - "pipeline", [coreg.VerticalShift(), coreg.VerticalShift() + coreg.NuthKaab()] - ) # type: ignore - @pytest.mark.parametrize("subdivision", [4, 10]) # type: ignore - def test_blockwise_coreg(self, pipeline: AffineCoreg, subdivision: int) -> None: - warnings.simplefilter("error") - - blockwise = coreg.BlockwiseCoreg(step=pipeline, subdivision=subdivision) - - # Results can not yet be extracted (since fit has not been called) and should raise an error - with pytest.raises(AssertionError, match="No coreg results exist.*"): - blockwise.to_points() - - blockwise.fit(**self.fit_params) - points = blockwise.to_points() - - # Validate that the number of points is equal to the amount of subdivisions. - assert points.shape[0] == subdivision - - # Validate that the points do not represent only the same location. - assert np.sum(np.linalg.norm(points[:, :, 0] - points[:, :, 1], axis=1)) != 0.0 - - z_diff = points[:, 2, 1] - points[:, 2, 0] - - # Validate that all values are different - assert np.unique(z_diff).size == z_diff.size, "Each coreg cell should have different results." - - # Validate that the BlockwiseCoreg doesn't accept uninstantiated Coreg classes - with pytest.raises(ValueError, match="instantiated Coreg subclass"): - coreg.BlockwiseCoreg(step=coreg.VerticalShift, subdivision=1) # type: ignore - - # Metadata copying has been an issue. 
Validate that all chunks have unique ids - chunk_numbers = [m["i"] for m in blockwise._meta["step_meta"]] - assert np.unique(chunk_numbers).shape[0] == len(chunk_numbers) - - transformed_dem = blockwise.apply(self.tba) - - ddem_pre = (self.ref - self.tba)[~self.inlier_mask] - ddem_post = (self.ref - transformed_dem)[~self.inlier_mask] - - # Check that the periglacial difference is lower after coregistration. - assert abs(np.ma.median(ddem_post)) < abs(np.ma.median(ddem_pre)) - - stats = blockwise.stats() - - # Check that nans don't exist (if they do, something has gone very wrong) - assert np.all(np.isfinite(stats["nmad"])) - # Check that offsets were actually calculated. - assert np.sum(np.abs(np.linalg.norm(stats[["x_off", "y_off", "z_off"]], axis=0))) > 0 - - def test_blockwise_coreg_large_gaps(self) -> None: - """Test BlockwiseCoreg when large gaps are encountered, e.g. around the frame of a rotated DEM.""" - warnings.simplefilter("error") - reference_dem = self.ref.reproject(dst_crs="EPSG:3413", dst_res=self.ref.res, resampling="bilinear") - dem_to_be_aligned = self.tba.reproject(dst_ref=reference_dem, resampling="bilinear") - - blockwise = xdem.coreg.BlockwiseCoreg(xdem.coreg.NuthKaab(), 64, warn_failures=False) - - # This should not fail or trigger warnings as warn_failures is False - blockwise.fit(reference_dem, dem_to_be_aligned) - - stats = blockwise.stats() - - # We expect holes in the blockwise coregistration, so there should not be 64 "successful" blocks. - assert stats.shape[0] < 64 - - # Statistics are only calculated on finite values, so all of these should be finite as well. - assert np.all(np.isfinite(stats)) - - # Copy the TBA DEM and set a square portion to nodata - tba = self.tba.copy() - mask = np.zeros(np.shape(tba.data), dtype=bool) - mask[450:500, 450:500] = True - tba.set_mask(mask=mask) - - blockwise = xdem.coreg.BlockwiseCoreg(xdem.coreg.NuthKaab(), 8, warn_failures=False) - - # Align the DEM and apply the blockwise to a zero-array (to get the zshift) - aligned = blockwise.fit(self.ref, tba).apply(tba) - zshift, _ = blockwise.apply(np.zeros_like(tba.data), transform=tba.transform, crs=tba.crs) - - # Validate that the zshift is not something crazy high and that no negative values exist in the data. - assert np.nanmax(np.abs(zshift)) < 50 - assert np.count_nonzero(aligned.data.compressed() < -50) == 0 - - # Check that coregistration improved the alignment - ddem_post = (aligned - self.ref).data.compressed() - ddem_pre = (tba - self.ref).data.compressed() - assert abs(np.nanmedian(ddem_pre)) > abs(np.nanmedian(ddem_post)) - assert np.nanstd(ddem_pre) > np.nanstd(ddem_post) def test_coreg_raster_and_ndarray_args(self) -> None: @@ -832,6 +663,221 @@ def test_coreg_oneliner(self) -> None: assert np.array_equal(dem_arr, dem_arr2_fixed) +class TestCoregPipeline: + + ref, tba, outlines = load_examples() # Load example reference, to-be-aligned and mask. + inlier_mask = ~outlines.create_mask(ref) + + fit_params = dict( + reference_dem=ref.data, + dem_to_be_aligned=tba.data, + inlier_mask=inlier_mask, + transform=ref.transform, + crs=ref.crs, + verbose=False, + ) + # Create some 3D coordinates with Z coordinates being 0 to try the apply_pts functions. 
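A quick note on this layout (an illustrative sketch, not part of the patch; `z_off` is assumed from the `from_translation` call style used in test_affine.py): the transpose turns the (3, 4) array into (4, 3), one row per point with columns ordered (x, y, z), which is the shape `apply_pts` expects. A pure vertical shift should then leave the first two columns untouched:

    pts = np.array([[1, 2, 3, 4], [1, 2, 3, 4], [0, 0, 0, 0]], dtype="float64").T
    shifted = AffineCoreg.from_translation(z_off=1.0).apply_pts(pts)
    assert np.array_equal(shifted[:, :2], pts[:, :2])  # x/y columns unchanged
    assert np.all(shifted[:, 2] == 1.0)                # z column carries the shift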
+ points = np.array([[1, 2, 3, 4], [1, 2, 3, 4], [0, 0, 0, 0]], dtype="float64").T + + @pytest.mark.parametrize("coreg_class", [coreg.VerticalShift, coreg.ICP, coreg.NuthKaab]) # type: ignore + def test_copy(self, coreg_class: Callable[[], AffineCoreg]) -> None: + + # Create a pipeline, add some metadata, and copy it + pipeline = coreg_class() + coreg_class() + pipeline.pipeline[0]._meta["vshift"] = 1 + + pipeline_copy = pipeline.copy() + + # Add some more metadata after copying (this should not be transferred) + pipeline._meta["resolution"] = 30 + pipeline_copy.pipeline[0]._meta["offset_north_px"] = 0.5 + + assert pipeline._meta != pipeline_copy._meta + assert pipeline.pipeline[0]._meta != pipeline_copy.pipeline[0]._meta + assert pipeline_copy.pipeline[0]._meta["vshift"] + + def test_pipeline(self) -> None: + warnings.simplefilter("error") + + # Create a pipeline from two coreg methods. + pipeline = coreg.CoregPipeline([coreg.VerticalShift(), coreg.NuthKaab()]) + pipeline.fit(**self.fit_params) + + aligned_dem, _ = pipeline.apply(self.tba.data, transform=self.ref.transform, crs=self.ref.crs) + + assert aligned_dem.shape == self.ref.data.squeeze().shape + + # Make a new pipeline with two vertical shift correction approaches. + pipeline2 = coreg.CoregPipeline([coreg.VerticalShift(), coreg.VerticalShift()]) + # Set both "estimated" vertical shifts to be 1 + pipeline2.pipeline[0]._meta["vshift"] = 1 + pipeline2.pipeline[1]._meta["vshift"] = 1 + + # Assert that the combined vertical shift is 2 + assert pipeline2.to_matrix()[2, 3] == 2.0 + + def test_pipeline_affine_biascorr(self) -> None: + + # Create a pipeline from one affine and one biascorr methods. + pipeline = coreg.CoregPipeline([coreg.Deramp(), coreg.NuthKaab()]) + pipeline.fit(**self.fit_params) + + aligned_dem, _ = pipeline.apply(self.tba.data, transform=self.ref.transform, crs=self.ref.crs) + assert aligned_dem.shape == self.ref.data.squeeze().shape + + def test_pipeline_pts(self) -> None: + warnings.simplefilter("ignore") + + pipeline = coreg.NuthKaab() + coreg.GradientDescending() + ref_points = self.ref.to_points(as_array=False, subset=5000, pixel_offset="center").ds + ref_points["E"] = ref_points.geometry.x + ref_points["N"] = ref_points.geometry.y + ref_points.rename(columns={"b1": "z"}, inplace=True) + + # Check that this runs without error + pipeline.fit_pts(reference_dem=ref_points, dem_to_be_aligned=self.tba) + + for part in pipeline.pipeline: + assert np.abs(part._meta["offset_east_px"]) > 0 + + assert pipeline.pipeline[0]._meta["offset_east_px"] != pipeline.pipeline[1]._meta["offset_east_px"] + + def test_coreg_add(self) -> None: + warnings.simplefilter("error") + # Test with a vertical shift of 4 + vshift = 4 + + vshift1 = coreg.VerticalShift() + vshift2 = coreg.VerticalShift() + + # Set the vertical shift attribute + for vshift_corr in (vshift1, vshift2): + vshift_corr._meta["vshift"] = vshift + + # Add the two coregs and check that the resulting vertical shift is 2* vertical shift + vshift3 = vshift1 + vshift2 + assert vshift3.to_matrix()[2, 3] == vshift * 2 + + # Make sure the correct exception is raised on incorrect additions + with pytest.raises(ValueError, match="Incompatible add type"): + vshift1 + 1 # type: ignore + + # Try to add a Coreg step to an already existing CoregPipeline + vshift4 = vshift3 + vshift1 + assert vshift4.to_matrix()[2, 3] == vshift * 3 + + # Try to add two CoregPipelines + vshift5 = vshift3 + vshift3 + assert vshift5.to_matrix()[2, 3] == vshift * 4 + + +class TestBlockwiseCoreg: + ref, tba, 
outlines = load_examples() # Load example reference, to-be-aligned and mask. + inlier_mask = ~outlines.create_mask(ref) + + fit_params = dict( + reference_dem=ref.data, + dem_to_be_aligned=tba.data, + inlier_mask=inlier_mask, + transform=ref.transform, + crs=ref.crs, + verbose=False, + ) + # Create some 3D coordinates with Z coordinates being 0 to try the apply_pts functions. + points = np.array([[1, 2, 3, 4], [1, 2, 3, 4], [0, 0, 0, 0]], dtype="float64").T + + @pytest.mark.parametrize( + "pipeline", [coreg.VerticalShift(), coreg.VerticalShift() + coreg.NuthKaab()] + ) # type: ignore + @pytest.mark.parametrize("subdivision", [4, 10]) # type: ignore + def test_blockwise_coreg(self, pipeline: AffineCoreg, subdivision: int) -> None: + warnings.simplefilter("error") + + blockwise = coreg.BlockwiseCoreg(step=pipeline, subdivision=subdivision) + + # Results can not yet be extracted (since fit has not been called) and should raise an error + with pytest.raises(AssertionError, match="No coreg results exist.*"): + blockwise.to_points() + + blockwise.fit(**self.fit_params) + points = blockwise.to_points() + + # Validate that the number of points is equal to the amount of subdivisions. + assert points.shape[0] == subdivision + + # Validate that the points do not represent only the same location. + assert np.sum(np.linalg.norm(points[:, :, 0] - points[:, :, 1], axis=1)) != 0.0 + + z_diff = points[:, 2, 1] - points[:, 2, 0] + + # Validate that all values are different + assert np.unique(z_diff).size == z_diff.size, "Each coreg cell should have different results." + + # Validate that the BlockwiseCoreg doesn't accept uninstantiated Coreg classes + with pytest.raises(ValueError, match="instantiated Coreg subclass"): + coreg.BlockwiseCoreg(step=coreg.VerticalShift, subdivision=1) # type: ignore + + # Metadata copying has been an issue. Validate that all chunks have unique ids + chunk_numbers = [m["i"] for m in blockwise._meta["step_meta"]] + assert np.unique(chunk_numbers).shape[0] == len(chunk_numbers) + + transformed_dem = blockwise.apply(self.tba) + + ddem_pre = (self.ref - self.tba)[~self.inlier_mask] + ddem_post = (self.ref - transformed_dem)[~self.inlier_mask] + + # Check that the periglacial difference is lower after coregistration. + assert abs(np.ma.median(ddem_post)) < abs(np.ma.median(ddem_pre)) + + stats = blockwise.stats() + + # Check that nans don't exist (if they do, something has gone very wrong) + assert np.all(np.isfinite(stats["nmad"])) + # Check that offsets were actually calculated. + assert np.sum(np.abs(np.linalg.norm(stats[["x_off", "y_off", "z_off"]], axis=0))) > 0 + + def test_blockwise_coreg_large_gaps(self) -> None: + """Test BlockwiseCoreg when large gaps are encountered, e.g. around the frame of a rotated DEM.""" + warnings.simplefilter("error") + reference_dem = self.ref.reproject(dst_crs="EPSG:3413", dst_res=self.ref.res, resampling="bilinear") + dem_to_be_aligned = self.tba.reproject(dst_ref=reference_dem, resampling="bilinear") + + blockwise = xdem.coreg.BlockwiseCoreg(xdem.coreg.NuthKaab(), 64, warn_failures=False) + + # This should not fail or trigger warnings as warn_failures is False + blockwise.fit(reference_dem, dem_to_be_aligned) + + stats = blockwise.stats() + + # We expect holes in the blockwise coregistration, so there should not be 64 "successful" blocks. + assert stats.shape[0] < 64 + + # Statistics are only calculated on finite values, so all of these should be finite as well. 
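For orientation, the workflow these assertions exercise, as a minimal hedged sketch (same xdem API as elsewhere in this patch; `ref_dem` and `tba_dem` stand in for any overlapping DEM pair):

    blockwise = xdem.coreg.BlockwiseCoreg(xdem.coreg.NuthKaab(), subdivision=64, warn_failures=False)
    blockwise.fit(ref_dem, tba_dem)       # one Nuth and Kaab fit per rectangular chunk
    stats = blockwise.stats()             # per-chunk statistics: x_off/y_off/z_off, nmad, ...
    corrected = blockwise.apply(tba_dem)  # applies the per-chunk corrections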
+ assert np.all(np.isfinite(stats)) + + # Copy the TBA DEM and set a square portion to nodata + tba = self.tba.copy() + mask = np.zeros(np.shape(tba.data), dtype=bool) + mask[450:500, 450:500] = True + tba.set_mask(mask=mask) + + blockwise = xdem.coreg.BlockwiseCoreg(xdem.coreg.NuthKaab(), 8, warn_failures=False) + + # Align the DEM and apply the blockwise to a zero-array (to get the zshift) + aligned = blockwise.fit(self.ref, tba).apply(tba) + zshift, _ = blockwise.apply(np.zeros_like(tba.data), transform=tba.transform, crs=tba.crs) + + # Validate that the zshift is not something crazy high and that no negative values exist in the data. + assert np.nanmax(np.abs(zshift)) < 50 + assert np.count_nonzero(aligned.data.compressed() < -50) == 0 + + # Check that coregistration improved the alignment + ddem_post = (aligned - self.ref).data.compressed() + ddem_pre = (tba - self.ref).data.compressed() + assert abs(np.nanmedian(ddem_pre)) > abs(np.nanmedian(ddem_post)) + assert np.nanstd(ddem_pre) > np.nanstd(ddem_post) + + def test_apply_matrix() -> None: warnings.simplefilter("error") diff --git a/xdem/coreg/__init__.py b/xdem/coreg/__init__.py index 6d172f13..06a0b014 100644 --- a/xdem/coreg/__init__.py +++ b/xdem/coreg/__init__.py @@ -20,4 +20,4 @@ DirectionalBias, TerrainBias, ) -from xdem.coreg.pipelines import dem_coregistration # noqa +from xdem.coreg.workflows import dem_coregistration # noqa diff --git a/xdem/coreg/base.py b/xdem/coreg/base.py index 41ba3111..575be9a8 100644 --- a/xdem/coreg/base.py +++ b/xdem/coreg/base.py @@ -1335,7 +1335,8 @@ def _fit_func( coreg._fit_func(ref_dem, tba_dem_mod, transform=transform, crs=crs, weights=weights, verbose=verbose) coreg._fit_called = True - tba_dem_mod, out_transform = coreg.apply(tba_dem_mod, transform, crs) + # TODO: shouldn't this call _apply_func directly? 
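# Hedged note on this hunk: the switch to keyword arguments looks deliberate,
# since `apply` is shared with bias-correction steps whose signatures carry an
# extra `bias_vars` argument (reordered in the biascorr.py hunk further down),
# and positional `transform`/`crs` can bind to the wrong parameter there:
#   fragile: coreg.apply(tba_dem_mod, transform, crs)
#   robust:  coreg.apply(tba_dem_mod, transform=transform, crs=crs)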
+ tba_dem_mod, out_transform = coreg.apply(tba_dem_mod, transform=transform, crs=crs) def _fit_pts_func( self: CoregType, @@ -1364,7 +1365,7 @@ def _apply_func( dem_mod = dem.copy() out_transform = copy.copy(transform) for coreg in self.pipeline: - dem_mod, out_transform = coreg.apply(dem_mod, out_transform, crs, **kwargs) + dem_mod, out_transform = coreg.apply(dem_mod, transform=out_transform, crs=crs, **kwargs) return dem_mod, out_transform diff --git a/xdem/coreg/biascorr.py b/xdem/coreg/biascorr.py index ac0e1ae7..09cdf0cb 100644 --- a/xdem/coreg/biascorr.py +++ b/xdem/coreg/biascorr.py @@ -219,7 +219,7 @@ def _fit_func( # type: ignore # Remove random state for keyword argument if its value is not in the optimizer function if self._fit_or_bin in ["fit", "bin_and_fit"]: fit_func_args = inspect.getfullargspec(self._meta["fit_optimizer"]).args - if "random_state" not in fit_func_args: + if "random_state" not in fit_func_args and "random_state" in kwargs: kwargs.pop("random_state") # We need to sort the bin sizes in the same order as the bias variables if a dict is passed for bin_sizes @@ -789,9 +789,9 @@ def _fit_func( # type: ignore self, ref_dem: NDArrayf, tba_dem: NDArrayf, - bias_vars: dict[str, NDArrayf], transform: rio.transform.Affine, crs: rio.crs.CRS, + bias_vars: dict[str, NDArrayf] | None = None, weights: None | NDArrayf = None, verbose: bool = False, **kwargs, diff --git a/xdem/coreg/pipelines.py b/xdem/coreg/workflows.py similarity index 100% rename from xdem/coreg/pipelines.py rename to xdem/coreg/workflows.py From 9359e7e10506378485ea52f58d70c10e512bcb5b Mon Sep 17 00:00:00 2001 From: Romain Hugonnet Date: Fri, 1 Sep 2023 11:22:44 -0800 Subject: [PATCH 2/7] Restructure coreg tests --- tests/__init__.py | 0 tests/test_coreg/test_affine.py | 299 ++++++++++ .../test_base.py} | 511 +----------------- tests/{ => test_coreg}/test_biascorr.py | 0 tests/test_coreg/test_filters.py | 1 + tests/test_coreg/test_workflows.py | 269 +++++++++ 6 files changed, 574 insertions(+), 506 deletions(-) delete mode 100644 tests/__init__.py create mode 100644 tests/test_coreg/test_affine.py rename tests/{test_coreg.py => test_coreg/test_base.py} (61%) rename tests/{ => test_coreg}/test_biascorr.py (100%) create mode 100644 tests/test_coreg/test_filters.py create mode 100644 tests/test_coreg/test_workflows.py diff --git a/tests/__init__.py b/tests/__init__.py deleted file mode 100644 index e69de29b..00000000 diff --git a/tests/test_coreg/test_affine.py b/tests/test_coreg/test_affine.py new file mode 100644 index 00000000..187a340f --- /dev/null +++ b/tests/test_coreg/test_affine.py @@ -0,0 +1,299 @@ +"""Functions to test the affine coregistrations.""" + +import warnings +import pytest +import copy + +import numpy as np +import rasterio as rio + +from geoutils import Raster, Vector +from geoutils.raster import RasterType +import xdem +from xdem import examples +from xdem import coreg +from xdem.coreg.affine import AffineCoreg, CoregDict + +def load_examples() -> tuple[RasterType, RasterType, Vector]: + """Load example files to try coregistration methods with.""" + with warnings.catch_warnings(): + warnings.simplefilter("ignore") + reference_raster = Raster(examples.get_path("longyearbyen_ref_dem")) + to_be_aligned_raster = Raster(examples.get_path("longyearbyen_tba_dem")) + glacier_mask = Vector(examples.get_path("longyearbyen_glacier_outlines")) + + return reference_raster, to_be_aligned_raster, glacier_mask + + +class TestAffineCoreg: + + ref, tba, outlines = load_examples() # Load example 
reference, to-be-aligned and mask. + inlier_mask = ~outlines.create_mask(ref) + + fit_params = dict( + reference_dem=ref.data, + dem_to_be_aligned=tba.data, + inlier_mask=inlier_mask, + transform=ref.transform, + crs=ref.crs, + verbose=False, + ) + # Create some 3D coordinates with Z coordinates being 0 to try the apply_pts functions. + points = np.array([[1, 2, 3, 4], [1, 2, 3, 4], [0, 0, 0, 0]], dtype="float64").T + + def test_from_classmethods(self) -> None: + warnings.simplefilter("error") + + # Check that the from_matrix function works as expected. + vshift = 5 + matrix = np.diag(np.ones(4, dtype=float)) + matrix[2, 3] = vshift + coreg_obj = AffineCoreg.from_matrix(matrix) + transformed_points = coreg_obj.apply_pts(self.points) + assert transformed_points[0, 2] == vshift + + # Check that the from_translation function works as expected. + x_offset = 5 + coreg_obj2 = AffineCoreg.from_translation(x_off=x_offset) + transformed_points2 = coreg_obj2.apply_pts(self.points) + assert np.array_equal(self.points[:, 0] + x_offset, transformed_points2[:, 0]) + + # Try to make a Coreg object from a nan translation (should fail). + try: + AffineCoreg.from_translation(np.nan) + except ValueError as exception: + if "non-finite values" not in str(exception): + raise exception + + def test_vertical_shift(self) -> None: + warnings.simplefilter("error") + + # Create a vertical shift correction instance + vshiftcorr = coreg.VerticalShift() + # Fit the vertical shift model to the data + vshiftcorr.fit(**self.fit_params) + + # Check that a vertical shift was found. + assert vshiftcorr._meta.get("vshift") is not None + assert vshiftcorr._meta["vshift"] != 0.0 + + # Copy the vertical shift to see if it changes in the test (it shouldn't) + vshift = copy.copy(vshiftcorr._meta["vshift"]) + + # Check that the to_matrix function works as it should + matrix = vshiftcorr.to_matrix() + assert matrix[2, 3] == vshift, matrix + + # Check that the first z coordinate is now the vertical shift + assert vshiftcorr.apply_pts(self.points)[0, 2] == vshiftcorr._meta["vshift"] + + # Apply the model to correct the DEM + tba_unshifted, _ = vshiftcorr.apply(self.tba.data, self.ref.transform, self.ref.crs) + + # Create a new vertical shift correction model + vshiftcorr2 = coreg.VerticalShift() + # Check that this is indeed a new object + assert vshiftcorr is not vshiftcorr2 + # Fit the corrected DEM to see if the vertical shift will be close to or at zero + vshiftcorr2.fit( + reference_dem=self.ref.data, + dem_to_be_aligned=tba_unshifted, + transform=self.ref.transform, + crs=self.ref.crs, + inlier_mask=self.inlier_mask, + ) + # Test the vertical shift + newmeta: CoregDict = vshiftcorr2._meta + new_vshift = newmeta["vshift"] + assert np.abs(new_vshift) < 0.01 + + # Check that the original model's vertical shift has not changed + # (that the _meta dicts are two different objects) + assert vshiftcorr._meta["vshift"] == vshift + + def test_all_nans(self) -> None: + """Check that the coregistration approaches fail gracefully when given only nans.""" + dem1 = np.ones((50, 50), dtype=float) + dem2 = dem1.copy() + np.nan + affine = rio.transform.from_origin(0, 0, 1, 1) + crs = rio.crs.CRS.from_epsg(4326) + + vshiftcorr = coreg.VerticalShift() + icp = coreg.ICP() + + pytest.raises(ValueError, vshiftcorr.fit, dem1, dem2, transform=affine) + pytest.raises(ValueError, icp.fit, dem1, dem2, transform=affine) + + dem2[[3, 20, 40], [2, 21, 41]] = 1.2 + + vshiftcorr.fit(dem1, dem2, transform=affine, crs=crs) + + pytest.raises(ValueError, icp.fit, 
dem1, dem2, transform=affine) + + def test_coreg_example(self, verbose: bool = False) -> None: + """ + Test the co-registration outputs performed on the example are always the same. This overlaps with the test in + test_examples.py, but helps identify from where differences arise. + """ + + # Run co-registration + nuth_kaab = xdem.coreg.NuthKaab() + nuth_kaab.fit(self.ref, self.tba, inlier_mask=self.inlier_mask, verbose=verbose) + + # Check the output metadata is always the same + assert nuth_kaab._meta["offset_east_px"] == pytest.approx(-0.46255704521968716) + assert nuth_kaab._meta["offset_north_px"] == pytest.approx(-0.13618536563846081) + assert nuth_kaab._meta["vshift"] == pytest.approx(-1.9815309753424906) + + def test_gradientdescending( + self, downsampling: int = 10000, samples: int = 5000, inlier_mask: bool = True, verbose: bool = False + ) -> None: + """ + Test the co-registration outputs performed on the example are always the same. This overlaps with the test in + test_examples.py, but helps identify from where differences arise. + + It also implicitly tests the z_name kwarg and whether a geometry column can be provided instead of E/N cols. + """ + if inlier_mask: + inlier_mask = self.inlier_mask + + # Run co-registration + gds = xdem.coreg.GradientDescending(downsampling=downsampling) + gds.fit_pts( + self.ref.to_points().ds, self.tba, inlier_mask=inlier_mask, verbose=verbose, samples=samples, z_name="b1" + ) + assert gds._meta["offset_east_px"] == pytest.approx(-0.496000, rel=1e-1, abs=0.1) + assert gds._meta["offset_north_px"] == pytest.approx(-0.1875, rel=1e-1, abs=0.1) + assert gds._meta["vshift"] == pytest.approx(-1.8730, rel=1e-1) + + @pytest.mark.parametrize("shift_px", [(1, 1), (2, 2)]) # type: ignore + @pytest.mark.parametrize("coreg_class", [coreg.NuthKaab, coreg.GradientDescending, coreg.ICP]) # type: ignore + @pytest.mark.parametrize("points_or_raster", ["raster", "points"]) + def test_coreg_example_shift(self, shift_px, coreg_class, points_or_raster, verbose=False, downsampling=5000): + """ + For comparison of coreg algorithms: + Shift a ref_dem on purpose, e.g. shift_px = (1,1), and then applying coreg to shift it back. + """ + warnings.simplefilter("error") + res = self.ref.res[0] + + # shift DEM by shift_px + shifted_ref = self.ref.copy() + shifted_ref.shift(shift_px[0] * res, shift_px[1] * res) + + shifted_ref_points = shifted_ref.to_points(as_array=False, subset=downsampling, pixel_offset="center").ds + shifted_ref_points["E"] = shifted_ref_points.geometry.x + shifted_ref_points["N"] = shifted_ref_points.geometry.y + shifted_ref_points.rename(columns={"b1": "z"}, inplace=True) + + kwargs = {} if coreg_class.__name__ != "GradientDescending" else {"downsampling": downsampling} + + coreg_obj = coreg_class(**kwargs) + + best_east_diff = 1e5 + best_north_diff = 1e5 + if points_or_raster == "raster": + coreg_obj.fit(shifted_ref, self.ref, verbose=verbose) + elif points_or_raster == "points": + coreg_obj.fit_pts(shifted_ref_points, self.ref, verbose=verbose) + + if coreg_class.__name__ == "ICP": + matrix = coreg_obj.to_matrix() + # The ICP fit only creates a matrix and doesn't normally show the alignment in pixels + # Since the test is formed to validate pixel shifts, these calls extract the approximate pixel shift + # from the matrix (it's not perfect since rotation/scale can change it). 
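As a concrete illustration (values hypothetical), a rigid transform keeps its metre offsets in the last column of the 4x4 matrix, so dividing by the ground resolution recovers approximate pixel shifts:

    matrix = np.array([
        [1.0, 0.0, 0.0, 40.0],   # +40 m east
        [0.0, 1.0, 0.0, -20.0],  # -20 m north
        [0.0, 0.0, 1.0, 5.0],    # +5 m up
        [0.0, 0.0, 0.0, 1.0],
    ])
    res = 20.0                               # metres per pixel
    offset_east_px = -matrix[0][3] / res     # -> -2.0 px
    offset_north_px = -matrix[1][3] / res    # -> 1.0 px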
+ coreg_obj._meta["offset_east_px"] = -matrix[0][3] / res + coreg_obj._meta["offset_north_px"] = -matrix[1][3] / res + + # ICP can never be expected to be much better than 1px on structured data, as its implementation often finds a + # minimum between two grid points. This is clearly warned for in the documentation. + precision = 1e-2 if coreg_class.__name__ != "ICP" else 1 + + if coreg_obj._meta["offset_east_px"] == pytest.approx(-shift_px[0], rel=precision) and coreg_obj._meta[ + "offset_north_px" + ] == pytest.approx(-shift_px[0], rel=precision): + return + best_east_diff = coreg_obj._meta["offset_east_px"] - shift_px[0] + best_north_diff = coreg_obj._meta["offset_north_px"] - shift_px[1] + + raise AssertionError(f"Diffs are too big. east: {best_east_diff:.2f} px, north: {best_north_diff:.2f} px") + + def test_nuth_kaab(self) -> None: + warnings.simplefilter("error") + + nuth_kaab = coreg.NuthKaab(max_iterations=10) + + # Synthesize a shifted and vertically offset DEM + pixel_shift = 2 + vshift = 5 + shifted_dem = self.ref.data.squeeze().copy() + shifted_dem[:, pixel_shift:] = shifted_dem[:, :-pixel_shift] + shifted_dem[:, :pixel_shift] = np.nan + shifted_dem += vshift + + # Fit the synthesized shifted DEM to the original + nuth_kaab.fit( + self.ref.data.squeeze(), + shifted_dem, + transform=self.ref.transform, + crs=self.ref.crs, + verbose=self.fit_params["verbose"], + ) + + # Make sure that the estimated offsets are similar to what was synthesized. + assert nuth_kaab._meta["offset_east_px"] == pytest.approx(pixel_shift, abs=0.03) + assert nuth_kaab._meta["offset_north_px"] == pytest.approx(0, abs=0.03) + assert nuth_kaab._meta["vshift"] == pytest.approx(-vshift, 0.03) + + # Apply the estimated shift to "revert the DEM" to its original state. + unshifted_dem, _ = nuth_kaab.apply(shifted_dem, transform=self.ref.transform, crs=self.ref.crs) + # Measure the difference (should be more or less zero) + diff = self.ref.data.squeeze() - unshifted_dem + diff = diff.compressed() # turn into a 1D array with only unmasked values + + # Check that the median is very close to zero + assert np.abs(np.median(diff)) < 0.01 + # Check that the RMSE is low + assert np.sqrt(np.mean(np.square(diff))) < 1 + + # Transform some arbitrary points. + transformed_points = nuth_kaab.apply_pts(self.points) + + # Check that the x shift is close to the pixel_shift * image resolution + assert abs((transformed_points[0, 0] - self.points[0, 0]) - pixel_shift * self.ref.res[0]) < 0.1 + # Check that the z shift is close to the original vertical shift. + assert abs((transformed_points[0, 2] - self.points[0, 2]) + vshift) < 0.1 + + def test_deramping(self) -> None: + warnings.simplefilter("error") + + # Try a 1st degree deramping. + deramp = coreg.Tilt() + + # Fit the data + deramp.fit(**self.fit_params) + + # Apply the deramping to a DEM + deramped_dem = deramp.apply(self.tba) + + # Get the periglacial offset after deramping + periglacial_offset = (self.ref - deramped_dem)[self.inlier_mask] + # Get the periglacial offset before deramping + pre_offset = (self.ref - self.tba)[self.inlier_mask] + + # Check that the error improved + assert np.abs(np.mean(periglacial_offset)) < np.abs(np.mean(pre_offset)) + + # Check that the mean periglacial offset is low + assert np.abs(np.mean(periglacial_offset)) < 1 + + def test_icp_opencv(self) -> None: + warnings.simplefilter("error") + + # Do a fast and dirty 3 iteration ICP just to make sure it doesn't error out. 
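Unlike the translation-only methods above, ICP fits a full rigid transform (rotation plus translation), so its result is best inspected through the 4x4 matrix rather than pixel offsets; a hedged sketch reusing the class fixtures:

    icp = coreg.ICP(max_iterations=3)            # few iterations: smoke test, not accuracy
    icp.fit(**self.fit_params)
    m = icp.to_matrix()
    rotation, translation = m[:3, :3], m[:3, 3]  # 3x3 rotation block, metre offsets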
+ icp = coreg.ICP(max_iterations=3) + icp.fit(**self.fit_params) + + aligned_dem, _ = icp.apply(self.tba.data, self.ref.transform, self.ref.crs) + + assert aligned_dem.shape == self.ref.data.squeeze().shape diff --git a/tests/test_coreg.py b/tests/test_coreg/test_base.py similarity index 61% rename from tests/test_coreg.py rename to tests/test_coreg/test_base.py index ae9f58fe..cd853080 100644 --- a/tests/test_coreg.py +++ b/tests/test_coreg/test_base.py @@ -1,15 +1,13 @@ -"""Functions to test the coregistration tools.""" +"""Functions to test the coregistration base classes.""" + from __future__ import annotations import copy -import os -import tempfile import warnings from typing import Any, Callable import geoutils as gu import numpy as np -import pandas as pd import pytest import rasterio as rio from geoutils import Raster, Vector @@ -20,8 +18,7 @@ import xdem from xdem import coreg, examples, misc, spatialstats from xdem._typing import NDArrayf - from xdem.coreg.affine import AffineCoreg - from xdem.coreg.base import CoregDict, apply_matrix + from xdem.coreg.base import Coreg, CoregDict, apply_matrix def load_examples() -> tuple[RasterType, RasterType, Vector]: @@ -51,32 +48,8 @@ class TestCoregClass: # Create some 3D coordinates with Z coordinates being 0 to try the apply_pts functions. points = np.array([[1, 2, 3, 4], [1, 2, 3, 4], [0, 0, 0, 0]], dtype="float64").T - def test_from_classmethods(self) -> None: - warnings.simplefilter("error") - - # Check that the from_matrix function works as expected. - vshift = 5 - matrix = np.diag(np.ones(4, dtype=float)) - matrix[2, 3] = vshift - coreg_obj = AffineCoreg.from_matrix(matrix) - transformed_points = coreg_obj.apply_pts(self.points) - assert transformed_points[0, 2] == vshift - - # Check that the from_translation function works as expected. - x_offset = 5 - coreg_obj2 = AffineCoreg.from_translation(x_off=x_offset) - transformed_points2 = coreg_obj2.apply_pts(self.points) - assert np.array_equal(self.points[:, 0] + x_offset, transformed_points2[:, 0]) - - # Try to make a Coreg object from a nan translation (should fail). - try: - AffineCoreg.from_translation(np.nan) - except ValueError as exception: - if "non-finite values" not in str(exception): - raise exception - @pytest.mark.parametrize("coreg_class", [coreg.VerticalShift, coreg.ICP, coreg.NuthKaab]) # type: ignore - def test_copy(self, coreg_class: Callable[[], AffineCoreg]) -> None: + def test_copy(self, coreg_class: Callable[[], Coreg]) -> None: """Test that copying work expectedly (that no attributes still share references).""" warnings.simplefilter("error") @@ -91,70 +64,6 @@ def test_copy(self, coreg_class: Callable[[], AffineCoreg]) -> None: assert corr_copy._meta != corr._meta assert not hasattr(corr_copy, "vshift") - def test_vertical_shift(self) -> None: - warnings.simplefilter("error") - - # Create a vertical shift correction instance - vshiftcorr = coreg.VerticalShift() - # Fit the vertical shift model to the data - vshiftcorr.fit(**self.fit_params) - - # Check that a vertical shift was found. 
- assert vshiftcorr._meta.get("vshift") is not None - assert vshiftcorr._meta["vshift"] != 0.0 - - # Copy the vertical shift to see if it changes in the test (it shouldn't) - vshift = copy.copy(vshiftcorr._meta["vshift"]) - - # Check that the to_matrix function works as it should - matrix = vshiftcorr.to_matrix() - assert matrix[2, 3] == vshift, matrix - - # Check that the first z coordinate is now the vertical shift - assert vshiftcorr.apply_pts(self.points)[0, 2] == vshiftcorr._meta["vshift"] - - # Apply the model to correct the DEM - tba_unshifted, _ = vshiftcorr.apply(self.tba.data, self.ref.transform, self.ref.crs) - - # Create a new vertical shift correction model - vshiftcorr2 = coreg.VerticalShift() - # Check that this is indeed a new object - assert vshiftcorr is not vshiftcorr2 - # Fit the corrected DEM to see if the vertical shift will be close to or at zero - vshiftcorr2.fit( - reference_dem=self.ref.data, - dem_to_be_aligned=tba_unshifted, - transform=self.ref.transform, - crs=self.ref.crs, - inlier_mask=self.inlier_mask, - ) - # Test the vertical shift - newmeta: CoregDict = vshiftcorr2._meta - new_vshift = newmeta["vshift"] - assert np.abs(new_vshift) < 0.01 - - # Check that the original model's vertical shift has not changed - # (that the _meta dicts are two different objects) - assert vshiftcorr._meta["vshift"] == vshift - - def test_all_nans(self) -> None: - """Check that the coregistration approaches fail gracefully when given only nans.""" - dem1 = np.ones((50, 50), dtype=float) - dem2 = dem1.copy() + np.nan - affine = rio.transform.from_origin(0, 0, 1, 1) - crs = rio.crs.CRS.from_epsg(4326) - - vshiftcorr = coreg.VerticalShift() - icp = coreg.ICP() - - pytest.raises(ValueError, vshiftcorr.fit, dem1, dem2, transform=affine) - pytest.raises(ValueError, icp.fit, dem1, dem2, transform=affine) - - dem2[[3, 20, 40], [2, 21, 41]] = 1.2 - - vshiftcorr.fit(dem1, dem2, transform=affine, crs=crs) - - pytest.raises(ValueError, icp.fit, dem1, dem2, transform=affine) def test_error_method(self) -> None: """Test different error measures.""" @@ -189,174 +98,6 @@ def test_ij_xy(self, i: int = 10, j: int = 20) -> None: assert i == pytest.approx(10) assert j == pytest.approx(20) - def test_coreg_example(self, verbose: bool = False) -> None: - """ - Test the co-registration outputs performed on the example are always the same. This overlaps with the test in - test_examples.py, but helps identify from where differences arise. - """ - - # Run co-registration - nuth_kaab = xdem.coreg.NuthKaab() - nuth_kaab.fit(self.ref, self.tba, inlier_mask=self.inlier_mask, verbose=verbose) - - # Check the output metadata is always the same - assert nuth_kaab._meta["offset_east_px"] == pytest.approx(-0.46255704521968716) - assert nuth_kaab._meta["offset_north_px"] == pytest.approx(-0.13618536563846081) - assert nuth_kaab._meta["vshift"] == pytest.approx(-1.9815309753424906) - - def test_gradientdescending( - self, downsampling: int = 10000, samples: int = 5000, inlier_mask: bool = True, verbose: bool = False - ) -> None: - """ - Test the co-registration outputs performed on the example are always the same. This overlaps with the test in - test_examples.py, but helps identify from where differences arise. - - It also implicitly tests the z_name kwarg and whether a geometry column can be provided instead of E/N cols. 
- """ - if inlier_mask: - inlier_mask = self.inlier_mask - - # Run co-registration - gds = xdem.coreg.GradientDescending(downsampling=downsampling) - gds.fit_pts( - self.ref.to_points().ds, self.tba, inlier_mask=inlier_mask, verbose=verbose, samples=samples, z_name="b1" - ) - assert gds._meta["offset_east_px"] == pytest.approx(-0.496000, rel=1e-1, abs=0.1) - assert gds._meta["offset_north_px"] == pytest.approx(-0.1875, rel=1e-1, abs=0.1) - assert gds._meta["vshift"] == pytest.approx(-1.8730, rel=1e-1) - - @pytest.mark.parametrize("shift_px", [(1, 1), (2, 2)]) # type: ignore - @pytest.mark.parametrize("coreg_class", [coreg.NuthKaab, coreg.GradientDescending, coreg.ICP]) # type: ignore - @pytest.mark.parametrize("points_or_raster", ["raster", "points"]) - def test_coreg_example_shift(self, shift_px, coreg_class, points_or_raster, verbose=False, downsampling=5000): - """ - For comparison of coreg algorithms: - Shift a ref_dem on purpose, e.g. shift_px = (1,1), and then applying coreg to shift it back. - """ - warnings.simplefilter("error") - res = self.ref.res[0] - - # shift DEM by shift_px - shifted_ref = self.ref.copy() - shifted_ref.shift(shift_px[0] * res, shift_px[1] * res) - - shifted_ref_points = shifted_ref.to_points(as_array=False, subset=downsampling, pixel_offset="center").ds - shifted_ref_points["E"] = shifted_ref_points.geometry.x - shifted_ref_points["N"] = shifted_ref_points.geometry.y - shifted_ref_points.rename(columns={"b1": "z"}, inplace=True) - - kwargs = {} if coreg_class.__name__ != "GradientDescending" else {"downsampling": downsampling} - - coreg_obj = coreg_class(**kwargs) - - best_east_diff = 1e5 - best_north_diff = 1e5 - if points_or_raster == "raster": - coreg_obj.fit(shifted_ref, self.ref, verbose=verbose) - elif points_or_raster == "points": - coreg_obj.fit_pts(shifted_ref_points, self.ref, verbose=verbose) - - if coreg_class.__name__ == "ICP": - matrix = coreg_obj.to_matrix() - # The ICP fit only creates a matrix and doesn't normally show the alignment in pixels - # Since the test is formed to validate pixel shifts, these calls extract the approximate pixel shift - # from the matrix (it's not perfect since rotation/scale can change it). - coreg_obj._meta["offset_east_px"] = -matrix[0][3] / res - coreg_obj._meta["offset_north_px"] = -matrix[1][3] / res - - # ICP can never be expected to be much better than 1px on structured data, as its implementation often finds a - # minimum between two grid points. This is clearly warned for in the documentation. - precision = 1e-2 if coreg_class.__name__ != "ICP" else 1 - - if coreg_obj._meta["offset_east_px"] == pytest.approx(-shift_px[0], rel=precision) and coreg_obj._meta[ - "offset_north_px" - ] == pytest.approx(-shift_px[0], rel=precision): - return - best_east_diff = coreg_obj._meta["offset_east_px"] - shift_px[0] - best_north_diff = coreg_obj._meta["offset_north_px"] - shift_px[1] - - raise AssertionError(f"Diffs are too big. 
east: {best_east_diff:.2f} px, north: {best_north_diff:.2f} px") - - def test_nuth_kaab(self) -> None: - warnings.simplefilter("error") - - nuth_kaab = coreg.NuthKaab(max_iterations=10) - - # Synthesize a shifted and vertically offset DEM - pixel_shift = 2 - vshift = 5 - shifted_dem = self.ref.data.squeeze().copy() - shifted_dem[:, pixel_shift:] = shifted_dem[:, :-pixel_shift] - shifted_dem[:, :pixel_shift] = np.nan - shifted_dem += vshift - - # Fit the synthesized shifted DEM to the original - nuth_kaab.fit( - self.ref.data.squeeze(), - shifted_dem, - transform=self.ref.transform, - crs=self.ref.crs, - verbose=self.fit_params["verbose"], - ) - - # Make sure that the estimated offsets are similar to what was synthesized. - assert nuth_kaab._meta["offset_east_px"] == pytest.approx(pixel_shift, abs=0.03) - assert nuth_kaab._meta["offset_north_px"] == pytest.approx(0, abs=0.03) - assert nuth_kaab._meta["vshift"] == pytest.approx(-vshift, 0.03) - - # Apply the estimated shift to "revert the DEM" to its original state. - unshifted_dem, _ = nuth_kaab.apply(shifted_dem, transform=self.ref.transform, crs=self.ref.crs) - # Measure the difference (should be more or less zero) - diff = self.ref.data.squeeze() - unshifted_dem - diff = diff.compressed() # turn into a 1D array with only unmasked values - - # Check that the median is very close to zero - assert np.abs(np.median(diff)) < 0.01 - # Check that the RMSE is low - assert np.sqrt(np.mean(np.square(diff))) < 1 - - # Transform some arbitrary points. - transformed_points = nuth_kaab.apply_pts(self.points) - - # Check that the x shift is close to the pixel_shift * image resolution - assert abs((transformed_points[0, 0] - self.points[0, 0]) - pixel_shift * self.ref.res[0]) < 0.1 - # Check that the z shift is close to the original vertical shift. - assert abs((transformed_points[0, 2] - self.points[0, 2]) + vshift) < 0.1 - - def test_deramping(self) -> None: - warnings.simplefilter("error") - - # Try a 1st degree deramping. - deramp = coreg.Tilt() - - # Fit the data - deramp.fit(**self.fit_params) - - # Apply the deramping to a DEM - deramped_dem = deramp.apply(self.tba) - - # Get the periglacial offset after deramping - periglacial_offset = (self.ref - deramped_dem)[self.inlier_mask] - # Get the periglacial offset before deramping - pre_offset = (self.ref - self.tba)[self.inlier_mask] - - # Check that the error improved - assert np.abs(np.mean(periglacial_offset)) < np.abs(np.mean(pre_offset)) - - # Check that the mean periglacial offset is low - assert np.abs(np.mean(periglacial_offset)) < 1 - - def test_icp_opencv(self) -> None: - warnings.simplefilter("error") - - # Do a fast and dirty 3 iteration ICP just to make sure it doesn't error out. 
- icp = coreg.ICP(max_iterations=3) - icp.fit(**self.fit_params) - - aligned_dem, _ = icp.apply(self.tba.data, self.ref.transform, self.ref.crs) - - assert aligned_dem.shape == self.ref.data.squeeze().shape - def test_subsample(self) -> None: warnings.simplefilter("error") @@ -1092,246 +833,4 @@ def test_warp_dem() -> None: plt.subplot(144) plt.imshow(dem - untransformed_dem, cmap="coolwarm_r", vmin=-10, vmax=10) - plt.show() - - -def test_create_inlier_mask() -> None: - """Test that the create_inlier_mask function works expectedly.""" - warnings.simplefilter("error") - - ref, tba, outlines = load_examples() # Load example reference, to-be-aligned and outlines - - # - Assert that without filtering create_inlier_mask behaves as if calling Vector.create_mask - # - # Masking inside - using Vector - inlier_mask_comp = ~outlines.create_mask(ref, as_array=True) - inlier_mask = xdem.coreg.pipelines.create_inlier_mask( - tba, - ref, - [ - outlines, - ], - filtering=False, - ) - assert np.all(inlier_mask_comp == inlier_mask) - - # Masking inside - using string - inlier_mask = xdem.coreg.pipelines.create_inlier_mask( - tba, - ref, - [ - outlines.name, - ], - filtering=False, - ) - assert np.all(inlier_mask_comp == inlier_mask) - - # Masking outside - using Vector - inlier_mask = xdem.coreg.pipelines.create_inlier_mask( - tba, - ref, - [ - outlines, - ], - inout=[ - -1, - ], - filtering=False, - ) - assert np.all(~inlier_mask_comp == inlier_mask) - - # Masking outside - using string - inlier_mask = xdem.coreg.pipelines.create_inlier_mask( - tba, - ref, - [ - outlines.name, - ], - inout=[-1], - filtering=False, - ) - assert np.all(~inlier_mask_comp == inlier_mask) - - # - Test filtering options only - # - # Test the slope filter only - slope = xdem.terrain.slope(ref) - slope_lim = [1, 50] - inlier_mask_comp2 = np.ones(tba.data.shape, dtype=bool) - inlier_mask_comp2[slope.data < slope_lim[0]] = False - inlier_mask_comp2[slope.data > slope_lim[1]] = False - inlier_mask = xdem.coreg.pipelines.create_inlier_mask( - tba, ref, filtering=True, slope_lim=slope_lim, nmad_factor=np.inf - ) - assert np.all(inlier_mask == inlier_mask_comp2) - - # Test the nmad_factor filter only - nmad_factor = 3 - ddem = tba - ref - inlier_mask_comp3 = (np.abs(ddem.data - np.median(ddem)) < nmad_factor * xdem.spatialstats.nmad(ddem)).filled(False) - inlier_mask = xdem.coreg.pipelines.create_inlier_mask( - tba, ref, filtering=True, slope_lim=[0, 90], nmad_factor=nmad_factor - ) - assert np.all(inlier_mask == inlier_mask_comp3) - - # Test the sum of both - inlier_mask = xdem.coreg.pipelines.create_inlier_mask( - tba, ref, shp_list=[], inout=[], filtering=True, slope_lim=slope_lim, nmad_factor=nmad_factor - ) - inlier_mask_all = inlier_mask_comp2 & inlier_mask_comp3 - assert np.all(inlier_mask == inlier_mask_all) - - # Test the dh_max filter only - dh_max = 200 - inlier_mask_comp4 = (np.abs(ddem.data) < dh_max).filled(False) - inlier_mask = xdem.coreg.pipelines.create_inlier_mask( - tba, ref, filtering=True, slope_lim=[0, 90], nmad_factor=np.inf, dh_max=dh_max - ) - assert np.all(inlier_mask == inlier_mask_comp4) - - # - Test the sum of outlines + dh_max + slope - # - # nmad_factor will have a different behavior because it calculates nmad from the inliers of previous filters - inlier_mask = xdem.coreg.pipelines.create_inlier_mask( - tba, - ref, - shp_list=[ - outlines, - ], - inout=[ - -1, - ], - filtering=True, - slope_lim=slope_lim, - nmad_factor=np.inf, - dh_max=dh_max, - ) - inlier_mask_all = ~inlier_mask_comp & 
inlier_mask_comp2 & inlier_mask_comp4 - assert np.all(inlier_mask == inlier_mask_all) - - # - Test that proper errors are raised for wrong inputs - # - with pytest.raises(ValueError, match="`shp_list` must be a list/tuple"): - inlier_mask = xdem.coreg.pipelines.create_inlier_mask(tba, ref, shp_list=outlines) - - with pytest.raises(ValueError, match="`shp_list` must be a list/tuple of strings or geoutils.Vector instance"): - inlier_mask = xdem.coreg.pipelines.create_inlier_mask(tba, ref, shp_list=[1]) - - with pytest.raises(ValueError, match="`inout` must be a list/tuple"): - inlier_mask = xdem.coreg.pipelines.create_inlier_mask( - tba, - ref, - shp_list=[ - outlines, - ], - inout=1, # type: ignore - ) - - with pytest.raises(ValueError, match="`inout` must contain only 1 and -1"): - inlier_mask = xdem.coreg.pipelines.create_inlier_mask( - tba, - ref, - shp_list=[ - outlines, - ], - inout=[ - 0, - ], - ) - - with pytest.raises(ValueError, match="`inout` must be of same length as shp"): - inlier_mask = xdem.coreg.pipelines.create_inlier_mask( - tba, - ref, - shp_list=[ - outlines, - ], - inout=[1, 1], - ) - - with pytest.raises(ValueError, match="`slope_lim` must be a list/tuple"): - inlier_mask = xdem.coreg.pipelines.create_inlier_mask(tba, ref, filtering=True, slope_lim=1) # type: ignore - - with pytest.raises(ValueError, match="`slope_lim` must contain 2 elements"): - inlier_mask = xdem.coreg.pipelines.create_inlier_mask(tba, ref, filtering=True, slope_lim=[30]) - - with pytest.raises(ValueError, match=r"`slope_lim` must be a tuple/list of 2 elements in the range \[0-90\]"): - inlier_mask = xdem.coreg.pipelines.create_inlier_mask(tba, ref, filtering=True, slope_lim=[-1, 40]) - - with pytest.raises(ValueError, match=r"`slope_lim` must be a tuple/list of 2 elements in the range \[0-90\]"): - inlier_mask = xdem.coreg.pipelines.create_inlier_mask(tba, ref, filtering=True, slope_lim=[1, 120]) - - -@pytest.mark.skip(reason="The test segfaults locally and in CI (2023-08-21)") # type: ignore -def test_dem_coregistration() -> None: - """ - Test that the dem_coregistration function works expectedly. - Tests the features that are specific to dem_coregistration. - For example, many features are tested in create_inlier_mask, so not tested again here. - TODO: Add DEMs with different projection/grid to test that regridding works as expected. 
- """ - # Load example reference, to-be-aligned and outlines - ref_dem, tba_dem, outlines = load_examples() - - # - Check that it works with default parameters - # - dem_coreg, coreg_method, coreg_stats, inlier_mask = xdem.coreg.dem_coregistration(tba_dem, ref_dem) - - # Assert that outputs have expected format - assert isinstance(dem_coreg, xdem.DEM) - assert isinstance(coreg_method, xdem.coreg.Coreg) - assert isinstance(coreg_stats, pd.DataFrame) - - # Assert that default coreg_method is as expected - assert hasattr(coreg_method, "pipeline") - assert isinstance(coreg_method.pipeline[0], xdem.coreg.NuthKaab) - assert isinstance(coreg_method.pipeline[1], xdem.coreg.VerticalShift) - - # The result should be similar to applying the same coreg by hand with: - # - DEMs converted to Float32 - # - default inlier_mask - # - no resampling - coreg_method_ref = xdem.coreg.NuthKaab() + xdem.coreg.VerticalShift() - inlier_mask = xdem.coreg.pipelines.create_inlier_mask(tba_dem, ref_dem) - coreg_method_ref.fit(ref_dem.astype("float32"), tba_dem.astype("float32"), inlier_mask=inlier_mask) - dem_coreg_ref = coreg_method_ref.apply(tba_dem, resample=False) - assert dem_coreg == dem_coreg_ref - - # Assert that coregistration improved the residuals - assert abs(coreg_stats["med_orig"].values) > abs(coreg_stats["med_coreg"].values) - assert coreg_stats["nmad_orig"].values > coreg_stats["nmad_coreg"].values - - # - Check some alternative arguments - # - # Test with filename instead of DEMs - dem_coreg2, _, _, _ = xdem.coreg.dem_coregistration(tba_dem.filename, ref_dem.filename) - assert dem_coreg2 == dem_coreg - - # Test saving to file (mode = "w" is necessary to work on Windows) - outfile = tempfile.NamedTemporaryFile(suffix=".tif", mode="w", delete=False) - xdem.coreg.dem_coregistration(tba_dem, ref_dem, out_dem_path=outfile.name) - dem_coreg2 = xdem.DEM(outfile.name) - assert dem_coreg2 == dem_coreg - outfile.close() - - # Test that shapefile is properly taken into account - inlier_mask should be False inside outlines - # Need to use resample=True, to ensure that dem_coreg has same georef as inlier_mask - dem_coreg, coreg_method, coreg_stats, inlier_mask = xdem.coreg.dem_coregistration( - tba_dem, - ref_dem, - shp_list=[ - outlines, - ], - resample=True, - ) - gl_mask = outlines.create_mask(dem_coreg, as_array=True) - assert np.all(~inlier_mask[gl_mask]) - - # Testing with plot - out_fig = tempfile.NamedTemporaryFile(suffix=".png", mode="w", delete=False) - assert os.path.getsize(out_fig.name) == 0 - xdem.coreg.dem_coregistration(tba_dem, ref_dem, plot=True, out_fig=out_fig.name) - assert os.path.getsize(out_fig.name) > 0 - out_fig.close() - - # Testing different coreg method - dem_coreg, coreg_method, coreg_stats, inlier_mask = xdem.coreg.dem_coregistration( - tba_dem, ref_dem, coreg_method=xdem.coreg.Tilt() - ) - assert isinstance(coreg_method, xdem.coreg.Tilt) - assert abs(coreg_stats["med_orig"].values) > abs(coreg_stats["med_coreg"].values) - assert coreg_stats["nmad_orig"].values > coreg_stats["nmad_coreg"].values + plt.show() \ No newline at end of file diff --git a/tests/test_biascorr.py b/tests/test_coreg/test_biascorr.py similarity index 100% rename from tests/test_biascorr.py rename to tests/test_coreg/test_biascorr.py diff --git a/tests/test_coreg/test_filters.py b/tests/test_coreg/test_filters.py new file mode 100644 index 00000000..9d51106b --- /dev/null +++ b/tests/test_coreg/test_filters.py @@ -0,0 +1 @@ +"""Functions to test the coregistration filters.""" diff --git 
a/tests/test_coreg/test_workflows.py b/tests/test_coreg/test_workflows.py new file mode 100644 index 00000000..485d1f82 --- /dev/null +++ b/tests/test_coreg/test_workflows.py @@ -0,0 +1,269 @@ +"""Functions to test the coregistration workflows.""" + +import os +import warnings +import tempfile + +import numpy as np +import pandas as pd +import pytest + +from geoutils import Raster, Vector +from geoutils.raster import RasterType +import xdem +from xdem import examples +from xdem.coreg.workflows import create_inlier_mask, dem_coregistration + + +def load_examples() -> tuple[RasterType, RasterType, Vector]: + """Load example files to try coregistration methods with.""" + with warnings.catch_warnings(): + warnings.simplefilter("ignore") + reference_raster = Raster(examples.get_path("longyearbyen_ref_dem")) + to_be_aligned_raster = Raster(examples.get_path("longyearbyen_tba_dem")) + glacier_mask = Vector(examples.get_path("longyearbyen_glacier_outlines")) + + return reference_raster, to_be_aligned_raster, glacier_mask + +class TestWorkflows: + + def test_create_inlier_mask(self) -> None: + """Test that the create_inlier_mask function works expectedly.""" + warnings.simplefilter("error") + + ref, tba, outlines = load_examples() # Load example reference, to-be-aligned and outlines + + # - Assert that without filtering create_inlier_mask behaves as if calling Vector.create_mask - # + # Masking inside - using Vector + inlier_mask_comp = ~outlines.create_mask(ref, as_array=True) + inlier_mask = create_inlier_mask( + tba, + ref, + [ + outlines, + ], + filtering=False, + ) + assert np.all(inlier_mask_comp == inlier_mask) + + # Masking inside - using string + inlier_mask = create_inlier_mask( + tba, + ref, + [ + outlines.name, + ], + filtering=False, + ) + assert np.all(inlier_mask_comp == inlier_mask) + + # Masking outside - using Vector + inlier_mask = create_inlier_mask( + tba, + ref, + [ + outlines, + ], + inout=[ + -1, + ], + filtering=False, + ) + assert np.all(~inlier_mask_comp == inlier_mask) + + # Masking outside - using string + inlier_mask = create_inlier_mask( + tba, + ref, + [ + outlines.name, + ], + inout=[-1], + filtering=False, + ) + assert np.all(~inlier_mask_comp == inlier_mask) + + # - Test filtering options only - # + # Test the slope filter only + slope = xdem.terrain.slope(ref) + slope_lim = [1, 50] + inlier_mask_comp2 = np.ones(tba.data.shape, dtype=bool) + inlier_mask_comp2[slope.data < slope_lim[0]] = False + inlier_mask_comp2[slope.data > slope_lim[1]] = False + inlier_mask = create_inlier_mask( + tba, ref, filtering=True, slope_lim=slope_lim, nmad_factor=np.inf + ) + assert np.all(inlier_mask == inlier_mask_comp2) + + # Test the nmad_factor filter only + nmad_factor = 3 + ddem = tba - ref + inlier_mask_comp3 = (np.abs(ddem.data - np.median(ddem)) < nmad_factor * xdem.spatialstats.nmad(ddem)).filled(False) + inlier_mask = create_inlier_mask( + tba, ref, filtering=True, slope_lim=[0, 90], nmad_factor=nmad_factor + ) + assert np.all(inlier_mask == inlier_mask_comp3) + + # Test the sum of both + inlier_mask = create_inlier_mask( + tba, ref, shp_list=[], inout=[], filtering=True, slope_lim=slope_lim, nmad_factor=nmad_factor + ) + inlier_mask_all = inlier_mask_comp2 & inlier_mask_comp3 + assert np.all(inlier_mask == inlier_mask_all) + + # Test the dh_max filter only + dh_max = 200 + inlier_mask_comp4 = (np.abs(ddem.data) < dh_max).filled(False) + inlier_mask = create_inlier_mask( + tba, ref, filtering=True, slope_lim=[0, 90], nmad_factor=np.inf, dh_max=dh_max + ) + assert 
np.all(inlier_mask == inlier_mask_comp4) + + # - Test the sum of outlines + dh_max + slope - # + # nmad_factor will have a different behavior because it calculates nmad from the inliers of previous filters + inlier_mask = create_inlier_mask( + tba, + ref, + shp_list=[ + outlines, + ], + inout=[ + -1, + ], + filtering=True, + slope_lim=slope_lim, + nmad_factor=np.inf, + dh_max=dh_max, + ) + inlier_mask_all = ~inlier_mask_comp & inlier_mask_comp2 & inlier_mask_comp4 + assert np.all(inlier_mask == inlier_mask_all) + + # - Test that proper errors are raised for wrong inputs - # + with pytest.raises(ValueError, match="`shp_list` must be a list/tuple"): + create_inlier_mask(tba, ref, shp_list=outlines) + + with pytest.raises(ValueError, match="`shp_list` must be a list/tuple of strings or geoutils.Vector instance"): + create_inlier_mask(tba, ref, shp_list=[1]) + + with pytest.raises(ValueError, match="`inout` must be a list/tuple"): + create_inlier_mask( + tba, + ref, + shp_list=[ + outlines, + ], + inout=1, # type: ignore + ) + + with pytest.raises(ValueError, match="`inout` must contain only 1 and -1"): + create_inlier_mask( + tba, + ref, + shp_list=[ + outlines, + ], + inout=[ + 0, + ], + ) + + with pytest.raises(ValueError, match="`inout` must be of same length as shp"): + create_inlier_mask( + tba, + ref, + shp_list=[ + outlines, + ], + inout=[1, 1], + ) + + with pytest.raises(ValueError, match="`slope_lim` must be a list/tuple"): + create_inlier_mask(tba, ref, filtering=True, slope_lim=1) # type: ignore + + with pytest.raises(ValueError, match="`slope_lim` must contain 2 elements"): + create_inlier_mask(tba, ref, filtering=True, slope_lim=[30]) + + with pytest.raises(ValueError, match=r"`slope_lim` must be a tuple/list of 2 elements in the range \[0-90\]"): + create_inlier_mask(tba, ref, filtering=True, slope_lim=[-1, 40]) + + with pytest.raises(ValueError, match=r"`slope_lim` must be a tuple/list of 2 elements in the range \[0-90\]"): + create_inlier_mask(tba, ref, filtering=True, slope_lim=[1, 120]) + + + @pytest.mark.skip(reason="The test segfaults locally and in CI (2023-08-21)") # type: ignore + def test_dem_coregistration(self) -> None: + """ + Test that the dem_coregistration function works expectedly. + Tests the features that are specific to dem_coregistration. + For example, many features are tested in create_inlier_mask, so not tested again here. + TODO: Add DEMs with different projection/grid to test that regridding works as expected. 
+ """ + # Load example reference, to-be-aligned and outlines + ref_dem, tba_dem, outlines = load_examples() + + # - Check that it works with default parameters - # + dem_coreg, coreg_method, coreg_stats, inlier_mask = dem_coregistration(tba_dem, ref_dem) + + # Assert that outputs have expected format + assert isinstance(dem_coreg, xdem.DEM) + assert isinstance(coreg_method, xdem.coreg.Coreg) + assert isinstance(coreg_stats, pd.DataFrame) + + # Assert that default coreg_method is as expected + assert hasattr(coreg_method, "pipeline") + assert isinstance(coreg_method.pipeline[0], xdem.coreg.NuthKaab) + assert isinstance(coreg_method.pipeline[1], xdem.coreg.VerticalShift) + + # The result should be similar to applying the same coreg by hand with: + # - DEMs converted to Float32 + # - default inlier_mask + # - no resampling + coreg_method_ref = xdem.coreg.NuthKaab() + xdem.coreg.VerticalShift() + inlier_mask = create_inlier_mask(tba_dem, ref_dem) + coreg_method_ref.fit(ref_dem.astype("float32"), tba_dem.astype("float32"), inlier_mask=inlier_mask) + dem_coreg_ref = coreg_method_ref.apply(tba_dem, resample=False) + assert dem_coreg == dem_coreg_ref + + # Assert that coregistration improved the residuals + assert abs(coreg_stats["med_orig"].values) > abs(coreg_stats["med_coreg"].values) + assert coreg_stats["nmad_orig"].values > coreg_stats["nmad_coreg"].values + + # - Check some alternative arguments - # + # Test with filename instead of DEMs + dem_coreg2, _, _, _ = dem_coregistration(tba_dem.filename, ref_dem.filename) + assert dem_coreg2 == dem_coreg + + # Test saving to file (mode = "w" is necessary to work on Windows) + outfile = tempfile.NamedTemporaryFile(suffix=".tif", mode="w", delete=False) + dem_coregistration(tba_dem, ref_dem, out_dem_path=outfile.name) + dem_coreg2 = xdem.DEM(outfile.name) + assert dem_coreg2 == dem_coreg + outfile.close() + + # Test that shapefile is properly taken into account - inlier_mask should be False inside outlines + # Need to use resample=True, to ensure that dem_coreg has same georef as inlier_mask + dem_coreg, coreg_method, coreg_stats, inlier_mask = dem_coregistration( + tba_dem, + ref_dem, + shp_list=[ + outlines, + ], + resample=True, + ) + gl_mask = outlines.create_mask(dem_coreg, as_array=True) + assert np.all(~inlier_mask[gl_mask]) + + # Testing with plot + out_fig = tempfile.NamedTemporaryFile(suffix=".png", mode="w", delete=False) + assert os.path.getsize(out_fig.name) == 0 + dem_coregistration(tba_dem, ref_dem, plot=True, out_fig=out_fig.name) + assert os.path.getsize(out_fig.name) > 0 + out_fig.close() + + # Testing different coreg method + dem_coreg, coreg_method, coreg_stats, inlier_mask = dem_coregistration( + tba_dem, ref_dem, coreg_method=xdem.coreg.Tilt() + ) + assert isinstance(coreg_method, xdem.coreg.Tilt) + assert abs(coreg_stats["med_orig"].values) > abs(coreg_stats["med_coreg"].values) + assert coreg_stats["nmad_orig"].values > coreg_stats["nmad_coreg"].values From 36c7eabc9e2cf8b168cc371d63a50fc68e3748e2 Mon Sep 17 00:00:00 2001 From: Romain Hugonnet Date: Fri, 1 Sep 2023 11:37:20 -0800 Subject: [PATCH 3/7] Linting --- tests/test_coreg/__init__.py | 0 tests/test_coreg/test_affine.py | 10 +++++----- tests/test_coreg/test_base.py | 15 ++++++--------- tests/test_coreg/test_workflows.py | 21 ++++++++------------- 4 files changed, 19 insertions(+), 27 deletions(-) create mode 100644 tests/test_coreg/__init__.py diff --git a/tests/test_coreg/__init__.py b/tests/test_coreg/__init__.py new file mode 100644 index 00000000..e69de29b diff 
--git a/tests/test_coreg/test_affine.py b/tests/test_coreg/test_affine.py index 187a340f..75fc74dd 100644 --- a/tests/test_coreg/test_affine.py +++ b/tests/test_coreg/test_affine.py @@ -1,19 +1,19 @@ """Functions to test the affine coregistrations.""" -import warnings -import pytest import copy +import warnings import numpy as np +import pytest import rasterio as rio - from geoutils import Raster, Vector from geoutils.raster import RasterType + import xdem -from xdem import examples -from xdem import coreg +from xdem import coreg, examples from xdem.coreg.affine import AffineCoreg, CoregDict + def load_examples() -> tuple[RasterType, RasterType, Vector]: """Load example files to try coregistration methods with.""" with warnings.catch_warnings(): diff --git a/tests/test_coreg/test_base.py b/tests/test_coreg/test_base.py index cd853080..5ee9b54a 100644 --- a/tests/test_coreg/test_base.py +++ b/tests/test_coreg/test_base.py @@ -2,7 +2,6 @@ from __future__ import annotations -import copy import warnings from typing import Any, Callable @@ -18,7 +17,7 @@ import xdem from xdem import coreg, examples, misc, spatialstats from xdem._typing import NDArrayf - from xdem.coreg.base import Coreg, CoregDict, apply_matrix + from xdem.coreg.base import Coreg, apply_matrix def load_examples() -> tuple[RasterType, RasterType, Vector]: @@ -64,7 +63,6 @@ def test_copy(self, coreg_class: Callable[[], Coreg]) -> None: assert corr_copy._meta != corr._meta assert not hasattr(corr_copy, "vshift") - def test_error_method(self) -> None: """Test different error measures.""" dem1: NDArrayf = np.ones((50, 50)).astype(np.float32) @@ -159,7 +157,6 @@ def test_subsample(self) -> None: # Check that the estimated biases are similar assert deramp_sub._meta["coefficients"] == pytest.approx(deramp_full._meta["coefficients"], rel=1e-1) - def test_coreg_raster_and_ndarray_args(self) -> None: # Create a small sample-DEM @@ -365,7 +362,7 @@ def test_coreg_raises(self, combination: tuple[str, str, str, str, str, str, str # Use VerticalShift as a representative example. vshiftcorr = xdem.coreg.VerticalShift() - def fit_func() -> AffineCoreg: + def fit_func() -> Coreg: return vshiftcorr.fit(ref_dem, tba_dem, transform=transform, crs=crs) def apply_func() -> NDArrayf: @@ -404,6 +401,7 @@ def test_coreg_oneliner(self) -> None: assert np.array_equal(dem_arr, dem_arr2_fixed) + class TestCoregPipeline: ref, tba, outlines = load_examples() # Load example reference, to-be-aligned and mask. 
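For reference before the TestCoregPipeline cases, a minimal sketch of assembling and running a pipeline; the NuthKaab + Deramp pairing is one illustrative combination, not a recommendation:

import xdem
from xdem import coreg, examples

ref = xdem.DEM(examples.get_path("longyearbyen_ref_dem"))
tba = xdem.DEM(examples.get_path("longyearbyen_tba_dem"))

# Equivalent constructions: an explicit step list, or the `+` operator,
# which wraps both steps in a CoregPipeline.
pipeline = coreg.CoregPipeline([coreg.NuthKaab(), coreg.Deramp()])
pipeline = coreg.NuthKaab() + coreg.Deramp()

# Steps are fitted in order, each applied to the output of the previous one.
pipeline.fit(reference_dem=ref, dem_to_be_aligned=tba)
aligned, _ = pipeline.apply(tba.data, transform=ref.transform, crs=ref.crs)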
@@ -421,7 +419,7 @@ class TestCoregPipeline: points = np.array([[1, 2, 3, 4], [1, 2, 3, 4], [0, 0, 0, 0]], dtype="float64").T @pytest.mark.parametrize("coreg_class", [coreg.VerticalShift, coreg.ICP, coreg.NuthKaab]) # type: ignore - def test_copy(self, coreg_class: Callable[[], AffineCoreg]) -> None: + def test_copy(self, coreg_class: Callable[[], Coreg]) -> None: # Create a pipeline, add some metadata, and copy it pipeline = coreg_class() + coreg_class() @@ -531,7 +529,7 @@ class TestBlockwiseCoreg: "pipeline", [coreg.VerticalShift(), coreg.VerticalShift() + coreg.NuthKaab()] ) # type: ignore @pytest.mark.parametrize("subdivision", [4, 10]) # type: ignore - def test_blockwise_coreg(self, pipeline: AffineCoreg, subdivision: int) -> None: + def test_blockwise_coreg(self, pipeline: Coreg, subdivision: int) -> None: warnings.simplefilter("error") blockwise = coreg.BlockwiseCoreg(step=pipeline, subdivision=subdivision) @@ -619,7 +617,6 @@ def test_blockwise_coreg_large_gaps(self) -> None: assert np.nanstd(ddem_pre) > np.nanstd(ddem_post) - def test_apply_matrix() -> None: warnings.simplefilter("error") ref, tba, outlines = load_examples() # Load example reference, to-be-aligned and mask. @@ -833,4 +830,4 @@ def test_warp_dem() -> None: plt.subplot(144) plt.imshow(dem - untransformed_dem, cmap="coolwarm_r", vmin=-10, vmax=10) - plt.show() \ No newline at end of file + plt.show() diff --git a/tests/test_coreg/test_workflows.py b/tests/test_coreg/test_workflows.py index 485d1f82..7fe9a6ca 100644 --- a/tests/test_coreg/test_workflows.py +++ b/tests/test_coreg/test_workflows.py @@ -1,15 +1,15 @@ """Functions to test the coregistration workflows.""" import os -import warnings import tempfile +import warnings import numpy as np import pandas as pd import pytest - from geoutils import Raster, Vector from geoutils.raster import RasterType + import xdem from xdem import examples from xdem.coreg.workflows import create_inlier_mask, dem_coregistration @@ -25,8 +25,8 @@ def load_examples() -> tuple[RasterType, RasterType, Vector]: return reference_raster, to_be_aligned_raster, glacier_mask -class TestWorkflows: +class TestWorkflows: def test_create_inlier_mask(self) -> None: """Test that the create_inlier_mask function works expectedly.""" warnings.simplefilter("error") @@ -90,18 +90,16 @@ def test_create_inlier_mask(self) -> None: inlier_mask_comp2 = np.ones(tba.data.shape, dtype=bool) inlier_mask_comp2[slope.data < slope_lim[0]] = False inlier_mask_comp2[slope.data > slope_lim[1]] = False - inlier_mask = create_inlier_mask( - tba, ref, filtering=True, slope_lim=slope_lim, nmad_factor=np.inf - ) + inlier_mask = create_inlier_mask(tba, ref, filtering=True, slope_lim=slope_lim, nmad_factor=np.inf) assert np.all(inlier_mask == inlier_mask_comp2) # Test the nmad_factor filter only nmad_factor = 3 ddem = tba - ref - inlier_mask_comp3 = (np.abs(ddem.data - np.median(ddem)) < nmad_factor * xdem.spatialstats.nmad(ddem)).filled(False) - inlier_mask = create_inlier_mask( - tba, ref, filtering=True, slope_lim=[0, 90], nmad_factor=nmad_factor + inlier_mask_comp3 = (np.abs(ddem.data - np.median(ddem)) < nmad_factor * xdem.spatialstats.nmad(ddem)).filled( + False ) + inlier_mask = create_inlier_mask(tba, ref, filtering=True, slope_lim=[0, 90], nmad_factor=nmad_factor) assert np.all(inlier_mask == inlier_mask_comp3) # Test the sum of both @@ -114,9 +112,7 @@ def test_create_inlier_mask(self) -> None: # Test the dh_max filter only dh_max = 200 inlier_mask_comp4 = (np.abs(ddem.data) < dh_max).filled(False) - inlier_mask = 
create_inlier_mask( - tba, ref, filtering=True, slope_lim=[0, 90], nmad_factor=np.inf, dh_max=dh_max - ) + inlier_mask = create_inlier_mask(tba, ref, filtering=True, slope_lim=[0, 90], nmad_factor=np.inf, dh_max=dh_max) assert np.all(inlier_mask == inlier_mask_comp4) # - Test the sum of outlines + dh_max + slope - # @@ -189,7 +185,6 @@ def test_create_inlier_mask(self) -> None: with pytest.raises(ValueError, match=r"`slope_lim` must be a tuple/list of 2 elements in the range \[0-90\]"): create_inlier_mask(tba, ref, filtering=True, slope_lim=[1, 120]) - @pytest.mark.skip(reason="The test segfaults locally and in CI (2023-08-21)") # type: ignore def test_dem_coregistration(self) -> None: """ From e7e61dbc26702ee40574282dad88326d48e33a2b Mon Sep 17 00:00:00 2001 From: Romain Hugonnet Date: Fri, 1 Sep 2023 13:28:30 -0800 Subject: [PATCH 4/7] Add bias_var_name optional instantiation, checks in fit and apply and related tests --- tests/test_coreg/test_base.py | 24 ++++++++++- tests/test_coreg/test_biascorr.py | 71 +++++++++++++++++++++++++++---- xdem/coreg/base.py | 2 +- xdem/coreg/biascorr.py | 57 +++++++++++++++--------- 4 files changed, 126 insertions(+), 28 deletions(-) diff --git a/tests/test_coreg/test_base.py b/tests/test_coreg/test_base.py index 5ee9b54a..0c79bbef 100644 --- a/tests/test_coreg/test_base.py +++ b/tests/test_coreg/test_base.py @@ -455,15 +455,35 @@ def test_pipeline(self) -> None: # Assert that the combined vertical shift is 2 assert pipeline2.to_matrix()[2, 3] == 2.0 - def test_pipeline_affine_biascorr(self) -> None: + all_coregs = [coreg.VerticalShift(), coreg.NuthKaab(), coreg.ICP(), coreg.Deramp(), coreg.TerrainBias(), coreg.DirectionalBias()] + + @pytest.mark.parametrize("coreg1", all_coregs) + @pytest.mark.parametrize("coreg2", all_coregs) + def test_pipeline_combinations__nobiasvar(self, coreg1: Coreg, coreg2: Coreg) -> None: + """Test pipelines with all combinations of coregistration subclasses (without bias variables)""" # Create a pipeline from one affine and one biascorr method. - pipeline = coreg.CoregPipeline([coreg.Deramp(), coreg.NuthKaab()]) + pipeline = coreg.CoregPipeline([coreg1, coreg2]) pipeline.fit(**self.fit_params) aligned_dem, _ = pipeline.apply(self.tba.data, transform=self.ref.transform, crs=self.ref.crs) assert aligned_dem.shape == self.ref.data.squeeze().shape + all_coregs = [coreg.VerticalShift(), coreg.NuthKaab(), coreg.ICP(), coreg.Deramp(), coreg.TerrainBias(), + coreg.DirectionalBias()] + + @pytest.mark.parametrize("coreg1", all_coregs) + @pytest.mark.parametrize("coreg2", [coreg.BiasCorr1D()]) + def test_pipeline_combinations__biasvar(self, coreg1: Coreg, coreg2: Coreg) -> None: + """Test pipelines with all combinations of coregistration subclasses with bias variables""" + + # Create a pipeline from one affine and one biascorr method.
+ pipeline = coreg.CoregPipeline([coreg1, coreg2]) + pipeline.fit(**self.fit_params, bias_vars={"slope": xdem.terrain.slope(self.ref)}) + + aligned_dem, _ = pipeline.apply(self.tba.data, transform=self.ref.transform, crs=self.ref.crs) + assert aligned_dem.shape == self.ref.data.squeeze().shape + def test_pipeline_pts(self) -> None: warnings.simplefilter("ignore") diff --git a/tests/test_coreg/test_biascorr.py b/tests/test_coreg/test_biascorr.py index e324a2ae..8d3db267 100644 --- a/tests/test_coreg/test_biascorr.py +++ b/tests/test_coreg/test_biascorr.py @@ -53,6 +53,7 @@ def test_biascorr(self) -> None: # Check default "fit" metadata was set properly assert bcorr._meta["fit_func"] == biascorr.fit_workflows["norder_polynomial"]["func"] assert bcorr._meta["fit_optimizer"] == biascorr.fit_workflows["norder_polynomial"]["optimizer"] + assert bcorr._meta["bias_var_names"] is None # Check that the _is_affine attribute is set correctly assert not bcorr._is_affine @@ -77,6 +78,15 @@ def test_biascorr(self) -> None: assert bcorr3._fit_or_bin == "bin_and_fit" + # Or defining bias variable names on instantiation as iterable + bcorr4 = biascorr.BiasCorr(bias_var_names=("slope", "ncc")) + assert bcorr4._meta["bias_var_names"] == ["slope", "ncc"] + + # Same using an array + bcorr5 = biascorr.BiasCorr(bias_var_names=np.array(["slope", "ncc"])) + assert bcorr5._meta["bias_var_names"] == ["slope", "ncc"] + + def test_biascorr__errors(self) -> None: """Test the errors that should be raised by BiasCorr.""" @@ -153,6 +163,9 @@ def test_biascorr__fit_1d(self, fit_func, fit_optimizer) -> None: # Run with input parameter, and using only 100 subsamples for speed bcorr.fit(**elev_fit_params, subsample=100, random_state=42) + # Check that variable names are defined during fit + assert bcorr._meta["bias_var_names"] == ["elevation"] + # Apply the correction bcorr.apply(dem=self.tba, bias_vars=bias_vars_dict) @@ -180,6 +193,9 @@ def test_biascorr__fit_2d(self, fit_func, fit_optimizer) -> None: # Passing p0 defines the number of parameters to solve for bcorr.fit(**elev_fit_params, subsample=100, p0=[0, 0, 0, 0], random_state=42) + # Check that variable names are defined during fit + assert bcorr._meta["bias_var_names"] == ["elevation", "slope"] + # Apply the correction bcorr.apply(dem=self.tba, bias_vars=bias_vars_dict) @@ -199,6 +215,9 @@ def test_biascorr__bin_1d(self, bin_sizes, bin_statistic) -> None: # Run with input parameter, and using only 100 subsamples for speed bcorr.fit(**elev_fit_params, subsample=1000, random_state=42) + # Check that variable names are defined during fit + assert bcorr._meta["bias_var_names"] == ["elevation"] + # Apply the correction bcorr.apply(dem=self.tba, bias_vars=bias_vars_dict) @@ -218,6 +237,9 @@ def test_biascorr__bin_2d(self, bin_sizes, bin_statistic) -> None: # Run with input parameter, and using only 100 subsamples for speed bcorr.fit(**elev_fit_params, subsample=10000, random_state=42) + # Check that variable names are defined during fit + assert bcorr._meta["bias_var_names"] == ["elevation", "slope"] + # Apply the correction bcorr.apply(dem=self.tba, bias_vars=bias_vars_dict) @@ -257,6 +279,9 @@ def test_biascorr__bin_and_fit_1d(self, fit_func, fit_optimizer, bin_sizes, bin_ # Run with input parameter, and using only 100 subsamples for speed bcorr.fit(**elev_fit_params, subsample=100, random_state=42) + # Check that variable names are defined during fit + assert bcorr._meta["bias_var_names"] == ["elevation"] + # Apply the correction bcorr.apply(dem=self.tba, 
bias_vars=bias_vars_dict) @@ -292,6 +317,9 @@ def test_biascorr__bin_and_fit_2d(self, fit_func, fit_optimizer, bin_sizes, bin_ # Passing p0 defines the number of parameters to solve for bcorr.fit(**elev_fit_params, subsample=100, p0=[0, 0, 0, 0], random_state=42) + # Check that variable names are defined during fit + assert bcorr._meta["bias_var_names"] == ["elevation", "slope"] + # Apply the correction bcorr.apply(dem=self.tba, bias_vars=bias_vars_dict) @@ -322,6 +350,15 @@ def test_biascorr1d(self) -> None: bias_vars_dict = {"elevation": self.ref, "slope": xdem.terrain.slope(self.ref)} bcorr1d.fit(**elev_fit_params, bias_vars=bias_vars_dict) + # Raise error when variables don't match + with pytest.raises( + ValueError, match=re.escape("The keys of `bias_vars` do not match the `bias_var_names` defined during " + "instantiation: ['ncc'].") + ): + bcorr1d2 = biascorr.BiasCorr1D(bias_var_names=["ncc"]) + bias_vars_dict = {"elevation": self.ref} + bcorr1d2.fit(**elev_fit_params, bias_vars=bias_vars_dict) + def test_biascorr2d(self) -> None: """ Test the subclass BiasCorr2D, which defines default parameters for 2D. @@ -329,17 +366,17 @@ def test_biascorr2d(self) -> None: """ # Try default "fit" parameters instantiation - bcorr1d = biascorr.BiasCorr2D() + bcorr2d = biascorr.BiasCorr2D() - assert bcorr1d._meta["fit_func"] == polynomial_2d - assert bcorr1d._meta["fit_optimizer"] == scipy.optimize.curve_fit + assert bcorr2d._meta["fit_func"] == polynomial_2d + assert bcorr2d._meta["fit_optimizer"] == scipy.optimize.curve_fit # Try default "bin" parameter instantiation - bcorr1d = biascorr.BiasCorr2D(fit_or_bin="bin") + bcorr2d = biascorr.BiasCorr2D(fit_or_bin="bin") - assert bcorr1d._meta["bin_sizes"] == 10 - assert bcorr1d._meta["bin_statistic"] == np.nanmedian - assert bcorr1d._meta["bin_apply_method"] == "linear" + assert bcorr2d._meta["bin_sizes"] == 10 + assert bcorr2d._meta["bin_statistic"] == np.nanmedian + assert bcorr2d._meta["bin_apply_method"] == "linear" elev_fit_params = self.fit_params.copy() # Raise error when wrong number of parameters are passed @@ -347,7 +384,16 @@ def test_biascorr2d(self) -> None: ValueError, match="Exactly two variables have to be provided through the argument " "'bias_vars', got 1." 
): bias_vars_dict = {"elevation": self.ref} - bcorr1d.fit(**elev_fit_params, bias_vars=bias_vars_dict) + bcorr2d.fit(**elev_fit_params, bias_vars=bias_vars_dict) + + # Raise error when variables don't match + with pytest.raises( + ValueError, match=re.escape("The keys of `bias_vars` do not match the `bias_var_names` defined during " + "instantiation: ['elevation', 'ncc'].") + ): + bcorr2d2 = biascorr.BiasCorr2D(bias_var_names=["elevation", "ncc"]) + bias_vars_dict = {"elevation": self.ref, "slope": xdem.terrain.slope(self.ref)} + bcorr2d2.fit(**elev_fit_params, bias_vars=bias_vars_dict) def test_directionalbias(self) -> None: """Test the subclass DirectionalBias.""" @@ -360,6 +406,9 @@ def test_directionalbias(self) -> None: assert dirbias._meta["fit_optimizer"] == biascorr.fit_workflows["nfreq_sumsin"]["optimizer"] assert dirbias._meta["angle"] == 45 + # Check that variable names are defined during instantiation + assert dirbias._meta["bias_var_names"] == ["angle"] + @pytest.mark.parametrize("angle", [20, 90, 210]) # type: ignore @pytest.mark.parametrize("nb_freq", [1, 2, 3]) # type: ignore def test_directionalbias__synthetic(self, angle, nb_freq) -> None: @@ -435,6 +484,9 @@ def test_deramp(self) -> None: assert deramp._meta["fit_optimizer"] == scipy.optimize.curve_fit assert deramp._meta["poly_order"] == 2 + # Check that variable names are defined during instantiation + assert deramp._meta["bias_var_names"] == ["xx", "yy"] + @pytest.mark.parametrize("order", [1, 2, 3, 4]) # type: ignore def test_deramp__synthetic(self, order: int) -> None: """Run the deramp for varying polynomial orders using a synthetic elevation difference.""" @@ -479,6 +531,9 @@ def test_terrainbias(self) -> None: assert tb._meta["bin_statistic"] == np.nanmedian assert tb._meta["terrain_attribute"] == "maximum_curvature" + assert tb._meta["bias_var_names"] == ["maximum_curvature"] + + def test_terrainbias__synthetic(self) -> None: """Test the subclass TerrainBias.""" diff --git a/xdem/coreg/base.py b/xdem/coreg/base.py index 575be9a8..22e6bf37 100644 --- a/xdem/coreg/base.py +++ b/xdem/coreg/base.py @@ -675,9 +675,9 @@ class CoregDict(TypedDict, total=False): bin_sizes: int | dict[str, int | Iterable[float]] bin_statistic: Callable[[NDArrayf], np.floating[Any]] bin_apply_method: Literal["linear"] | Literal["per_bin"] + bias_var_names: list[str] # 2/ Outputs - bias_vars: list[str] fit_params: NDArrayf fit_perr: NDArrayf bin_dataframe: pd.DataFrame diff --git a/xdem/coreg/biascorr.py b/xdem/coreg/biascorr.py index 09cdf0cb..eeee6ff3 100644 --- a/xdem/coreg/biascorr.py +++ b/xdem/coreg/biascorr.py @@ -47,6 +47,7 @@ def __init__( bin_sizes: int | dict[str, int | Iterable[float]] = 10, bin_statistic: Callable[[NDArrayf], np.floating[Any]] = np.nanmedian, bin_apply_method: Literal["linear"] | Literal["per_bin"] = "linear", + bias_var_names: Iterable[str] = None, ): """ Instantiate a bias correction object. 
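In short, any iterable passed as `bias_var_names` is normalized to a plain list at instantiation, and left as None otherwise until .fit() receives the `bias_vars` dictionary; a small sketch mirroring the new tests:

from xdem.coreg import biascorr

# Any iterable of names is stored as a list in the metadata.
bcorr = biascorr.BiasCorr(bias_var_names=("slope", "ncc"))
assert bcorr._meta["bias_var_names"] == ["slope", "ncc"]

# Without explicit names, they stay undefined until .fit() is called with
# a `bias_vars` dictionary, whose keys must then match any declared names.
bcorr_default = biascorr.BiasCorr()
assert bcorr_default._meta["bias_var_names"] is None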
@@ -98,17 +99,20 @@ def __init__( "got {}.".format(type(bin_apply_method)) ) + list_bias_var_names = list(bias_var_names) if bias_var_names is not None else None + # Now we write the relevant attributes to the class metadata # For fitting if fit_or_bin == "fit": - meta_fit = {"fit_func": fit_func, "fit_optimizer": fit_optimizer} + meta_fit = {"fit_func": fit_func, "fit_optimizer": fit_optimizer, "bias_var_names": list_bias_var_names} # Somehow mypy doesn't understand that fit_func and fit_optimizer can only be callables now, # even writing the above "if" in a more explicit "if; else" loop with new variables names and typing super().__init__(meta=meta_fit) # type: ignore # For binning elif fit_or_bin == "bin": - meta_bin = {"bin_sizes": bin_sizes, "bin_statistic": bin_statistic, "bin_apply_method": bin_apply_method} + meta_bin = {"bin_sizes": bin_sizes, "bin_statistic": bin_statistic, "bin_apply_method": bin_apply_method, + "bias_var_names": list_bias_var_names} super().__init__(meta=meta_bin) # type: ignore # For both @@ -118,6 +122,7 @@ def __init__( "fit_optimizer": fit_optimizer, "bin_sizes": bin_sizes, "bin_statistic": bin_statistic, + "bias_var_names": list_bias_var_names } super().__init__(meta=meta_bin_and_fit) # type: ignore @@ -204,6 +209,15 @@ def _fit_func( # type: ignore if bias_vars is None: raise ValueError("At least one `bias_var` should be passed to the fitting function, got None.") + # If bias var names were explicitly passed at instantiation, check that they match the one from the dict + if self._meta["bias_var_names"] is not None: + if not sorted(list(bias_vars.keys())) == sorted(self._meta["bias_var_names"]): + raise ValueError("The keys of `bias_vars` do not match the `bias_var_names` defined during " + "instantiation: {}.".format(self._meta["bias_var_names"])) + # Otherwise, store bias variable names from the dictionary + else: + self._meta["bias_var_names"] = list(bias_vars.keys()) + # Compute difference and mask of valid data diff = ref_dem - tba_dem ind_valid = np.logical_and.reduce((np.isfinite(diff), *(np.isfinite(var) for var in bias_vars.values()))) @@ -346,9 +360,6 @@ def _fit_func( # type: ignore elif self._fit_or_bin in ["bin", "bin_and_fit"]: self._meta["bin_dataframe"] = df - # Save bias variable names in any case - self._meta["bias_vars"] = list(bias_vars.keys()) - def _apply_func( # type: ignore self, dem: NDArrayf, @@ -361,6 +372,11 @@ def _apply_func( # type: ignore if bias_vars is None: raise ValueError("At least one `bias_var` should be passed to the `apply` function, got None.") + # Check the bias_vars passed match the ones stored for this bias correction class + if not sorted(list(bias_vars.keys())) == sorted(self._meta["bias_var_names"]): + raise ValueError("The keys of `bias_vars` do not match the `bias_var_names` defined during " + "instantiation or fitting: {}.".format(self._meta["bias_var_names"])) + # Apply function to get correction (including if binning was done before) if self._fit_or_bin in ["fit", "bin_and_fit"]: corr = self._meta["fit_func"](tuple(bias_vars.values()), *self._meta["fit_params"]) @@ -409,6 +425,7 @@ def __init__( bin_sizes: int | dict[str, int | Iterable[float]] = 10, bin_statistic: Callable[[NDArrayf], np.floating[Any]] = np.nanmedian, bin_apply_method: Literal["linear"] | Literal["per_bin"] = "linear", + bias_var_names: Iterable[str] = None, ): """ Instantiate a 1D bias correction. @@ -421,8 +438,9 @@ def __init__( :param bin_statistic: Statistic of central tendency (e.g., mean) to apply during the binning. 
:param bin_apply_method: Method to correct with the binned statistics, either "linear" to interpolate linearly between bins, or "per_bin" to apply the statistic for each bin. + :param bias_var_names: (Optional) For pipelines, explicitly define bias variable names to use during .fit(). """ - super().__init__(fit_or_bin, fit_func, fit_optimizer, bin_sizes, bin_statistic, bin_apply_method) + super().__init__(fit_or_bin, fit_func, fit_optimizer, bin_sizes, bin_statistic, bin_apply_method, bias_var_names) def _fit_func( # type: ignore self, @@ -469,6 +487,7 @@ def __init__( bin_sizes: int | dict[str, int | Iterable[float]] = 10, bin_statistic: Callable[[NDArrayf], np.floating[Any]] = np.nanmedian, bin_apply_method: Literal["linear"] | Literal["per_bin"] = "linear", + bias_var_names: Iterable[str] = None, ): """ Instantiate a 2D bias correction. @@ -481,8 +500,9 @@ def __init__( :param bin_statistic: Statistic of central tendency (e.g., mean) to apply during the binning. :param bin_apply_method: Method to correct with the binned statistics, either "linear" to interpolate linearly between bins, or "per_bin" to apply the statistic for each bin. + :param bias_var_names: (Optional) For pipelines, explicitly define bias variable names to use during .fit(). """ - super().__init__(fit_or_bin, fit_func, fit_optimizer, bin_sizes, bin_statistic, bin_apply_method) + super().__init__(fit_or_bin, fit_func, fit_optimizer, bin_sizes, bin_statistic, bin_apply_method, bias_var_names) def _fit_func( # type: ignore self, @@ -530,6 +550,7 @@ def __init__( bin_sizes: int | dict[str, int | Iterable[float]] = 10, bin_statistic: Callable[[NDArrayf], np.floating[Any]] = np.nanmedian, bin_apply_method: Literal["linear"] | Literal["per_bin"] = "linear", + bias_var_names: Iterable[str] = None, ): """ Instantiate an N-D bias correction. @@ -542,8 +563,9 @@ def __init__( :param bin_statistic: Statistic of central tendency (e.g., mean) to apply during the binning. :param bin_apply_method: Method to correct with the binned statistics, either "linear" to interpolate linearly between bins, or "per_bin" to apply the statistic for each bin. + :param bias_var_names: (Optional) For pipelines, explicitly define bias variable names to use during .fit(). """ - super().__init__(fit_or_bin, fit_func, fit_optimizer, bin_sizes, bin_statistic, bin_apply_method) + super().__init__(fit_or_bin, fit_func, fit_optimizer, bin_sizes, bin_statistic, bin_apply_method, bias_var_names) def _fit_func( # type: ignore self, @@ -601,16 +623,16 @@ def __init__( :param bin_apply_method: Method to correct with the binned statistics, either "linear" to interpolate linearly between bins, or "per_bin" to apply the statistic for each bin. """ - super().__init__(fit_or_bin, fit_func, fit_optimizer, bin_sizes, bin_statistic, bin_apply_method) + super().__init__(fit_or_bin, fit_func, fit_optimizer, bin_sizes, bin_statistic, bin_apply_method, ["angle"]) self._meta["angle"] = angle def _fit_func( # type: ignore self, ref_dem: NDArrayf, tba_dem: NDArrayf, - bias_vars: dict[str, NDArrayf], transform: rio.transform.Affine, crs: rio.crs.CRS, + bias_vars: dict[str, NDArrayf] = None, weights: None | NDArrayf = None, verbose: bool = False, **kwargs, ) -> None: @@ -697,23 +719,24 @@ def __init__( between bins, or "per_bin" to apply the statistic for each bin.
""" - super().__init__(fit_or_bin, fit_func, fit_optimizer, bin_sizes, bin_statistic, bin_apply_method) + super().__init__(fit_or_bin, fit_func, fit_optimizer, bin_sizes, bin_statistic, bin_apply_method, [terrain_attribute]) + # This is the same as bias_var_names, but let's leave the duplicate for clarity self._meta["terrain_attribute"] = terrain_attribute def _fit_func( # type: ignore self, ref_dem: NDArrayf, tba_dem: NDArrayf, - bias_vars: dict[str, NDArrayf], transform: rio.transform.Affine, crs: rio.crs.CRS, + bias_vars: dict[str, NDArrayf] = None, weights: None | NDArrayf = None, verbose: bool = False, **kwargs, ) -> None: # Derive terrain attribute - if self._meta["terrain_attribute"] == "elevation": + if self._meta["bias_var_names"] == "elevation": attr = ref_dem else: attr = xdem.terrain.get_terrain_attribute( @@ -743,13 +766,13 @@ def _apply_func( if bias_vars is None: # Derive terrain attribute - if self._meta["terrain_attribute"] == "elevation": + if self._meta["bias_var_names"] == "elevation": attr = dem else: attr = xdem.terrain.get_terrain_attribute( - dem=dem, attribute=self._meta["terrain_attribute"], resolution=(transform[0], abs(transform[4])) + dem=dem, attribute=self._meta["bias_var_names"], resolution=(transform[0], abs(transform[4])) ) - bias_vars = {self._meta["terrain_attribute"]: attr} + bias_vars = {self._meta["bias_var_names"]: attr} return super()._apply_func(dem=dem, transform=transform, crs=crs, bias_vars=bias_vars, **kwargs) @@ -782,7 +805,7 @@ def __init__( :param bin_apply_method: Method to correct with the binned statistics, either "linear" to interpolate linearly between bins, or "per_bin" to apply the statistic for each bin. """ - super().__init__(fit_or_bin, fit_func, fit_optimizer, bin_sizes, bin_statistic, bin_apply_method) + super().__init__(fit_or_bin, fit_func, fit_optimizer, bin_sizes, bin_statistic, bin_apply_method, ["xx", "yy"]) self._meta["poly_order"] = poly_order def _fit_func( # type: ignore From 77e2e5d45f8d9af6516ca93220e3479fa201bbd6 Mon Sep 17 00:00:00 2001 From: Romain Hugonnet Date: Fri, 1 Sep 2023 18:38:13 -0800 Subject: [PATCH 5/7] Write _parse_bias_vars logic for CoregPipeline and add tests --- tests/test_coreg/test_base.py | 50 ++++++++-- tests/test_coreg/test_biascorr.py | 6 ++ xdem/coreg/affine.py | 14 --- xdem/coreg/base.py | 155 ++++++++++++++++++++++++------ xdem/coreg/biascorr.py | 81 +++------------- 5 files changed, 187 insertions(+), 119 deletions(-) diff --git a/tests/test_coreg/test_base.py b/tests/test_coreg/test_base.py index 0c79bbef..e84d566c 100644 --- a/tests/test_coreg/test_base.py +++ b/tests/test_coreg/test_base.py @@ -2,6 +2,7 @@ from __future__ import annotations +import re import warnings from typing import Any, Callable @@ -47,6 +48,15 @@ class TestCoregClass: # Create some 3D coordinates with Z coordinates being 0 to try the apply_pts functions. 
points = np.array([[1, 2, 3, 4], [1, 2, 3, 4], [0, 0, 0, 0]], dtype="float64").T + def test_init(self): + """Test instantiation of Coreg""" + + c = coreg.Coreg() + + assert c._fit_called is False + assert c._is_affine is None + assert c._needs_vars is False + @pytest.mark.parametrize("coreg_class", [coreg.VerticalShift, coreg.ICP, coreg.NuthKaab]) # type: ignore def test_copy(self, coreg_class: Callable[[], Coreg]) -> None: """Test that copying work expectedly (that no attributes still share references).""" @@ -457,8 +467,8 @@ def test_pipeline(self) -> None: all_coregs = [coreg.VerticalShift(), coreg.NuthKaab(), coreg.ICP(), coreg.Deramp(), coreg.TerrainBias(), coreg.DirectionalBias()] - @pytest.mark.parametrize("coreg1", all_coregs) - @pytest.mark.parametrize("coreg2", all_coregs) + @pytest.mark.parametrize("coreg1", all_coregs) # type: ignore + @pytest.mark.parametrize("coreg2", all_coregs) # type: ignore def test_pipeline_combinations__nobiasvar(self, coreg1: Coreg, coreg2: Coreg) -> None: """Test pipelines with all combinations of coregistration subclasses (without bias variables)""" @@ -472,18 +482,46 @@ def test_pipeline_combinations__nobiasvar(self, coreg1: Coreg, coreg2: Coreg) -> all_coregs = [coreg.VerticalShift(), coreg.NuthKaab(), coreg.ICP(), coreg.Deramp(), coreg.TerrainBias(), coreg.DirectionalBias()] - @pytest.mark.parametrize("coreg1", all_coregs) - @pytest.mark.parametrize("coreg2", [coreg.BiasCorr1D()]) + @pytest.mark.parametrize("coreg1", all_coregs) # type: ignore + @pytest.mark.parametrize("coreg2", [coreg.BiasCorr1D(bias_var_names=["slope"], fit_or_bin="bin"), + coreg.BiasCorr2D(bias_var_names=["slope", "aspect"], fit_or_bin="bin")]) # type: ignore def test_pipeline_combinations__biasvar(self, coreg1: Coreg, coreg2: Coreg) -> None: """Test pipelines with all combinations of coregistration subclasses with bias variables""" # Create a pipeline from one affine and one biascorr methods. pipeline = coreg.CoregPipeline([coreg1, coreg2]) - pipeline.fit(**self.fit_params, bias_vars={"slope": xdem.terrain.slope(self.ref)}) + bias_vars = {"slope": xdem.terrain.slope(self.ref), "aspect": xdem.terrain.aspect(self.ref)} + pipeline.fit(**self.fit_params, bias_vars=bias_vars) - aligned_dem, _ = pipeline.apply(self.tba.data, transform=self.ref.transform, crs=self.ref.crs) + aligned_dem, _ = pipeline.apply(self.tba.data, transform=self.ref.transform, crs=self.ref.crs, bias_vars=bias_vars) assert aligned_dem.shape == self.ref.data.squeeze().shape + def test_pipeline__errors(self): + """Test pipeline raises proper errors.""" + + pipeline = coreg.CoregPipeline([coreg.NuthKaab(), coreg.BiasCorr1D()]) + with pytest.raises(ValueError, match=re.escape("No `bias_vars` passed to .fit() for bias correction step " + " of the pipeline.")): + pipeline.fit(**self.fit_params) + + + pipeline2 = coreg.CoregPipeline([coreg.NuthKaab(), coreg.BiasCorr1D(), coreg.BiasCorr1D()]) + with pytest.raises(ValueError, match=re.escape("No `bias_vars` passed to .fit() for bias correction step " + "of the pipeline. As you are using several bias correction steps requiring" + " `bias_vars`, don't forget to explicitly define their `bias_var_names` " + "during instantiation, e.g. BiasCorr1D(bias_var_names=['slope']).")): + pipeline2.fit(**self.fit_params) + + with pytest.raises(ValueError, match=re.escape("When using several bias correction steps requiring `bias_vars` in a pipeline," + "the `bias_var_names` need to be explicitly defined at each step's " + "instantiation, e.g. 
BiasCorr1D(bias_var_names=['slope']).")): + pipeline2.fit(**self.fit_params, bias_vars={"slope": xdem.terrain.slope(self.ref)}) + + pipeline3 = coreg.CoregPipeline([coreg.NuthKaab(), coreg.BiasCorr1D(bias_var_names=["slope"])]) + with pytest.raises(ValueError, match=re.escape("Not all keys of `bias_vars` in .fit() match the `bias_var_names` defined during " + "instantiation of the bias correction step : ['slope'].")): + pipeline3.fit(**self.fit_params, bias_vars={"ncc": xdem.terrain.slope(self.ref)}) + def test_pipeline_pts(self) -> None: warnings.simplefilter("ignore") diff --git a/tests/test_coreg/test_biascorr.py b/tests/test_coreg/test_biascorr.py index 8d3db267..372cf6dc 100644 --- a/tests/test_coreg/test_biascorr.py +++ b/tests/test_coreg/test_biascorr.py @@ -58,6 +58,7 @@ def test_biascorr(self) -> None: # Check that the _is_affine attribute is set correctly assert not bcorr._is_affine assert bcorr._fit_or_bin == "fit" + assert bcorr._needs_vars is True # Or with default bin arguments bcorr2 = biascorr.BiasCorr(fit_or_bin="bin") @@ -334,6 +335,7 @@ def test_biascorr1d(self) -> None: assert bcorr1d._meta["fit_func"] == biascorr.fit_workflows["norder_polynomial"]["func"] assert bcorr1d._meta["fit_optimizer"] == biascorr.fit_workflows["norder_polynomial"]["optimizer"] + assert bcorr1d._needs_vars is True # Try default "bin" parameter instantiation bcorr1d = biascorr.BiasCorr1D(fit_or_bin="bin") @@ -370,6 +372,7 @@ def test_biascorr2d(self) -> None: assert bcorr2d._meta["fit_func"] == polynomial_2d assert bcorr2d._meta["fit_optimizer"] == scipy.optimize.curve_fit + assert bcorr2d._needs_vars is True # Try default "bin" parameter instantiation bcorr2d = biascorr.BiasCorr2D(fit_or_bin="bin") @@ -405,6 +408,7 @@ def test_directionalbias(self) -> None: assert dirbias._meta["fit_func"] == biascorr.fit_workflows["nfreq_sumsin"]["func"] assert dirbias._meta["fit_optimizer"] == biascorr.fit_workflows["nfreq_sumsin"]["optimizer"] assert dirbias._meta["angle"] == 45 + assert dirbias._needs_vars is False # Check that variable names are defined during instantiation assert dirbias._meta["bias_var_names"] == ["angle"] @@ -483,6 +487,7 @@ def test_deramp(self) -> None: assert deramp._meta["fit_func"] == polynomial_2d assert deramp._meta["fit_optimizer"] == scipy.optimize.curve_fit assert deramp._meta["poly_order"] == 2 + assert deramp._needs_vars is False # Check that variable names are defined during instantiation assert deramp._meta["bias_var_names"] == ["xx", "yy"] @@ -530,6 +535,7 @@ def test_terrainbias(self) -> None: assert tb._meta["bin_sizes"] == 100 assert tb._meta["bin_statistic"] == np.nanmedian assert tb._meta["terrain_attribute"] == "maximum_curvature" + assert tb._needs_vars is False assert tb._meta["bias_var_names"] == ["maximum_curvature"] diff --git a/xdem/coreg/affine.py b/xdem/coreg/affine.py index 89b682e0..936a59f1 100644 --- a/xdem/coreg/affine.py +++ b/xdem/coreg/affine.py @@ -227,20 +227,6 @@ def __init__(self, meta: CoregDict | None = None, matrix: NDArrayf | None = None self._meta["matrix"] = valid_matrix self._is_affine = True - @property - def is_affine(self) -> bool: - """Check if the transform be explained by a 3D affine transform.""" - # _is_affine is found by seeing if to_matrix() raises an error. - # If this hasn't been done yet, it will be None - if self._is_affine is None: - try: # See if to_matrix() raises an error. 
- self.to_matrix() - self._is_affine = True - except (ValueError, NotImplementedError): - self._is_affine = False - - return self._is_affine - def to_matrix(self) -> NDArrayf: """Convert the transform to a 4x4 transformation matrix.""" return self._to_matrix_func() diff --git a/xdem/coreg/base.py b/xdem/coreg/base.py index 22e6bf37..66f945cf 100644 --- a/xdem/coreg/base.py +++ b/xdem/coreg/base.py @@ -374,19 +374,6 @@ def _preprocess_coreg_raster_input( ref_dem[~mask_subsample] = np.nan tba_dem[~mask_subsample] = np.nan - # # TODO: Use tested subsampling function from geoutils? - # # The full mask (inliers=True) is the inverse of the above masks and the provided mask. - # full_mask = ( - # ~ref_mask & ~tba_mask & (np.asarray(inlier_mask) if inlier_mask is not None else True) - # ).squeeze() - # random_indices = subsample_array(full_mask, subsample=subsample, return_indices=True) - # full_mask[random_indices] = False - # - # # Remove the data, keep the shape - # # TODO: there's likely a better way to go about this... - # ref_dem[~full_mask] = np.nan - # tba_dem[~full_mask] = np.nan - return ref_dem, tba_dem, transform, crs @@ -704,6 +691,7 @@ class Coreg: _fit_called: bool = False # Flag to check if the .fit() method has been called. _is_affine: bool | None = None + _needs_vars: bool = False def __init__(self, meta: CoregDict | None = None) -> None: """Instantiate a generic processing step method.""" @@ -723,6 +711,20 @@ def __add__(self, other: CoregType) -> CoregPipeline: raise ValueError(f"Incompatible add type: {type(other)}. Expected 'Coreg' subclass") return CoregPipeline([self, other]) + @property + def is_affine(self) -> bool: + """Check if the transform can be explained by a 3D affine transform.""" + # _is_affine is found by seeing if to_matrix() raises an error. + # If this hasn't been done yet, it will be None + if self._is_affine is None: + try: # See if to_matrix() raises an error. + self.to_matrix() + self._is_affine = True + except (ValueError, NotImplementedError): + self._is_affine = False + + return self._is_affine + def fit( self: CoregType, reference_dem: NDArrayf | MArrayf | RasterType, dem_to_be_aligned: NDArrayf | MArrayf | RasterType, inlier_mask: NDArrayf | Mask | None = None, transform: rio.transform.Affine | None = None, crs: rio.crs.CRS | None = None, + bias_vars: dict[str, NDArrayf | MArrayf | RasterType] | None = None, weights: NDArrayf | None = None, subsample: float | int = 1.0, verbose: bool = False, @@ -744,6 +747,7 @@ def fit( :param inlier_mask: Optional. 2D boolean array of areas to include in the analysis (inliers=True). :param transform: Optional. Transform of the reference_dem. Mandatory if DEM provided as array. :param crs: Optional. CRS of the reference_dem. Mandatory if DEM provided as array. + :param bias_vars: Optional, only for some bias correction classes. Dictionary of bias variables used. :param weights: Optional. Per-pixel weights for the coregistration. :param subsample: Subsample the input to increase performance. <1 is parsed as a fraction. >1 is a pixel count. :param verbose: Print progress messages to stdout.
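A short sketch of how the relocated `is_affine` property behaves, with illustrative class choices:

from xdem import coreg
from xdem.coreg import biascorr

# Affine methods resolve is_affine by attempting to_matrix(); AffineCoreg
# subclasses set _is_affine at instantiation, so no conversion is needed.
assert coreg.VerticalShift().is_affine

# Bias corrections set _is_affine = False in __init__, so the cached value
# is returned directly and to_matrix() is never attempted.
assert not biascorr.BiasCorr1D().is_affine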
@@ -764,15 +768,23 @@ def fit( random_state=random_state, ) + main_args = {"ref_dem": ref_dem, "tba_dem": tba_dem, "transform": transform, "crs": crs, "weights": weights, + "verbose": verbose, "random_state": random_state} + + # If bias_vars are defined, update dictionary content to array + if bias_vars is not None: + # Check if the current class actually requires bias_vars + if self._is_affine: + warnings.warn("This coregistration method is affine, ignoring `bias_vars` passed to fit().") + + for var in bias_vars.keys(): + bias_vars[var] = gu.raster.get_array_and_mask(bias_vars[var])[0] + + main_args.update({"bias_vars": bias_vars}) + # Run the associated fitting function self._fit_func( - ref_dem=ref_dem, - tba_dem=tba_dem, - transform=transform, - crs=crs, - weights=weights, - verbose=verbose, - random_state=random_state, + **main_args, **kwargs, ) @@ -984,6 +996,7 @@ def apply( dem: MArrayf, transform: rio.transform.Affine | None = None, crs: rio.crs.CRS | None = None, + bias_vars: dict[str, NDArrayf | MArrayf | RasterType] | None = None, resample: bool = True, **kwargs: Any, ) -> tuple[MArrayf, rio.transform.Affine]: @@ -995,6 +1008,7 @@ def apply( dem: NDArrayf, transform: rio.transform.Affine | None = None, crs: rio.crs.CRS | None = None, + bias_vars: dict[str, NDArrayf | MArrayf | RasterType] | None = None, resample: bool = True, **kwargs: Any, ) -> tuple[NDArrayf, rio.transform.Affine]: @@ -1006,6 +1020,7 @@ def apply( dem: RasterType, transform: rio.transform.Affine | None = None, crs: rio.crs.CRS | None = None, + bias_vars: dict[str, NDArrayf | MArrayf | RasterType] | None = None, resample: bool = True, **kwargs: Any, ) -> RasterType: @@ -1016,6 +1031,7 @@ def apply( dem: RasterType | NDArrayf | MArrayf, transform: rio.transform.Affine | None = None, crs: rio.crs.CRS | None = None, + bias_vars: dict[str, NDArrayf | MArrayf | RasterType] | None = None, resample: bool = True, **kwargs: Any, ) -> RasterType | tuple[NDArrayf, rio.transform.Affine] | tuple[MArrayf, rio.transform.Affine]: @@ -1025,6 +1041,7 @@ def apply( :param dem: A DEM array or Raster to apply the transform on. :param transform: Optional. The transform object of the DEM. Mandatory if 'dem' provided as array. :param crs: Optional. CRS of the reference_dem. Mandatory if 'dem' provided as array. + :param bias_vars: Optional, only for some bias correction classes. Dictionary of bias variables used. :param resample: If set to True, will reproject output Raster on the same grid as input. Otherwise, \ only the transform might be updated and no resampling is done. :param kwargs: Any optional arguments to be passed to either self._apply_func or apply_matrix.
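Putting the fit() and apply() changes together, a hedged sketch of the intended user-facing flow with one bias variable, mirroring the new pipeline tests:

import xdem
from xdem import examples
from xdem.coreg import biascorr

ref = xdem.DEM(examples.get_path("longyearbyen_ref_dem"))
tba = xdem.DEM(examples.get_path("longyearbyen_tba_dem"))

# The same `bias_vars` dictionary is converted to plain arrays inside both
# fit() and apply(); its keys must match the declared `bias_var_names`.
bias_vars = {"slope": xdem.terrain.slope(ref)}

bcorr = biascorr.BiasCorr1D(bias_var_names=["slope"], fit_or_bin="bin")
bcorr.fit(reference_dem=ref, dem_to_be_aligned=tba, bias_vars=bias_vars)
corrected = bcorr.apply(tba, bias_vars=bias_vars)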
@@ -1058,15 +1075,27 @@ def apply( if np.all(dem_mask): raise ValueError("'dem' had only NaNs") + main_args = {"dem": dem_array, "transform": transform, "crs": crs} + + # If bias_vars are defined, update dictionary content to array + if bias_vars is not None: + # Check if the current class actually requires bias_vars + if self._is_affine: + warnings.warn("This coregistration method is affine, ignoring `bias_vars` passed to apply().") + + for var in bias_vars.keys(): + bias_vars[var] = gu.raster.get_array_and_mask(bias_vars[var])[0] + + main_args.update({"bias_vars": bias_vars}) + # See if a _apply_func exists try: # arg `resample` must be passed to _apply_func, otherwise will be overwritten in CoregPipeline kwargs["resample"] = resample # Run the associated apply function - applied_dem, out_transform = self._apply_func( - dem_array, transform, crs, **kwargs - ) # pylint: disable=assignment-from-no-return + applied_dem, out_transform = self._apply_func(**main_args, **kwargs + ) # pylint: disable=assignment-from-no-return # If it doesn't exist, use apply_matrix() except NotImplementedError: @@ -1272,6 +1301,7 @@ def _fit_func( transform: rio.transform.Affine, crs: rio.crs.CRS, weights: NDArrayf | None, + bias_vars: dict[str, NDArrayf] | None = None, verbose: bool = False, **kwargs: Any, ) -> None: @@ -1279,7 +1309,9 @@ def _fit_func( raise NotImplementedError("This step has to be implemented by subclassing.") def _apply_func( - self, dem: NDArrayf, transform: rio.transform.Affine, crs: rio.crs.CRS, **kwargs: Any + self, dem: NDArrayf, transform: rio.transform.Affine, crs: rio.crs.CRS, + bias_vars: dict[str, NDArrayf] | None = None, + **kwargs: Any ) -> tuple[NDArrayf, rio.transform.Affine]: # FOR DEVELOPERS: This function is only needed for non-rigid transforms. raise NotImplementedError("This should have been implemented by subclassing") @@ -1316,6 +1348,42 @@ def copy(self: CoregType) -> CoregType: return new_coreg + def _parse_bias_vars(self, step: int, bias_vars: dict[str, NDArrayf] | None): + """Parse bias variables for a pipeline step requiring them.""" + + # Get number of non-affine coregistration requiring bias variables to be passed + nb_needs_vars = sum(c._needs_vars for c in self.pipeline) + + # Get step object + coreg = self.pipeline[step] + + # Check that all variable names of this were passed + var_names = coreg._meta["bias_var_names"] + + # Raise error if bias_vars is None + if bias_vars is None: + msg = "No `bias_vars` passed to .fit() for bias correction step {} of the pipeline.".format(coreg.__class__) + if nb_needs_vars > 1: + msg += " As you are using several bias correction steps requiring `bias_vars`, don't forget to " \ + "explicitly define their `bias_var_names` during " \ + "instantiation, e.g. {}(bias_var_names=['slope']).".format(coreg.__class__.__name__) + raise ValueError(msg) + + # Raise error if no variable were explicitly assigned and there is more than 1 step with bias_vars + if var_names is None and nb_needs_vars > 1: + raise ValueError("When using several bias correction steps requiring `bias_vars` in a pipeline," + "the `bias_var_names` need to be explicitly defined at each step's " + "instantiation, e.g. 
{}(bias_var_names=['slope']).".format(coreg.__class__.__name__)) + + # Raise error if the variables explicitly assigned don't match the ones passed in bias_vars + if not all(n in bias_vars.keys() for n in var_names): + raise ValueError("Not all keys of `bias_vars` in .fit() match the `bias_var_names` defined during " + "instantiation of the bias correction step {}: {}.".format(coreg.__class__, var_names)) + + # Add subset dict for this pipeline step to args of fit and apply + return {n: bias_vars[n] for n in var_names} + + def _fit_func( self, ref_dem: NDArrayf, @@ -1323,6 +1391,7 @@ def _fit_func( transform: rio.transform.Affine, crs: rio.crs.CRS, weights: NDArrayf | None, + bias_vars: dict[str, NDArrayf] | None = None, verbose: bool = False, **kwargs: Any, ) -> None: @@ -1332,11 +1401,23 @@ def _fit_func( for i, coreg in enumerate(self.pipeline): if verbose: print(f"Running pipeline step: {i + 1} / {len(self.pipeline)}") - coreg._fit_func(ref_dem, tba_dem_mod, transform=transform, crs=crs, weights=weights, verbose=verbose) + + main_args_fit = {"ref_dem": ref_dem, "tba_dem": tba_dem_mod, "transform": transform, "crs": crs, + "weights": weights, "verbose": verbose} + + main_args_apply = {"dem": tba_dem_mod, "transform": transform, "crs": crs} + + # If non-affine method that expects a bias_vars argument + if coreg._needs_vars: + step_bias_vars = self._parse_bias_vars(step=i, bias_vars=bias_vars) + + main_args_fit.update({"bias_vars": step_bias_vars}) + main_args_apply.update({"bias_vars": step_bias_vars}) + + coreg._fit_func(**main_args_fit) coreg._fit_called = True - # TODO: shouldn't this call _apply_func directly? - tba_dem_mod, out_transform = coreg.apply(tba_dem_mod, transform=transform, crs=crs) + tba_dem_mod, out_transform = coreg.apply(**main_args_apply) def _fit_pts_func( self: CoregType, @@ -1359,13 +1440,27 @@ def _fit_pts_func( return self def _apply_func( - self, dem: NDArrayf, transform: rio.transform.Affine, crs: rio.crs.CRS, **kwargs: Any + self, + dem: NDArrayf, + transform: rio.transform.Affine, + crs: rio.crs.CRS, + bias_vars: dict[str, NDArrayf] | None = None, + **kwargs: Any ) -> tuple[NDArrayf, rio.transform.Affine]: """Apply the coregistration steps sequentially to a DEM.""" dem_mod = dem.copy() out_transform = copy.copy(transform) - for coreg in self.pipeline: - dem_mod, out_transform = coreg.apply(dem_mod, transform=out_transform, crs=crs, **kwargs) + + for i, coreg in enumerate(self.pipeline): + + main_args_apply = {"dem": dem_mod, "transform": out_transform, "crs": crs} + + # If non-affine method that expects a bias_vars argument + if coreg._needs_vars: + step_bias_vars = self._parse_bias_vars(step=i, bias_vars=bias_vars) + main_args_apply.update({"bias_vars": step_bias_vars}) + + dem_mod, out_transform = coreg.apply(**main_args_apply, **kwargs) return dem_mod, out_transform diff --git a/xdem/coreg/biascorr.py b/xdem/coreg/biascorr.py index eeee6ff3..7bbe2db2 100644 --- a/xdem/coreg/biascorr.py +++ b/xdem/coreg/biascorr.py @@ -2,19 +2,17 @@ from __future__ import annotations import inspect -from typing import Any, Callable, Iterable, Literal +from typing import Any, Callable, Iterable, Literal, TypeVar import geoutils as gu import numpy as np import pandas as pd import rasterio as rio import scipy -from geoutils import Mask -from geoutils.raster import RasterType import xdem.spatialstats -from xdem._typing import MArrayf, NDArrayf -from xdem.coreg.base import Coreg, CoregType +from xdem._typing import NDArrayf +from xdem.coreg.base import Coreg from xdem.fit 
import ( polynomial_1d, polynomial_2d, @@ -28,6 +26,7 @@ "nfreq_sumsin": {"func": sumsin_1d, "optimizer": robust_nfreq_sumsin_fit}, } +BiasCorrType = TypeVar("BiasCorrType", bound="BiasCorr") class BiasCorr(Coreg): """ @@ -129,66 +128,7 @@ def __init__( # Update attributes self._fit_or_bin = fit_or_bin self._is_affine = False - - def fit( # type: ignore - self: CoregType, - reference_dem: NDArrayf | MArrayf | RasterType, - dem_to_be_aligned: NDArrayf | MArrayf | RasterType, - bias_vars: dict[str, NDArrayf | MArrayf | RasterType] | None = None, # None if subclass derives biasvar itself - inlier_mask: NDArrayf | Mask | None = None, - transform: rio.transform.Affine | None = None, - crs: rio.crs.CRS | None = None, - weights: NDArrayf | None = None, - subsample: float | int = 1.0, - verbose: bool = False, - random_state: None | np.random.RandomState | np.random.Generator | int = None, - **kwargs: Any, - ) -> CoregType: - - # Change dictionary content to array - if bias_vars is not None: - for var in bias_vars.keys(): - bias_vars[var] = gu.raster.get_array_and_mask(bias_vars[var])[0] - - # Call parent fit to do the pre-processing and return itself - return super().fit( # type: ignore - reference_dem=reference_dem, - dem_to_be_aligned=dem_to_be_aligned, - inlier_mask=inlier_mask, - transform=transform, - crs=crs, - weights=weights, - subsample=subsample, - verbose=verbose, - random_state=random_state, - bias_vars=bias_vars, - **kwargs, - ) - - def apply( # type: ignore - self, - dem: RasterType | NDArrayf | MArrayf, - bias_vars: dict[str, NDArrayf | MArrayf | RasterType] | None = None, - transform: rio.transform.Affine | None = None, - crs: rio.crs.CRS | None = None, - resample: bool = True, - **kwargs: Any, - ) -> tuple[RasterType | NDArrayf | MArrayf, rio.transform.Affine]: - - # Change dictionary content to array - if bias_vars is not None: - for var in bias_vars.keys(): - bias_vars[var] = gu.raster.get_array_and_mask(bias_vars[var])[0] - - # Call parent fit to do the pre-processing and return itself - return super().apply( - dem=dem, - transform=transform, - crs=crs, - resample=resample, - bias_vars=bias_vars, - **kwargs, - ) + self._needs_vars = True def _fit_func( # type: ignore self, @@ -625,6 +565,7 @@ def __init__( """ super().__init__(fit_or_bin, fit_func, fit_optimizer, bin_sizes, bin_statistic, bin_apply_method, ["angle"]) self._meta["angle"] = angle + self._needs_vars = False def _fit_func( # type: ignore self, @@ -722,6 +663,7 @@ def __init__( super().__init__(fit_or_bin, fit_func, fit_optimizer, bin_sizes, bin_statistic, bin_apply_method, [terrain_attribute]) # This is the same as bias_var_names, but let's leave the duplicate for clarity self._meta["terrain_attribute"] = terrain_attribute + self._needs_vars = False def _fit_func( # type: ignore self, @@ -736,7 +678,7 @@ def _fit_func( # type: ignore ) -> None: # Derive terrain attribute - if self._meta["bias_var_names"] == "elevation": + if self._meta["terrain_attribute"] == "elevation": attr = ref_dem else: attr = xdem.terrain.get_terrain_attribute( @@ -766,13 +708,13 @@ def _apply_func( if bias_vars is None: # Derive terrain attribute - if self._meta["bias_var_names"] == "elevation": + if self._meta["terrain_attribute"] == "elevation": attr = dem else: attr = xdem.terrain.get_terrain_attribute( - dem=dem, attribute=self._meta["bias_var_names"], resolution=(transform[0], abs(transform[4])) + dem=dem, attribute=self._meta["terrain_attribute"], resolution=(transform[0], abs(transform[4])) ) - bias_vars = 
{self._meta["bias_var_names"]: attr} + bias_vars = {self._meta["terrain_attribute"]: attr} return super()._apply_func(dem=dem, transform=transform, crs=crs, bias_vars=bias_vars, **kwargs) @@ -807,6 +749,7 @@ def __init__( """ super().__init__(fit_or_bin, fit_func, fit_optimizer, bin_sizes, bin_statistic, bin_apply_method, ["xx", "yy"]) self._meta["poly_order"] = poly_order + self._needs_vars = False def _fit_func( # type: ignore self, From 8b08c0df2a12e73161f463e27f23d682b2ffcd15 Mon Sep 17 00:00:00 2001 From: Romain Hugonnet Date: Fri, 1 Sep 2023 18:47:18 -0800 Subject: [PATCH 6/7] Linting --- tests/test_coreg/test_base.py | 79 ++++++++++++++++++++++-------- tests/test_coreg/test_biascorr.py | 15 +++--- xdem/coreg/affine.py | 34 +++++++++++-- xdem/coreg/base.py | 80 +++++++++++++++++++++---------- xdem/coreg/biascorr.py | 43 ++++++++++++----- 5 files changed, 182 insertions(+), 69 deletions(-) diff --git a/tests/test_coreg/test_base.py b/tests/test_coreg/test_base.py index e84d566c..c403b98a 100644 --- a/tests/test_coreg/test_base.py +++ b/tests/test_coreg/test_base.py @@ -48,7 +48,7 @@ class TestCoregClass: # Create some 3D coordinates with Z coordinates being 0 to try the apply_pts functions. points = np.array([[1, 2, 3, 4], [1, 2, 3, 4], [0, 0, 0, 0]], dtype="float64").T - def test_init(self): + def test_init(self) -> None: """Test instantiation of Coreg""" c = coreg.Coreg() @@ -465,7 +465,14 @@ def test_pipeline(self) -> None: # Assert that the combined vertical shift is 2 assert pipeline2.to_matrix()[2, 3] == 2.0 - all_coregs = [coreg.VerticalShift(), coreg.NuthKaab(), coreg.ICP(), coreg.Deramp(), coreg.TerrainBias(), coreg.DirectionalBias()] + all_coregs = [ + coreg.VerticalShift(), + coreg.NuthKaab(), + coreg.ICP(), + coreg.Deramp(), + coreg.TerrainBias(), + coreg.DirectionalBias(), + ] @pytest.mark.parametrize("coreg1", all_coregs) # type: ignore @pytest.mark.parametrize("coreg2", all_coregs) # type: ignore @@ -479,12 +486,23 @@ def test_pipeline_combinations__nobiasvar(self, coreg1: Coreg, coreg2: Coreg) -> aligned_dem, _ = pipeline.apply(self.tba.data, transform=self.ref.transform, crs=self.ref.crs) assert aligned_dem.shape == self.ref.data.squeeze().shape - all_coregs = [coreg.VerticalShift(), coreg.NuthKaab(), coreg.ICP(), coreg.Deramp(), coreg.TerrainBias(), - coreg.DirectionalBias()] + all_coregs = [ + coreg.VerticalShift(), + coreg.NuthKaab(), + coreg.ICP(), + coreg.Deramp(), + coreg.TerrainBias(), + coreg.DirectionalBias(), + ] @pytest.mark.parametrize("coreg1", all_coregs) # type: ignore - @pytest.mark.parametrize("coreg2", [coreg.BiasCorr1D(bias_var_names=["slope"], fit_or_bin="bin"), - coreg.BiasCorr2D(bias_var_names=["slope", "aspect"], fit_or_bin="bin")]) # type: ignore + @pytest.mark.parametrize( + "coreg2", + [ + coreg.BiasCorr1D(bias_var_names=["slope"], fit_or_bin="bin"), + coreg.BiasCorr2D(bias_var_names=["slope", "aspect"], fit_or_bin="bin"), + ], + ) # type: ignore def test_pipeline_combinations__biasvar(self, coreg1: Coreg, coreg2: Coreg) -> None: """Test pipelines with all combinations of coregistration subclasses with bias variables""" @@ -493,33 +511,54 @@ def test_pipeline_combinations__biasvar(self, coreg1: Coreg, coreg2: Coreg) -> N bias_vars = {"slope": xdem.terrain.slope(self.ref), "aspect": xdem.terrain.aspect(self.ref)} pipeline.fit(**self.fit_params, bias_vars=bias_vars) - aligned_dem, _ = pipeline.apply(self.tba.data, transform=self.ref.transform, crs=self.ref.crs, bias_vars=bias_vars) + aligned_dem, _ = pipeline.apply( + self.tba.data, 
transform=self.ref.transform, crs=self.ref.crs, bias_vars=bias_vars + ) assert aligned_dem.shape == self.ref.data.squeeze().shape - def test_pipeline__errors(self): + def test_pipeline__errors(self) -> None: """Test pipeline raises proper errors.""" pipeline = coreg.CoregPipeline([coreg.NuthKaab(), coreg.BiasCorr1D()]) - with pytest.raises(ValueError, match=re.escape("No `bias_vars` passed to .fit() for bias correction step " - " of the pipeline.")): + with pytest.raises( + ValueError, + match=re.escape( + "No `bias_vars` passed to .fit() for bias correction step " + " of the pipeline." + ), + ): pipeline.fit(**self.fit_params) - pipeline2 = coreg.CoregPipeline([coreg.NuthKaab(), coreg.BiasCorr1D(), coreg.BiasCorr1D()]) - with pytest.raises(ValueError, match=re.escape("No `bias_vars` passed to .fit() for bias correction step " - "of the pipeline. As you are using several bias correction steps requiring" - " `bias_vars`, don't forget to explicitly define their `bias_var_names` " - "during instantiation, e.g. BiasCorr1D(bias_var_names=['slope']).")): + with pytest.raises( + ValueError, + match=re.escape( + "No `bias_vars` passed to .fit() for bias correction step " + "of the pipeline. As you are using several bias correction steps requiring" + " `bias_vars`, don't forget to explicitly define their `bias_var_names` " + "during instantiation, e.g. BiasCorr1D(bias_var_names=['slope'])." + ), + ): pipeline2.fit(**self.fit_params) - with pytest.raises(ValueError, match=re.escape("When using several bias correction steps requiring `bias_vars` in a pipeline," - "the `bias_var_names` need to be explicitly defined at each step's " - "instantiation, e.g. BiasCorr1D(bias_var_names=['slope']).")): + with pytest.raises( + ValueError, + match=re.escape( + "When using several bias correction steps requiring `bias_vars` in a pipeline," + "the `bias_var_names` need to be explicitly defined at each step's " + "instantiation, e.g. BiasCorr1D(bias_var_names=['slope'])." + ), + ): pipeline2.fit(**self.fit_params, bias_vars={"slope": xdem.terrain.slope(self.ref)}) pipeline3 = coreg.CoregPipeline([coreg.NuthKaab(), coreg.BiasCorr1D(bias_var_names=["slope"])]) - with pytest.raises(ValueError, match=re.escape("Not all keys of `bias_vars` in .fit() match the `bias_var_names` defined during " - "instantiation of the bias correction step : ['slope'].")): + with pytest.raises( + ValueError, + match=re.escape( + "Not all keys of `bias_vars` in .fit() match the `bias_var_names` defined during " + "instantiation of the bias correction step : ['slope']." 
+ ), + ): pipeline3.fit(**self.fit_params, bias_vars={"ncc": xdem.terrain.slope(self.ref)}) def test_pipeline_pts(self) -> None: diff --git a/tests/test_coreg/test_biascorr.py b/tests/test_coreg/test_biascorr.py index 372cf6dc..f9538fe9 100644 --- a/tests/test_coreg/test_biascorr.py +++ b/tests/test_coreg/test_biascorr.py @@ -87,7 +87,6 @@ def test_biascorr(self) -> None: bcorr5 = biascorr.BiasCorr(bias_var_names=np.array(["slope", "ncc"])) assert bcorr5._meta["bias_var_names"] == ["slope", "ncc"] - def test_biascorr__errors(self) -> None: """Test the errors that should be raised by BiasCorr.""" @@ -354,8 +353,10 @@ def test_biascorr1d(self) -> None: # Raise error when variables don't match with pytest.raises( - ValueError, match=re.escape("The keys of `bias_vars` do not match the `bias_var_names` defined during " - "instantiation: ['ncc'].") + ValueError, + match=re.escape( + "The keys of `bias_vars` do not match the `bias_var_names` defined during " "instantiation: ['ncc']." + ), ): bcorr1d2 = biascorr.BiasCorr1D(bias_var_names=["ncc"]) bias_vars_dict = {"elevation": self.ref} @@ -391,8 +392,11 @@ def test_biascorr2d(self) -> None: # Raise error when variables don't match with pytest.raises( - ValueError, match=re.escape("The keys of `bias_vars` do not match the `bias_var_names` defined during " - "instantiation: ['elevation', 'ncc'].") + ValueError, + match=re.escape( + "The keys of `bias_vars` do not match the `bias_var_names` defined during " + "instantiation: ['elevation', 'ncc']." + ), ): bcorr2d2 = biascorr.BiasCorr2D(bias_var_names=["elevation", "ncc"]) bias_vars_dict = {"elevation": self.ref, "slope": xdem.terrain.slope(self.ref)} @@ -539,7 +543,6 @@ def test_terrainbias(self) -> None: assert tb._meta["bias_var_names"] == ["maximum_curvature"] - def test_terrainbias__synthetic(self) -> None: """Test the subclass TerrainBias.""" diff --git a/xdem/coreg/affine.py b/xdem/coreg/affine.py index 936a59f1..d9f09613 100644 --- a/xdem/coreg/affine.py +++ b/xdem/coreg/affine.py @@ -298,6 +298,7 @@ def _fit_func( transform: rio.transform.Affine, crs: rio.crs.CRS, weights: NDArrayf | None, + bias_vars: dict[str, NDArrayf] | None = None, verbose: bool = False, **kwargs: Any, ) -> None: @@ -305,7 +306,12 @@ def _fit_func( raise NotImplementedError("This step has to be implemented by subclassing.") def _apply_func( - self, dem: NDArrayf, transform: rio.transform.Affine, crs: rio.crs.CRS, **kwargs: Any + self, + dem: NDArrayf, + transform: rio.transform.Affine, + crs: rio.crs.CRS, + bias_vars: dict[str, NDArrayf] | None = None, + **kwargs: Any, ) -> tuple[NDArrayf, rio.transform.Affine]: # FOR DEVELOPERS: This function is only needed for non-rigid transforms. 
raise NotImplementedError("This should have been implemented by subclassing") @@ -340,6 +346,7 @@ def _fit_func( transform: rio.transform.Affine, crs: rio.crs.CRS, weights: NDArrayf | None, + bias_vars: dict[str, NDArrayf] | None = None, verbose: bool = False, **kwargs: Any, ) -> None: @@ -368,7 +375,12 @@ def _fit_func( self._meta["vshift"] = vshift def _apply_func( - self, dem: NDArrayf, transform: rio.transform.Affine, crs: rio.crs.CRS, **kwargs: Any + self, + dem: NDArrayf, + transform: rio.transform.Affine, + crs: rio.crs.CRS, + bias_vars: dict[str, NDArrayf] | None = None, + **kwargs: Any, ) -> tuple[NDArrayf, rio.transform.Affine]: """Apply the VerticalShift function to a DEM.""" return dem + self._meta["vshift"], transform @@ -426,6 +438,7 @@ def _fit_func( transform: rio.transform.Affine, crs: rio.crs.CRS, weights: NDArrayf | None, + bias_vars: dict[str, NDArrayf] | None = None, verbose: bool = False, **kwargs: Any, ) -> None: @@ -575,6 +588,7 @@ def _fit_func( transform: rio.transform.Affine, crs: rio.crs.CRS, weights: NDArrayf | None, + bias_vars: dict[str, NDArrayf] | None = None, verbose: bool = False, **kwargs: Any, ) -> None: @@ -589,7 +603,12 @@ def _fit_func( self._meta["func"] = fit_ramp def _apply_func( - self, dem: NDArrayf, transform: rio.transform.Affine, crs: rio.crs.CRS, **kwargs: Any + self, + dem: NDArrayf, + transform: rio.transform.Affine, + crs: rio.crs.CRS, + bias_vars: dict[str, NDArrayf] | None = None, + **kwargs: Any, ) -> tuple[NDArrayf, rio.transform.Affine]: """Apply the deramp function to a DEM.""" x_coords, y_coords = _get_x_and_y_coords(dem.shape, transform) @@ -652,6 +671,7 @@ def _fit_func( transform: rio.transform.Affine, crs: rio.crs.CRS, weights: NDArrayf | None, + bias_vars: dict[str, NDArrayf] | None = None, verbose: bool = False, **kwargs: Any, ) -> None: @@ -928,7 +948,12 @@ def _to_matrix_func(self) -> NDArrayf: return matrix def _apply_func( - self, dem: NDArrayf, transform: rio.transform.Affine, crs: rio.crs.CRS, **kwargs: Any + self, + dem: NDArrayf, + transform: rio.transform.Affine, + crs: rio.crs.CRS, + bias_vars: dict[str, NDArrayf] | None = None, + **kwargs: Any, ) -> tuple[NDArrayf, rio.transform.Affine]: """Apply the Nuth & Kaab shift to a DEM.""" offset_east = self._meta["offset_east_px"] * self._meta["resolution"] @@ -1082,6 +1107,7 @@ def _fit_func( transform: rio.transform.Affine, crs: rio.crs.CRS, weights: NDArrayf | None, + bias_vars: dict[str, NDArrayf] | None = None, verbose: bool = False, **kwargs: Any, ) -> None: diff --git a/xdem/coreg/base.py b/xdem/coreg/base.py index 66f945cf..13b3c4f0 100644 --- a/xdem/coreg/base.py +++ b/xdem/coreg/base.py @@ -768,8 +768,15 @@ def fit( random_state=random_state, ) - main_args = {"ref_dem": ref_dem, "tba_dem": tba_dem, "transform": transform, "crs": crs, "weights": weights, - "verbose": verbose, "random_state": random_state} + main_args = { + "ref_dem": ref_dem, + "tba_dem": tba_dem, + "transform": transform, + "crs": crs, + "weights": weights, + "verbose": verbose, + "random_state": random_state, + } # If bias_vars are defined, update dictionary content to array if bias_vars is not None: @@ -1094,8 +1101,9 @@ def apply( kwargs["resample"] = resample # Run the associated apply function - applied_dem, out_transform = self._apply_func(**main_args, **kwargs - ) # pylint: disable=assignment-from-no-return + applied_dem, out_transform = self._apply_func( + **main_args, **kwargs + ) # pylint: disable=assignment-from-no-return # If it doesn't exist, use apply_matrix() except 
NotImplementedError: @@ -1309,9 +1317,12 @@ def _fit_func( raise NotImplementedError("This step has to be implemented by subclassing.") def _apply_func( - self, dem: NDArrayf, transform: rio.transform.Affine, crs: rio.crs.CRS, - bias_vars: dict[str, NDArrayf] | None = None, - **kwargs: Any + self, + dem: NDArrayf, + transform: rio.transform.Affine, + crs: rio.crs.CRS, + bias_vars: dict[str, NDArrayf] | None = None, + **kwargs: Any, ) -> tuple[NDArrayf, rio.transform.Affine]: # FOR DEVELOPERS: This function is only needed for non-rigid transforms. raise NotImplementedError("This should have been implemented by subclassing") @@ -1348,7 +1359,7 @@ def copy(self: CoregType) -> CoregType: return new_coreg - def _parse_bias_vars(self, step: int, bias_vars: dict[str, NDArrayf] | None): + def _parse_bias_vars(self, step: int, bias_vars: dict[str, NDArrayf] | None) -> dict[str, NDArrayf]: """Parse bias variables for a pipeline step requiring them.""" # Get number of non-affine coregistration requiring bias variables to be passed @@ -1362,28 +1373,33 @@ def _parse_bias_vars(self, step: int, bias_vars: dict[str, NDArrayf] | None): # Raise error if bias_vars is None if bias_vars is None: - msg = "No `bias_vars` passed to .fit() for bias correction step {} of the pipeline.".format(coreg.__class__) + msg = f"No `bias_vars` passed to .fit() for bias correction step {coreg.__class__} of the pipeline." if nb_needs_vars > 1: - msg += " As you are using several bias correction steps requiring `bias_vars`, don't forget to " \ - "explicitly define their `bias_var_names` during " \ - "instantiation, e.g. {}(bias_var_names=['slope']).".format(coreg.__class__.__name__) + msg += ( + " As you are using several bias correction steps requiring `bias_vars`, don't forget to " + "explicitly define their `bias_var_names` during " + "instantiation, e.g. {}(bias_var_names=['slope']).".format(coreg.__class__.__name__) + ) raise ValueError(msg) # Raise error if no variable were explicitly assigned and there is more than 1 step with bias_vars if var_names is None and nb_needs_vars > 1: - raise ValueError("When using several bias correction steps requiring `bias_vars` in a pipeline," - "the `bias_var_names` need to be explicitly defined at each step's " - "instantiation, e.g. {}(bias_var_names=['slope']).".format(coreg.__class__.__name__)) + raise ValueError( + "When using several bias correction steps requiring `bias_vars` in a pipeline," + "the `bias_var_names` need to be explicitly defined at each step's " + "instantiation, e.g. 
{}(bias_var_names=['slope']).".format(coreg.__class__.__name__) + ) # Raise error if the variables explicitly assigned don't match the ones passed in bias_vars if not all(n in bias_vars.keys() for n in var_names): - raise ValueError("Not all keys of `bias_vars` in .fit() match the `bias_var_names` defined during " - "instantiation of the bias correction step {}: {}.".format(coreg.__class__, var_names)) + raise ValueError( + "Not all keys of `bias_vars` in .fit() match the `bias_var_names` defined during " + "instantiation of the bias correction step {}: {}.".format(coreg.__class__, var_names) + ) # Add subset dict for this pipeline step to args of fit and apply return {n: bias_vars[n] for n in var_names} - def _fit_func( self, ref_dem: NDArrayf, @@ -1402,8 +1418,14 @@ def _fit_func( if verbose: print(f"Running pipeline step: {i + 1} / {len(self.pipeline)}") - main_args_fit = {"ref_dem": ref_dem, "tba_dem": tba_dem_mod, "transform": transform, "crs": crs, - "weights": weights, "verbose": verbose} + main_args_fit = { + "ref_dem": ref_dem, + "tba_dem": tba_dem_mod, + "transform": transform, + "crs": crs, + "weights": weights, + "verbose": verbose, + } main_args_apply = {"dem": tba_dem_mod, "transform": transform, "crs": crs} @@ -1441,11 +1463,11 @@ def _fit_pts_func( def _apply_func( self, - dem: NDArrayf, - transform: rio.transform.Affine, - crs: rio.crs.CRS, - bias_vars: dict[str, NDArrayf] | None = None, - **kwargs: Any + dem: NDArrayf, + transform: rio.transform.Affine, + crs: rio.crs.CRS, + bias_vars: dict[str, NDArrayf] | None = None, + **kwargs: Any, ) -> tuple[NDArrayf, rio.transform.Affine]: """Apply the coregistration steps sequentially to a DEM.""" dem_mod = dem.copy() @@ -1560,6 +1582,7 @@ def _fit_func( transform: rio.transform.Affine, crs: rio.crs.CRS, weights: NDArrayf | None, + bias_vars: dict[str, NDArrayf] | None = None, verbose: bool = False, **kwargs: Any, ) -> None: @@ -1801,7 +1824,12 @@ def subdivide_array(self, shape: tuple[int, ...]) -> NDArrayf: return subdivide_array(shape, count=self.subdivision) def _apply_func( - self, dem: NDArrayf, transform: rio.transform.Affine, crs: rio.crs.CRS, **kwargs: Any + self, + dem: NDArrayf, + transform: rio.transform.Affine, + crs: rio.crs.CRS, + bias_vars: dict[str, NDArrayf] | None = None, + **kwargs: Any, ) -> tuple[NDArrayf, rio.transform.Affine]: if np.count_nonzero(np.isfinite(dem)) == 0: diff --git a/xdem/coreg/biascorr.py b/xdem/coreg/biascorr.py index 7bbe2db2..b312800c 100644 --- a/xdem/coreg/biascorr.py +++ b/xdem/coreg/biascorr.py @@ -28,6 +28,7 @@ BiasCorrType = TypeVar("BiasCorrType", bound="BiasCorr") + class BiasCorr(Coreg): """ Parent class of bias correction methods: non-rigid coregistrations. 
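# --- Reviewer aside (illustration only, not part of the patch) ---
# A minimal sketch of the `bias_vars` contract that the hunks around here
# enforce, mirroring tests/test_coreg/test_base.py. The Longyearbyen example
# paths are assumptions based on xdem's example-data registry; steps with
# `_needs_vars = False` (Deramp, TerrainBias, DirectionalBias) derive their
# variables internally and are exempt from this check.
import xdem
from xdem import coreg

ref = xdem.DEM(xdem.examples.get_path("longyearbyen_ref_dem"))
tba = xdem.DEM(xdem.examples.get_path("longyearbyen_tba_dem"))

# A bias-correction step in a pipeline declares its variable names up front...
pipeline = coreg.NuthKaab() + coreg.BiasCorr1D(bias_var_names=["slope"], fit_or_bin="bin")

# ...and .fit()/.apply() must then receive a `bias_vars` dict whose keys match
# `bias_var_names` exactly, otherwise _parse_bias_vars() raises the ValueError
# exercised by test_pipeline__errors above.
bias_vars = {"slope": xdem.terrain.slope(ref)}
pipeline.fit(reference_dem=ref, dem_to_be_aligned=tba, bias_vars=bias_vars)
corrected, _ = pipeline.apply(tba.data, transform=ref.transform, crs=ref.crs, bias_vars=bias_vars)
# --- End of aside ---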
@@ -110,8 +111,12 @@ def __init__( # For binning elif fit_or_bin == "bin": - meta_bin = {"bin_sizes": bin_sizes, "bin_statistic": bin_statistic, "bin_apply_method": bin_apply_method, - "bias_var_names": list_bias_var_names} + meta_bin = { + "bin_sizes": bin_sizes, + "bin_statistic": bin_statistic, + "bin_apply_method": bin_apply_method, + "bias_var_names": list_bias_var_names, + } super().__init__(meta=meta_bin) # type: ignore # For both @@ -121,7 +126,7 @@ def __init__( "fit_optimizer": fit_optimizer, "bin_sizes": bin_sizes, "bin_statistic": bin_statistic, - "bias_var_names": list_bias_var_names + "bias_var_names": list_bias_var_names, } super().__init__(meta=meta_bin_and_fit) # type: ignore @@ -151,9 +156,11 @@ def _fit_func( # type: ignore # If bias var names were explicitly passed at instantiation, check that they match the one from the dict if self._meta["bias_var_names"] is not None: - if not sorted(list(bias_vars.keys())) == sorted(self._meta["bias_var_names"]): - raise ValueError("The keys of `bias_vars` do not match the `bias_var_names` defined during " - "instantiation: {}.".format(self._meta["bias_var_names"])) + if not sorted(bias_vars.keys()) == sorted(self._meta["bias_var_names"]): + raise ValueError( + "The keys of `bias_vars` do not match the `bias_var_names` defined during " + "instantiation: {}.".format(self._meta["bias_var_names"]) + ) # Otherwise, store bias variable names from the dictionary else: self._meta["bias_var_names"] = list(bias_vars.keys()) @@ -313,9 +320,11 @@ def _apply_func( # type: ignore raise ValueError("At least one `bias_var` should be passed to the `apply` function, got None.") # Check the bias_vars passed match the ones stored for this bias correction class - if not sorted(list(bias_vars.keys())) == sorted(self._meta["bias_var_names"]): - raise ValueError("The keys of `bias_vars` do not match the `bias_var_names` defined during " - "instantiation or fitting: {}.".format(self._meta["bias_var_names"])) + if not sorted(bias_vars.keys()) == sorted(self._meta["bias_var_names"]): + raise ValueError( + "The keys of `bias_vars` do not match the `bias_var_names` defined during " + "instantiation or fitting: {}.".format(self._meta["bias_var_names"]) + ) # Apply function to get correction (including if binning was done before) if self._fit_or_bin in ["fit", "bin_and_fit"]: @@ -380,7 +389,9 @@ def __init__( between bins, or "per_bin" to apply the statistic for each bin. :param bias_var_names: (Optional) For pipelines, explicitly define bias variables names to use during .fit(). """ - super().__init__(fit_or_bin, fit_func, fit_optimizer, bin_sizes, bin_statistic, bin_apply_method, bias_var_names) + super().__init__( + fit_or_bin, fit_func, fit_optimizer, bin_sizes, bin_statistic, bin_apply_method, bias_var_names + ) def _fit_func( # type: ignore self, @@ -442,7 +453,9 @@ def __init__( between bins, or "per_bin" to apply the statistic for each bin. :param bias_var_names: (Optional) For pipelines, explicitly define bias variables names to use during .fit(). """ - super().__init__(fit_or_bin, fit_func, fit_optimizer, bin_sizes, bin_statistic, bin_apply_method, bias_var_names) + super().__init__( + fit_or_bin, fit_func, fit_optimizer, bin_sizes, bin_statistic, bin_apply_method, bias_var_names + ) def _fit_func( # type: ignore self, @@ -505,7 +518,9 @@ def __init__( between bins, or "per_bin" to apply the statistic for each bin. :param bias_var_names: (Optional) For pipelines, explicitly define bias variables names to use during .fit(). 
""" - super().__init__(fit_or_bin, fit_func, fit_optimizer, bin_sizes, bin_statistic, bin_apply_method, bias_var_names) + super().__init__( + fit_or_bin, fit_func, fit_optimizer, bin_sizes, bin_statistic, bin_apply_method, bias_var_names + ) def _fit_func( # type: ignore self, @@ -660,7 +675,9 @@ def __init__( between bins, or "per_bin" to apply the statistic for each bin. """ - super().__init__(fit_or_bin, fit_func, fit_optimizer, bin_sizes, bin_statistic, bin_apply_method, [terrain_attribute]) + super().__init__( + fit_or_bin, fit_func, fit_optimizer, bin_sizes, bin_statistic, bin_apply_method, [terrain_attribute] + ) # This is the same as bias_var_names, but let's leave the duplicate for clarity self._meta["terrain_attribute"] = terrain_attribute self._needs_vars = False From 2a343b0481bdbe76251c0513e628c23acd7e9498 Mon Sep 17 00:00:00 2001 From: Romain Hugonnet Date: Fri, 1 Sep 2023 19:13:40 -0800 Subject: [PATCH 7/7] Import future annotations in new test modules --- tests/test_coreg/test_affine.py | 1 + tests/test_coreg/test_workflows.py | 1 + 2 files changed, 2 insertions(+) diff --git a/tests/test_coreg/test_affine.py b/tests/test_coreg/test_affine.py index 75fc74dd..cb9de107 100644 --- a/tests/test_coreg/test_affine.py +++ b/tests/test_coreg/test_affine.py @@ -1,4 +1,5 @@ """Functions to test the affine coregistrations.""" +from __future__ import annotations import copy import warnings diff --git a/tests/test_coreg/test_workflows.py b/tests/test_coreg/test_workflows.py index 7fe9a6ca..f95fbb4e 100644 --- a/tests/test_coreg/test_workflows.py +++ b/tests/test_coreg/test_workflows.py @@ -1,4 +1,5 @@ """Functions to test the coregistration workflows.""" +from __future__ import annotations import os import tempfile