Add satellite image processing benchmark (#1550)
Co-authored-by: Hendrik Makait <hendrik@makait.com>
jrbourbeau and hendrikmakait authored Oct 18, 2024
1 parent d92edc4 commit c97750c
Showing 7 changed files with 173 additions and 2 deletions.
4 changes: 4 additions & 0 deletions AB_environments/AB_baseline.conda.yaml
@@ -49,6 +49,10 @@ dependencies:
   - h5netcdf ==1.3.0
   - xesmf ==0.8.7
   - bottleneck ==1.4.1
+  - geojson ==3.1.0
+  - planetary-computer ==1.0.0
+  - pystac-client ==0.8.3
+  - odc-stac ==0.3.10
   # End copy-paste

   - pip:
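The four new pins map one-to-one onto the benchmark pipeline: pystac-client queries the Planetary Computer STAC API, planetary-computer signs the returned asset URLs so they can be downloaded, odc-stac loads the STAC items into a lazy xarray Dataset, and geojson parses the region-of-interest geometry. The same pinned block is repeated in AB_sample.conda.yaml and ci/environment.yml below.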
4 changes: 4 additions & 0 deletions AB_environments/AB_sample.conda.yaml
@@ -55,6 +55,10 @@ dependencies:
   - h5netcdf ==1.3.0
   - xesmf ==0.8.7
   - bottleneck ==1.4.1
+  - geojson ==3.1.0
+  - planetary-computer ==1.0.0
+  - pystac-client ==0.8.3
+  - odc-stac ==0.3.10
   # End copy-paste

   - pip:
3 changes: 2 additions & 1 deletion ci/environment-test.yml
@@ -13,11 +13,12 @@ dependencies:
   - pytest
   - pytest-timeout
   - pytest-xdist
+  - python-dotenv
   - pyyaml
   # TPC-H correctness test and DuckDB implementation
   # Can add duckdb back to conda install after:
   # https://github.com/coiled/benchmarks/issues/1418
   # python-duckdb ==0.10.0
   - pip
   - pip:
-      - duckdb==0.10.0
+      - duckdb==0.10.0
4 changes: 4 additions & 0 deletions ci/environment.yml
@@ -51,6 +51,10 @@ dependencies:
   - h5netcdf ==1.3.0
   - xesmf ==0.8.7
   - bottleneck ==1.4.1
+  - geojson ==3.1.0
+  - planetary-computer ==1.0.0
+  - pystac-client ==0.8.3
+  - odc-stac ==0.3.10

 ########################################################
 # PLEASE READ:
28 changes: 28 additions & 0 deletions tests/conftest.py
@@ -14,6 +14,7 @@
 import uuid
 from functools import lru_cache

+import adlfs
 import dask
 import dask.array as da
 import dask_expr
@@ -29,6 +30,7 @@
 from dask.distributed import Client, WorkerPlugin
 from dask.distributed.diagnostics.memory_sampler import MemorySampler
 from dask.distributed.scheduler import logger as scheduler_logger
+from dotenv import load_dotenv
 from packaging.version import Version
 from sqlalchemy.orm import Session
@@ -699,6 +701,32 @@ def gcs_url(gcs, gcs_scratch, test_name_uuid):
         gcs.rm(url, recursive=True)


+@pytest.fixture(scope="session")
+def az():
+    load_dotenv()
+    return adlfs.AzureBlobFileSystem()
+
+
+@pytest.fixture(scope="session")
+def az_scratch(az):
+    # Ensure that the test-scratch directory exists,
+    # but do NOT remove it, as multiple test runs could be
+    # accessing it at the same time
+    scratch_url = "az://coiled-runtime-ci/scratch-space"
+    az.mkdirs(scratch_url, exist_ok=True)
+    return scratch_url
+
+
+@pytest.fixture(scope="function")
+def az_url(az, az_scratch, test_name_uuid):
+    url = f"{az_scratch}/{test_name_uuid}"
+    az.mkdirs(url, exist_ok=False)
+    try:
+        yield url
+    finally:
+        az.rm(url, recursive=True)
+
+
 # this code was taken from pytest docs
 # https://docs.pytest.org/en/latest/example/simple.html#making-test-result-information-available-in-fixtures
 @pytest.hookimpl(tryfirst=True, hookwrapper=True)
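For reference, a test consumes these fixtures the same way tests already consume gcs_url. A minimal sketch (hypothetical test, not part of this commit; assumes AZURE_STORAGE_ACCOUNT_NAME and AZURE_STORAGE_SAS_TOKEN are set, e.g. through a local .env file picked up by load_dotenv):

import dask.array as da

def test_scratch_roundtrip(az_url):
    # az_url is a per-test directory under az://coiled-runtime-ci/scratch-space,
    # created before the test runs and deleted again afterwards
    arr = da.random.random((1_000, 1_000), chunks=(250, 250))
    da.to_zarr(arr, az_url)  # writes through adlfs using the ambient credentials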
4 changes: 3 additions & 1 deletion tests/geospatial/conftest.py
@@ -31,13 +31,15 @@ def client_factory(cluster_name, github_cluster_tags, benchmark_all):
     import contextlib

     @contextlib.contextmanager
-    def _(n_workers, **cluster_kwargs):
+    def _(n_workers, env=None, **cluster_kwargs):
         with coiled.Cluster(
             name=cluster_name,
             tags=github_cluster_tags,
             n_workers=n_workers,
             **cluster_kwargs,
         ) as cluster:
+            if env:
+                cluster.send_private_envs(env=env)
             with cluster.get_client() as client:
                 with benchmark_all(client):
                     yield client
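Passing the variables through cluster.send_private_envs, rather than baking them into the cluster spec, means the Azure SAS token used by the new test travels to the running cluster over an encrypted connection instead of being stored with the cluster configuration (per Coiled's documentation for private environment variables). The new test file below shows the env= parameter in use.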
128 changes: 128 additions & 0 deletions tests/geospatial/test_satellite_filtering.py
@@ -0,0 +1,128 @@
+import datetime
+import os
+
+import fsspec
+import geojson
+import odc.stac
+import planetary_computer
+import pystac_client
+import xarray as xr
+
+
+def harmonize_to_old(data: xr.Dataset) -> xr.Dataset:
+    """
+    Harmonize new Sentinel-2 data to the old baseline.
+
+    Parameters
+    ----------
+    data:
+        A Dataset with various bands as data variables and three dimensions: time, y, x
+
+    Returns
+    -------
+    harmonized: xarray.Dataset
+        A Dataset with all values harmonized to the old
+        processing baseline.
+    """
+    cutoff = datetime.datetime(2022, 1, 25)
+    offset = 1000
+    bands = [
+        "B01",
+        "B02",
+        "B03",
+        "B04",
+        "B05",
+        "B06",
+        "B07",
+        "B08",
+        "B8A",
+        "B09",
+        "B10",
+        "B11",
+        "B12",
+    ]
+
+    to_process = list(set(bands) & set(data.data_vars))
+    old = data.sel(time=slice(cutoff))[to_process]
+
+    new = data.sel(time=slice(cutoff, None)).drop_vars(to_process)
+
+    new_harmonized = data.sel(time=slice(cutoff, None))[to_process].clip(offset)
+    new_harmonized -= offset
+
+    new = xr.merge([new, new_harmonized])
+    return xr.concat([old, new], dim="time")
+
+
+def test_satellite_filtering(
+    az_url,
+    scale,
+    client_factory,
+    cluster_kwargs={
+        "workspace": "dask-benchmarks-azure",
+        "region": "westeurope",
+        "wait_for_workers": True,
+    },
+    scale_kwargs={
+        "small": {"n_workers": 10},
+        "large": {"n_workers": 100},
+    },
+):
+    with client_factory(
+        **scale_kwargs[scale],
+        env={
+            "AZURE_STORAGE_ACCOUNT_NAME": os.environ["AZURE_STORAGE_ACCOUNT_NAME"],
+            "AZURE_STORAGE_SAS_TOKEN": os.environ["AZURE_STORAGE_SAS_TOKEN"],
+        },
+        **cluster_kwargs,
+    ) as client:  # noqa: F841
+        catalog = pystac_client.Client.open(
+            "https://planetarycomputer.microsoft.com/api/stac/v1",
+            modifier=planetary_computer.sign_inplace,
+        )
+
+        # GeoJSON for region of interest is from https://github.com/isellsoap/deutschlandGeoJSON/tree/main/1_deutschland
+        with fsspec.open(
+            "https://raw.githubusercontent.com/isellsoap/deutschlandGeoJSON/main/1_deutschland/3_mittel.geo.json"
+        ) as f:
+            gj = geojson.load(f)
+
+        # Flatten MultiPolygon to single Polygon
+        coordinates = []
+        for x in gj.features[0]["geometry"]["coordinates"]:
+            coordinates.extend(x)
+        area_of_interest = {
+            "type": "Polygon",
+            "coordinates": coordinates,
+        }
+
+        # Get STAC items
+        if scale == "small":
+            time_of_interest = "2024-01-01/2024-09-01"
+        else:
+            time_of_interest = "2015-01-01/2024-09-01"
+
+        search = catalog.search(
+            collections=["sentinel-2-l2a"],
+            intersects=area_of_interest,
+            datetime=time_of_interest,
+        )
+        items = search.item_collection()
+
+        # Construct xarray Dataset from STAC items
+        ds = odc.stac.load(
+            items,
+            chunks={},
+            patch_url=planetary_computer.sign,
+            resolution=40,
+            crs="EPSG:3857",
+            groupby="solar_day",
+        )
+        # See https://planetarycomputer.microsoft.com/dataset/sentinel-2-l2a#Baseline-Change
+        ds = harmonize_to_old(ds)
+
+        # Compute humidity index
+        humidity = (ds.B08 - ds.B11) / (ds.B08 + ds.B11)
+
+        result = humidity.groupby("time.month").mean()
+        result.to_zarr(az_url)
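Two notes on what the new file computes. harmonize_to_old handles the Sentinel-2 processing-baseline change of January 25, 2022 (baseline 04.00), after which L2A products carry a fixed +1000 digital-number offset; clipping post-cutoff scenes at the offset and then subtracting it makes them comparable with older data, following the snippet in the Planetary Computer documentation linked in the code. The "humidity index" (B08 - B11) / (B08 + B11) is the normalized difference moisture index (NDMI), a standard NIR/SWIR band ratio, so the benchmark ends by writing monthly mean NDMI over Germany to Azure Blob Storage.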

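To eyeball a finished run, the Zarr store can be opened back up with xarray. A sketch, not part of this commit (the path is hypothetical, and the same Azure credentials must be available locally):

import xarray as xr

# open the store the benchmark wrote: one mean NDMI field per calendar month,
# on the 40 m EPSG:3857 grid chosen in odc.stac.load
result = xr.open_zarr("az://coiled-runtime-ci/scratch-space/<test-uuid>")
print(result)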