diff --git a/openeo_processes_dask/process_implementations/cubes/__init__.py b/openeo_processes_dask/process_implementations/cubes/__init__.py index 164a5795..d18a1475 100644 --- a/openeo_processes_dask/process_implementations/cubes/__init__.py +++ b/openeo_processes_dask/process_implementations/cubes/__init__.py @@ -4,5 +4,6 @@ from .general import * from .indices import * from .load import * +from .mask_polygon import * from .merge import * from .reduce import * diff --git a/openeo_processes_dask/process_implementations/cubes/_filter.py b/openeo_processes_dask/process_implementations/cubes/_filter.py index 3cf5d4a0..7eb50f4c 100644 --- a/openeo_processes_dask/process_implementations/cubes/_filter.py +++ b/openeo_processes_dask/process_implementations/cubes/_filter.py @@ -1,13 +1,21 @@ +import json import logging import warnings from typing import Callable +import dask.array as da +import geopandas as gpd import numpy as np import pyproj +import rasterio import rioxarray +import shapely import xarray as xr from openeo_pg_parser_networkx.pg_schema import BoundingBox, TemporalInterval +from openeo_processes_dask.process_implementations.cubes.mask_polygon import ( + mask_polygon, +) from openeo_processes_dask.process_implementations.data_model import RasterCube from openeo_processes_dask.process_implementations.exceptions import ( BandFilterParameterMissing, @@ -16,9 +24,18 @@ TooManyDimensions, ) +DEFAULT_CRS = "EPSG:4326" + + logger = logging.getLogger(__name__) -__all__ = ["filter_labels", "filter_temporal", "filter_bands", "filter_bbox"] +__all__ = [ + "filter_labels", + "filter_temporal", + "filter_bands", + "filter_bbox", + "filter_spatial", +] def filter_temporal( @@ -102,6 +119,25 @@ def filter_bands(data: RasterCube, bands: list[str] = None) -> RasterCube: return data +def filter_spatial(data: RasterCube, geometries) -> RasterCube: + if "type" in geometries and geometries["type"] == "FeatureCollection": + gdf = gpd.GeoDataFrame.from_features(geometries, DEFAULT_CRS) + elif "type" in geometries and geometries["type"] in ["Polygon"]: + polygon = shapely.geometry.Polygon(geometries["coordinates"][0]) + gdf = gpd.GeoDataFrame(geometry=[polygon]) + gdf.crs = DEFAULT_CRS + + bbox = gdf.total_bounds + spatial_extent = BoundingBox( + west=bbox[0], east=bbox[2], south=bbox[1], north=bbox[3] + ) + + data = filter_bbox(data, spatial_extent) + data = mask_polygon(data, geometries) + + return data + + def filter_bbox(data: RasterCube, extent: BoundingBox) -> RasterCube: try: input_crs = str(data.rio.crs) diff --git a/openeo_processes_dask/process_implementations/cubes/mask_polygon.py b/openeo_processes_dask/process_implementations/cubes/mask_polygon.py new file mode 100644 index 00000000..5731b45a --- /dev/null +++ b/openeo_processes_dask/process_implementations/cubes/mask_polygon.py @@ -0,0 +1,165 @@ +import json +import logging +from typing import Any, Union + +import dask.array as da +import geopandas as gpd +import numpy as np +import rasterio +import rioxarray +import shapely +import xarray as xr +from xarray.core import dtypes + +from openeo_processes_dask.process_implementations.data_model import ( + RasterCube, + VectorCube, +) + +DEFAULT_CRS = "EPSG:4326" + + +logger = logging.getLogger(__name__) + +__all__ = [ + "mask_polygon", +] + + +def mask_polygon( + data: RasterCube, + mask: Union[VectorCube, str], + replacement: Any = dtypes.NA, + inside: bool = True, +) -> RasterCube: + y_dim = data.openeo.y_dim + x_dim = data.openeo.x_dim + t_dim = data.openeo.temporal_dims + b_dim = data.openeo.band_dims + + if len(t_dim) == 0: + t_dim = None + else: + t_dim = t_dim[0] + if len(b_dim) == 0: + b_dim = None + else: + b_dim = b_dim[0] + + y_dim_size = data.sizes[y_dim] + x_dim_size = data.sizes[x_dim] + + # Reproject vector data to match the raster data cube. + ## Get the CRS of data cube + try: + data_crs = str(data.rio.crs) + except Exception as e: + raise Exception(f"Not possible to estimate the input data projection! {e}") + + data = data.rio.set_crs(data_crs) + + ## Reproject vector data if the input vector data is Polygon or Multi Polygon + if "type" in mask and mask["type"] == "FeatureCollection": + geometries = gpd.GeoDataFrame.from_features(mask, DEFAULT_CRS) + geometries = geometries.to_crs(data_crs) + geometries = geometries.to_json() + geometries = json.loads(geometries) + elif "type" in mask and mask["type"] in ["Polygon"]: + polygon = shapely.geometry.Polygon(mask["coordinates"][0]) + geometries = gpd.GeoDataFrame(geometry=[polygon]) + geometries.crs = DEFAULT_CRS + geometries = geometries.to_crs(data_crs) + geometries = geometries.to_json() + geometries = json.loads(geometries) + else: + raise ValueError( + "Unsupported or missing geometry type. Expected 'Polygon' or 'FeatureCollection'." + ) + + data_dims = list(data.dims) + + # Get the Affine transformer + transform = data.rio.transform() + + # Initialize an empty mask + # Set the same chunk size as the input data + data_chunks = {} + chunks_shapes = data.chunks + for i, d in enumerate(data_dims): + data_chunks[d] = chunks_shapes[i][0] + + if data_dims.index(x_dim[0]) < data_dims.index(y_dim[0]): + final_mask = da.zeros( + (x_dim_size, y_dim_size), + chunks={x_dim: data_chunks[x_dim], y_dim: data_chunks[y_dim]}, + dtype=bool, + ) + + dask_out_shape = da.from_array( + (x_dim_size, y_dim_size), + chunks={x_dim: data_chunks[x_dim], y_dim: data_chunks[y_dim]}, + ) + else: + final_mask = da.zeros( + (y_dim_size, x_dim_size), + chunks={y_dim: data_chunks[y_dim], x_dim: data_chunks[x_dim]}, + dtype=bool, + ) + + dask_out_shape = da.from_array( + (y_dim_size, x_dim_size), + chunks={y_dim: data_chunks[y_dim], x_dim: data_chunks[x_dim]}, + ) + + # CHECK IF the input single polygon or multiple Polygons + if "type" in geometries and geometries["type"] == "FeatureCollection": + for feature in geometries["features"]: + polygon = shapely.geometry.Polygon(feature["geometry"]["coordinates"][0]) + + # Create a GeoSeries from the geometry + geo_series = gpd.GeoSeries(polygon) + + # Convert the GeoSeries to a GeometryArray + geometry_array = geo_series.geometry.array + + mask = da.map_blocks( + rasterio.features.geometry_mask, + geometry_array, + transform=transform, + out_shape=dask_out_shape, + dtype=bool, + invert=inside, + ) + final_mask |= mask + + elif "type" in geometries and geometries["type"] in ["Polygon"]: + polygon = shapely.geometry.Polygon(geometries["coordinates"][0]) + geo_series = gpd.GeoSeries(polygon) + + # Convert the GeoSeries to a GeometryArray + geometry_array = geo_series.geometry.array + mask = da.map_blocks( + rasterio.features.geometry_mask, + geometry_array, + transform=transform, + out_shape=dask_out_shape, + dtype=bool, + invert=inside, + ) + final_mask |= mask + + masked_dims = len(final_mask.shape) + + diff_axes = [] + for axis in range(len(data_dims)): + try: + if final_mask.shape[axis] != data.shape[axis]: + diff_axes.append(axis) + except: + if len(diff_axes) < (len(data_dims) - 2): + diff_axes.append(axis) + + final_mask = np.expand_dims(final_mask, axis=diff_axes) + filtered_ds = data.where(final_mask, other=replacement) + + return filtered_ds diff --git a/tests/conftest.py b/tests/conftest.py index a80b3231..f7dfcf73 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -1,5 +1,6 @@ import importlib import inspect +import json import logging import dask_geopandas @@ -14,6 +15,7 @@ BoundingBox, TemporalInterval, ) +from shapely.geometry import Polygon from openeo_processes_dask.process_implementations.core import process from openeo_processes_dask.process_implementations.data_model import VectorCube @@ -68,6 +70,29 @@ def bounding_box_small( return BoundingBox.parse_obj(spatial_extent) +@pytest.fixture +def polygon_geometry_small( + west=10.47, east=10.48, south=46.12, north=46.18, crs="EPSG:4326" +): + # Bounding box coordinates + west, east, south, north = 10.47, 10.48, 46.12, 46.18 + + # Create a small polygon + geometry = [ + Polygon( + [(west, south), (west, north), (east, north), (east, south), (west, south)] + ) + ] + + # Create a GeoDataFrame with a single polygon and default CRS 'wgs84' + gdf = gpd.GeoDataFrame(geometry=geometry, crs=crs) + + geometries = gdf.to_json() + geometries = json.loads(geometries) + + return geometries + + @pytest.fixture def temporal_interval(interval=["2018-05-01", "2018-06-01"]) -> TemporalInterval: return TemporalInterval.parse_obj(interval) diff --git a/tests/test_filter.py b/tests/test_filter.py index efb1ebba..cf7f840f 100644 --- a/tests/test_filter.py +++ b/tests/test_filter.py @@ -10,6 +10,7 @@ from openeo_processes_dask.process_implementations.cubes._filter import ( filter_bands, filter_bbox, + filter_spatial, filter_temporal, ) from openeo_processes_dask.process_implementations.cubes.reduce import reduce_dimension @@ -83,6 +84,28 @@ def test_filter_bands(temporal_interval, bounding_box, random_raster_data): assert output_cube["bands"].values == "SCL" +@pytest.mark.parametrize("size", [(30, 30, 30, 1)]) +@pytest.mark.parametrize("dtype", [np.uint8]) +def test_filter_spatial( + temporal_interval, + bounding_box, + random_raster_data, + polygon_geometry_small, +): + input_cube = create_fake_rastercube( + data=random_raster_data, + spatial_extent=bounding_box, + temporal_extent=temporal_interval, + bands=["B02"], + backend="dask", + ) + + output_cube = filter_spatial(data=input_cube, geometries=polygon_geometry_small) + + assert len(output_cube.y) < len(input_cube.y) + assert len(output_cube.x) < len(input_cube.x) + + @pytest.mark.parametrize("size", [(30, 30, 1, 1)]) @pytest.mark.parametrize("dtype", [np.uint8]) def test_filter_bbox( diff --git a/tests/test_mask.py b/tests/test_mask.py new file mode 100644 index 00000000..c5c6148e --- /dev/null +++ b/tests/test_mask.py @@ -0,0 +1,31 @@ +import numpy as np +import pytest +from openeo_pg_parser_networkx.pg_schema import TemporalInterval + +from openeo_processes_dask.process_implementations.cubes.mask_polygon import ( + mask_polygon, +) +from tests.mockdata import create_fake_rastercube + + +@pytest.mark.parametrize("size", [(30, 30, 30, 1)]) +@pytest.mark.parametrize("dtype", [np.uint8]) +def test_mask_polygon( + temporal_interval, + bounding_box, + random_raster_data, + polygon_geometry_small, +): + input_cube = create_fake_rastercube( + data=random_raster_data, + spatial_extent=bounding_box, + temporal_extent=temporal_interval, + bands=["B02"], + backend="dask", + ) + + output_cube = mask_polygon(data=input_cube, mask=polygon_geometry_small) + + assert np.isnan(output_cube).sum() > np.isnan(input_cube).sum() + assert len(output_cube.y) == len(input_cube.y) + assert len(output_cube.x) == len(input_cube.x)