diff --git a/AB_environments/AB_baseline.conda.yaml b/AB_environments/AB_baseline.conda.yaml
index b45180263c..6a244303d9 100644
--- a/AB_environments/AB_baseline.conda.yaml
+++ b/AB_environments/AB_baseline.conda.yaml
@@ -49,6 +49,10 @@ dependencies:
   - h5netcdf ==1.3.0
   - xesmf ==0.8.7
   - bottleneck ==1.4.1
+  - geojson ==3.1.0
+  - planetary-computer ==1.0.0
+  - pystac-client ==0.8.3
+  - odc-stac ==0.3.10
   # End copy-paste
 
   - pip:
diff --git a/AB_environments/AB_sample.conda.yaml b/AB_environments/AB_sample.conda.yaml
index f57f03188d..68c6246c75 100644
--- a/AB_environments/AB_sample.conda.yaml
+++ b/AB_environments/AB_sample.conda.yaml
@@ -55,6 +55,10 @@ dependencies:
   - h5netcdf ==1.3.0
   - xesmf ==0.8.7
   - bottleneck ==1.4.1
+  - geojson ==3.1.0
+  - planetary-computer ==1.0.0
+  - pystac-client ==0.8.3
+  - odc-stac ==0.3.10
   # End copy-paste
 
   - pip:
diff --git a/ci/environment-test.yml b/ci/environment-test.yml
index 9f84f33f12..00ec234b71 100644
--- a/ci/environment-test.yml
+++ b/ci/environment-test.yml
@@ -13,6 +13,7 @@ dependencies:
   - pytest
   - pytest-timeout
   - pytest-xdist
+  - python-dotenv
   - pyyaml
   # TPC-H correctness test and DuckDB implementation
   # Can add duckdb back to conda install after:
@@ -20,4 +21,4 @@ dependencies:
   # python-duckdb ==0.10.0
   - pip
   - pip:
-    - duckdb==0.10.0
+    - duckdb==0.10.0
\ No newline at end of file
diff --git a/ci/environment.yml b/ci/environment.yml
index 602fac5983..abc271b1e4 100644
--- a/ci/environment.yml
+++ b/ci/environment.yml
@@ -51,6 +51,10 @@ dependencies:
   - h5netcdf ==1.3.0
   - xesmf ==0.8.7
   - bottleneck ==1.4.1
+  - geojson ==3.1.0
+  - planetary-computer ==1.0.0
+  - pystac-client ==0.8.3
+  - odc-stac ==0.3.10
 
 ########################################################
 # PLEASE READ:
diff --git a/tests/conftest.py b/tests/conftest.py
index 1ffac3f800..95422dd6a8 100644
--- a/tests/conftest.py
+++ b/tests/conftest.py
@@ -14,6 +14,7 @@
 import uuid
 from functools import lru_cache
 
+import adlfs
 import dask
 import dask.array as da
 import dask_expr
@@ -29,6 +30,7 @@
 from dask.distributed import Client, WorkerPlugin
 from dask.distributed.diagnostics.memory_sampler import MemorySampler
 from dask.distributed.scheduler import logger as scheduler_logger
+from dotenv import load_dotenv
 from packaging.version import Version
 from sqlalchemy.orm import Session
 
@@ -699,6 +701,32 @@ def gcs_url(gcs, gcs_scratch, test_name_uuid):
     gcs.rm(url, recursive=True)
 
 
+@pytest.fixture(scope="session")
+def az():
+    load_dotenv()
+    return adlfs.AzureBlobFileSystem()
+
+
+@pytest.fixture(scope="session")
+def az_scratch(az):
+    # Ensure that the test-scratch directory exists,
+    # but do NOT remove it as multiple test runs could be
+    # accessing it at the same time
+    scratch_url = "az://coiled-runtime-ci/scratch-space"
+    az.mkdirs(scratch_url, exist_ok=True)
+    return scratch_url
+
+
+@pytest.fixture(scope="function")
+def az_url(az, az_scratch, test_name_uuid):
+    url = f"{az_scratch}/{test_name_uuid}"
+    az.mkdirs(url, exist_ok=False)
+    try:
+        yield url
+    finally:
+        az.rm(url, recursive=True)
+
+
 # this code was taken from pytest docs
 # https://docs.pytest.org/en/latest/example/simple.html#making-test-result-information-available-in-fixtures
 @pytest.hookimpl(tryfirst=True, hookwrapper=True)
diff --git a/tests/geospatial/conftest.py b/tests/geospatial/conftest.py
index 99ae0718b5..7957bdbb19 100644
--- a/tests/geospatial/conftest.py
+++ b/tests/geospatial/conftest.py
@@ -31,13 +31,15 @@ def client_factory(cluster_name, github_cluster_tags, benchmark_all):
     import contextlib
 
     @contextlib.contextmanager
-    def _(n_workers, **cluster_kwargs):
+    def _(n_workers, env=None, **cluster_kwargs):
         with coiled.Cluster(
             name=cluster_name,
             tags=github_cluster_tags,
             n_workers=n_workers,
             **cluster_kwargs,
         ) as cluster:
+            if env:
+                cluster.send_private_envs(env=env)
             with cluster.get_client() as client:
                 with benchmark_all(client):
                     yield client
diff --git a/tests/geospatial/test_satellite_filtering.py b/tests/geospatial/test_satellite_filtering.py
new file mode 100644
index 0000000000..e2fe96cf8f
--- /dev/null
+++ b/tests/geospatial/test_satellite_filtering.py
@@ -0,0 +1,128 @@
+import datetime
+import os
+
+import fsspec
+import geojson
+import odc.stac
+import planetary_computer
+import pystac_client
+import xarray as xr
+
+
+def harmonize_to_old(data: xr.Dataset) -> xr.Dataset:
+    """
+    Harmonize new Sentinel-2 data to the old baseline.
+
+    Parameters
+    ----------
+    data:
+        A Dataset with various bands as data variables and three dimensions: time, y, x
+
+    Returns
+    -------
+    harmonized: xarray.Dataset
+        A Dataset with all values harmonized to the old
+        processing baseline.
+    """
+    cutoff = datetime.datetime(2022, 1, 25)
+    offset = 1000
+    bands = [
+        "B01",
+        "B02",
+        "B03",
+        "B04",
+        "B05",
+        "B06",
+        "B07",
+        "B08",
+        "B8A",
+        "B09",
+        "B10",
+        "B11",
+        "B12",
+    ]
+
+    to_process = list(set(bands) & set(list(data.data_vars)))
+    old = data.sel(time=slice(cutoff))[to_process]
+
+    new = data.sel(time=slice(cutoff, None)).drop_vars(to_process)
+
+    new_harmonized = data.sel(time=slice(cutoff, None))[to_process].clip(offset)
+    new_harmonized -= offset
+
+    new = xr.merge([new, new_harmonized])
+    return xr.concat([old, new], dim="time")
+
+
+def test_satellite_filtering(
+    az_url,
+    scale,
+    client_factory,
+    cluster_kwargs={
+        "workspace": "dask-benchmarks-azure",
+        "region": "westeurope",
+        "wait_for_workers": True,
+    },
+    scale_kwargs={
+        "small": {"n_workers": 10},
+        "large": {"n_workers": 100},
+    },
+):
+    with client_factory(
+        **scale_kwargs[scale],
+        env={
+            "AZURE_STORAGE_ACCOUNT_NAME": os.environ["AZURE_STORAGE_ACCOUNT_NAME"],
+            "AZURE_STORAGE_SAS_TOKEN": os.environ["AZURE_STORAGE_SAS_TOKEN"],
+        },
+        **cluster_kwargs,
+    ) as client:  # noqa: F841
+        catalog = pystac_client.Client.open(
+            "https://planetarycomputer.microsoft.com/api/stac/v1",
+            modifier=planetary_computer.sign_inplace,
+        )
+
+        # GeoJSON for region of interest is from https://github.com/isellsoap/deutschlandGeoJSON/tree/main/1_deutschland
+        with fsspec.open(
+            "https://raw.githubusercontent.com/isellsoap/deutschlandGeoJSON/main/1_deutschland/3_mittel.geo.json"
+        ) as f:
+            gj = geojson.load(f)
+
+        # Flatten MultiPolygon to single Polygon
+        coordinates = []
+        for x in gj.features[0]["geometry"]["coordinates"]:
+            coordinates.extend(x)
+        area_of_interest = {
+            "type": "Polygon",
+            "coordinates": coordinates,
+        }
+
+        # Get STAC items
+        if scale == "small":
+            time_of_interest = "2024-01-01/2024-09-01"
+        else:
+            time_of_interest = "2015-01-01/2024-09-01"
+
+        search = catalog.search(
+            collections=["sentinel-2-l2a"],
+            intersects=area_of_interest,
+            datetime=time_of_interest,
+        )
+        items = search.item_collection()
+
+        # Construct Xarray Dataset from STAC items
+        ds = odc.stac.load(
+            items,
+            chunks={},
+            patch_url=planetary_computer.sign,
+            resolution=40,
+            crs="EPSG:3857",
+            groupby="solar_day",
+        )
+        # See https://planetarycomputer.microsoft.com/dataset/sentinel-2-l2a#Baseline-Change
+        ds = harmonize_to_old(ds)
+
+        # Compute humidity index
+        humidity = (ds.B08 - ds.B11) / (ds.B08 + ds.B11)
+
+        result = humidity.groupby("time.month").mean()
+        result.to_zarr(az_url)
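
For reviewers, a minimal sketch (not part of the patch) of the baseline-change handling that harmonize_to_old in the new test module implements: Sentinel-2 scenes processed on or after 2022-01-25 carry a +1000 digital-number offset, which the function clips and subtracts, while older scenes pass through unchanged. The import path assumes the snippet runs from the repository root; the band name and pixel values are invented purely for illustration.

import numpy as np
import pandas as pd
import xarray as xr

# Assumes the repo root is on sys.path; otherwise paste the harmonize_to_old
# definition from tests/geospatial/test_satellite_filtering.py above this line.
from tests.geospatial.test_satellite_filtering import harmonize_to_old

# One scene before and one after the 2022-01-25 processing-baseline change.
times = pd.to_datetime(["2022-01-24", "2022-01-26"])
ds = xr.Dataset(
    {"B02": (("time", "y", "x"), np.array([[[1500.0]], [[2500.0]]]))},
    coords={"time": times},
)

harmonized = harmonize_to_old(ds)
# The old scene keeps its value (1500); the new scene is clipped at the offset
# and shifted down by 1000 (2500 -> 1500).
print(harmonized.B02.values.ravel())  # [1500. 1500.]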