diff --git a/scripts/python/make_coastal_grid.py b/scripts/python/make_coastal_grid.py new file mode 100644 index 0000000..7ba7500 --- /dev/null +++ b/scripts/python/make_coastal_grid.py @@ -0,0 +1,253 @@ +import argparse +import datetime +import itertools +import json +import logging +import os +import random +import uuid +from functools import partial +from typing import Literal + +import dotenv +import fsspec +import geopandas as gpd +import pystac + +from coastpy.geo.quadtiles import make_mercantiles +from coastpy.geo.quadtiles_utils import add_geo_columns +from coastpy.io.utils import name_bounds + +# Setup logging +logging.basicConfig(level=logging.INFO) +logger = logging.getLogger(__name__) + + +def short_id(seed: str, length: int = 3) -> str: + """Generate a short deterministic ID based on a seed.""" + # Use hashlib for a consistent deterministic hash + random.seed(seed) + return uuid.UUID(int=random.getrandbits(128)).hex[:length] + + +def add_proc_id(geometry, crs): + """Create a unique deterministic proc_id for a geometry.""" + bounds_name = name_bounds(geometry.bounds, crs) + # Use geometry bounds as a stable input for the seed + deterministic_suffix = short_id(str(geometry.bounds)) + return f"{bounds_name}-{deterministic_suffix}" + + +def load_data( + uri: str, storage_options=None, to_crs: str | None = None +) -> gpd.GeoDataFrame: + """ + Load a geospatial dataset and optionally transform its CRS. + + Args: + uri (str): URI of the dataset. + storage_options (dict, optional): Storage options for fsspec. + to_crs (str, optional): CRS to transform the dataset. + + Returns: + GeoDataFrame: The loaded dataset. + """ + with fsspec.open(uri, "rb", **(storage_options or {})) as f: + df = gpd.read_parquet(f) + if to_crs: + df = df.to_crs(to_crs) + logger.info(f"Loaded {uri} with {len(df)} features.") + return df + + +def clip_and_filter(grid, coastal_zone): + """ + Clip the grid tiles by the coastal zone. 
+ """ + # Filter the grid tiles by intersecting coastline + filtered_tiles = ( + gpd.sjoin(grid, coastal_zone, how="inner", predicate="intersects") + .drop_duplicates(subset="coastal_grid:quadkey") + .drop(columns=["index_right"]) + ) + logger.info(f"Filtered grid tiles: {len(filtered_tiles)} features.") + + # Clip the tiles by the coastal zone + clipped_tiles = gpd.overlay(filtered_tiles, coastal_zone, how="intersection") + clipped_tiles = clipped_tiles.explode(index_parts=False).reset_index(drop=True) + logger.info(f"Clipped tiles: {len(clipped_tiles)} features.") + return clipped_tiles + + +def _aggregate_spatial_data(tiles, spatial_data, groupby_column, attributes): + """Perform spatial join and aggregate attributes.""" + joined = gpd.sjoin(tiles, spatial_data, how="inner").drop(columns=["index_right"]) + aggregated = joined.groupby(groupby_column).agg(attributes) + return tiles.merge(aggregated, on=groupby_column, how="left") + + +def add_divisions(tiles, countries): + """Add country and continent info to tiles.""" + attributes = { + "admin:countries": lambda x: json.dumps(list(set(x))), + "admin:continents": lambda x: json.dumps(list(set(x))), + } + countries = countries[["country", "continent", "geometry"]].rename( + columns={"country": "admin:countries", "continent": "admin:continents"} + ) + return _aggregate_spatial_data( + tiles, + countries, + "coastal_grid:id", + attributes, + ) + + +def add_mgrs(tiles, s2grid): + """Add Sentinel-2 MGRS names to tiles.""" + attributes = {"Name": lambda x: json.dumps(list(set(x)))} + tiles = _aggregate_spatial_data( + tiles, s2grid[["Name", "geometry"]], "coastal_grid:id", attributes + ) + tiles = tiles.rename(columns={"Name": "s2:mgrs_tile"}) + return tiles + + +def read_coastal_zone( + buffer_size: Literal["500m", "1000m", "2000m", "5000m", "10000m", "15000m"], +): + """ + Load the coastal zone data layer for a specific buffer size. + """ + coclico_catalog = pystac.Catalog.from_file( + "https://coclico.blob.core.windows.net/stac/v1/catalog.json" + ) + coastal_zone_collection = coclico_catalog.get_child("coastal-zone") + if coastal_zone_collection is None: + msg = "Coastal zone collection not found" + raise ValueError(msg) + item = coastal_zone_collection.get_item(f"coastal_zone_{buffer_size}") + if item is None: + msg = f"Coastal zone item for {buffer_size} not found" + raise ValueError(msg) + href = item.assets["data"].href + with fsspec.open(href, mode="rb") as f: + coastal_zone = gpd.read_parquet(f) + return coastal_zone + + +VALID_BUFFER_SIZES = ["500m", "1000m", "2000m", "5000m", "10000m", "15000m"] +COLUMN_ORDER = [ + "coastal_grid:id", + "coastal_grid:quadkey", + "coastal_grid:bbox", + "admin:countries", + "admin:continents", + "s2:mgrs_tile", + "geometry", +] + + +def main( + buffer_sizes: Literal["500m", "1000m", "2000m", "5000m", "10000m", "15000m"], + zooms: list[int], + release, +): + """ + Main function to process coastal grids for given buffer sizes and zoom levels. + """ + dotenv.load_dotenv() + sas_token = os.getenv("AZURE_STORAGE_SAS_TOKEN") + storage_options = {"account_name": "coclico", "sas_token": sas_token} + + for buffer_size, zoom in itertools.product(buffer_sizes, zooms): + # Validate zoom and buffer_size + if buffer_size not in VALID_BUFFER_SIZES: + msg = f"Invalid buffer size: {buffer_size}" + raise ValueError(msg) + if not (1 <= zoom <= 18): + msg = f"Zoom level must be between 1 and 18. 
Got: {zoom}" + raise ValueError(msg) + + print(f"Processing buffer size: {buffer_size}, zoom level: {zoom}") + + # Construct storage path + storage_urlpath = f"az://coastal-grid/release/{release}/coastal_grid_z{zoom}_{buffer_size}.parquet" + + # Load datasets + coastal_zone = read_coastal_zone(buffer_size) # type: ignore + with fsspec.open( + "https://coclico.blob.core.windows.net/public/countries.parquet", "rb" + ) as f: + countries = gpd.read_parquet(f) + with fsspec.open( + "https://coclico.blob.core.windows.net/tiles/S2A_OPER_GIP_TILPAR_MPC.parquet", + "rb", + ) as f: + mgrs = gpd.read_parquet(f).to_crs(4326) + + grid = make_mercantiles(zoom).rename( + columns={"quadkey": "coastal_grid:quadkey"} + ) + + # Clip and filter + tiles = clip_and_filter(grid, coastal_zone) + + # Add geographical columns and unique IDs + tiles = add_geo_columns(tiles, geo_columns=["bbox"]).rename( + columns={"bbox": "coastal_grid:bbox"} + ) + tiles = tiles.sort_values("coastal_grid:quadkey") + add_proc_id_partial = partial(add_proc_id, crs=tiles.crs) + tiles["coastal_grid:id"] = tiles.geometry.map(add_proc_id_partial) + + # Aggregate tiles with country and continent data + tiles = add_divisions(tiles, countries) + tiles = add_mgrs(tiles, mgrs) + tiles = tiles[COLUMN_ORDER] + + # Save the processed tiles + with fsspec.open(storage_urlpath, mode="wb", **storage_options) as f: + tiles.to_parquet(f) + + logger.info(f"Saved: {storage_urlpath}") + + +if __name__ == "__main__": + + def validate_date(date_string): + """ + Validates the date string to ensure it's in the 'yyyy-mm-dd' format. + """ + try: + # Attempt to parse the date in 'yyyy-mm-dd' format + datetime.datetime.strptime(date_string, "%Y-%m-%d") + return date_string # Return the valid date string + except ValueError: + msg = f"Invalid date format: {date_string}. Expected format: yyyy-mm-dd." 
+ raise ValueError(msg) from None + + parser = argparse.ArgumentParser(description="Generate coastal grid datasets.") + parser.add_argument( + "--buffer_size", + nargs="+", + type=str, + required=True, + help="List of buffer sizes (e.g., 500m 1000m 2000m 5000m 10000m 15000m).", + ) + parser.add_argument( + "--zoom", + nargs="+", + type=int, + required=True, + help="List of zoom levels (e.g., 4 6 8).", + ) + parser.add_argument( + "--release", + type=validate_date, + required=True, + help="Release date in yyyy-mm-dd format (e.g., 2024-12-09).", + ) + args = parser.parse_args() + + main(args.buffer_size, args.zoom, args.release) diff --git a/scripts/python/make_coastal_zone.py b/scripts/python/make_coastal_zone.py new file mode 100644 index 0000000..7ca7119 --- /dev/null +++ b/scripts/python/make_coastal_zone.py @@ -0,0 +1,445 @@ +#!/usr/bin/env python +import logging +import warnings + +import dask +import dask.dataframe as dd +import dask_geopandas +import geopandas as gpd +import pandas as pd +import shapely +from distributed import Client +from geopandas.array import GeometryDtype +from shapely.geometry import MultiPolygon, Polygon +from shapely.geometry.polygon import orient +from shapely.validation import make_valid + +from coastpy.utils.dask import silence_shapely_warnings + +BUFFER_SIZE = 500 +TOLERANCE = 100 +MIN_COASTLINE_LENGTH = 0 + +RELEASE = "2024-12-08" + +OSM_COASTLINE_URI = "az://coastlines-osm/release/2023-02-09/coast_3857_gen9.parquet" +OSM_COASTLINE_SAMPLE_URI = ( + "az://public/coastlines-osm-sample/release/2023-02-09/coast_3857_gen9.parquet" +) +UTM_GRID_URI = "az://public/utm_grid.parquet" + +PRC_EPSG = 3857 +DST_CRS = 4326 + +STORAGE_URLPATH = ( + f"az://coastal-zone/release/{RELEASE}/coastal_zone_{BUFFER_SIZE}m.parquet" +) + + +def silence_warnings(): + """ + Silence specific warnings for a cleaner output. + + """ + + # Silencing specific warnings + warnings.filterwarnings( + "ignore", + message=( + r"is_sparse is deprecated and will be removed in a future version. Check" + r" `isinstance\(dtype, pd.SparseDtype\)` instead." + ), + ) + warnings.filterwarnings( + "ignore", + message=( + r"is_datetime64tz_dtype is deprecated and will be removed in a future" + r" version. Check `isinstance\(dtype, pd.DatetimeTZDtype\)` instead." + ), + ) + warnings.filterwarnings( + "ignore", + r"DataFrameGroupBy.apply operated on the grouping columns. This behavior is deprecated," + r"and in a future version of pandas the grouping columns will be excluded from the operation." + r"Either pass `include_groups=False` to exclude the groupings or explicitly select the grouping" + r"action= columns after groupby to silence this warning.", + ) + + +if __name__ == "__main__": + import dotenv + import fsspec + + dotenv.load_dotenv() + storage_options = { + "account_name": "coclico", + "sas_token": "AZURE_STORAGE_SAS_TOKEN", + } + + client = Client() + logging.info(client.dashboard_link) + + silence_warnings() + + with fsspec.open(UTM_GRID_URI, **storage_options) as f: + utm_grid = gpd.read_parquet(f) + + utm_grid = utm_grid.dissolve("epsg").to_crs(PRC_EPSG).reset_index() + + [utm_grid_scattered] = client.scatter( + [utm_grid.loc[:, ["geometry", "epsg", "utm_code"]]], broadcast=True + ) # type: ignore + + try: + # NOTE: the source coastline cannot be made public. So here we attempt to + # read the full dataset, but when the access keys are not available, we fallback + # to a sample of the dataset. 
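+        # Read the OSM coastlines lazily with dask-geopandas, repartition for
+        # parallelism, and reproject to the processing CRS (EPSG:3857).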
+ coastlines = ( + dask_geopandas.read_parquet( + OSM_COASTLINE_URI, storage_options=storage_options + ) + .repartition(npartitions=10) + .persist() + # .sample(frac=0.1) + .to_crs(PRC_EPSG) + ) + logging.info("Successfully loaded the full private dataset.") + + except Exception as e: + # Log the exception and fallback to the sample dataset + logging.warning( + f"Failed to load private dataset: {e}. Defaulting to sample dataset." + ) + coastlines = ( + dask_geopandas.read_parquet(OSM_COASTLINE_SAMPLE_URI) + .repartition(npartitions=10) + .persist() + .to_crs(PRC_EPSG) + ) + logging.info("Successfully loaded the sample dataset.") + + def is_closed(geometry): + """Check if a LineString geometry is closed.""" + return geometry.is_closed + + def wrap_is_closed(df): + df["osm_coastline_is_closed"] = df.geometry.astype(object).apply(is_closed) + return df + + META = gpd.GeoDataFrame( + { + "FID": pd.Series([], dtype="i8"), + "geometry": gpd.GeoSeries([], dtype=GeometryDtype), + "osm_coastline_is_closed": pd.Series([], dtype="bool"), + } + ) + + coastlines = coastlines.map_partitions(wrap_is_closed, meta=META).set_crs( + coastlines.crs + ) + + utm_extent = gpd.GeoDataFrame( + geometry=[ + shapely.box( + xmin=-179.99998854, + ymin=-80.00000006, + xmax=179.99998854, + ymax=84.00000009, + ) + ], + crs="EPSG:4326", + ).to_crs(PRC_EPSG) + + [utm_extent_scattered] = client.scatter([utm_extent], broadcast=True) # type: ignore + + def overlay_by_grid(df, grid): + return gpd.overlay( + df, + grid, + keep_geom_type=False, + ).explode(column="geometry", index_parts=False) + + META = gpd.GeoDataFrame( + { + "FID": pd.Series([], dtype="i8"), + "osm_coastline_is_closed": pd.Series([], dtype="bool"), + "epsg": pd.Series([], dtype="i8"), + "utm_code": pd.Series([], dtype=object), + "geometry": gpd.GeoSeries([], dtype=GeometryDtype), + } + ) + + lazy_values = [] + for partition in coastlines.to_delayed(): + lazy_value = dask.delayed(overlay_by_grid)(partition, utm_extent_scattered) + lazy_value = dask.delayed(overlay_by_grid)(lazy_value, utm_grid_scattered) + lazy_values.append(lazy_value) # Note the change here + + import dask.dataframe as dd + + coastlines = dd.from_delayed(lazy_values, meta=META).set_crs(coastlines.crs) + # rename fid because they are no longer unique after overlay + coastlines = coastlines.rename(columns={"FID": "FID_osm"}).astype( + {"FID_osm": "i4", "epsg": "i4"} + ) # type: ignore + + # TODO: use coastpy.geo.utils add_geometry_lengths + def add_lengths(df, utm_epsg): + silence_shapely_warnings() + # compute geometry length in local utm crs + df = ( + df.to_crs(utm_epsg) + .assign(geometry_length=lambda df: df.geometry.length) + .to_crs(df.crs) + ) + # compute total coastline length per FID + coastline_lengths = ( + df.groupby("FID_osm")["geometry_length"] + .sum() + .rename("osm_coastline_length") + .reset_index() + ) + # add to dataframe + return pd.merge( + df.drop(columns=["geometry_length"]), coastline_lengths, on="FID_osm" + ) + + META = gpd.GeoDataFrame( + { + "FID_osm": pd.Series([], dtype="i4"), + "osm_coastline_is_closed": pd.Series([], dtype="bool"), + "epsg": pd.Series([], dtype="i4"), + "utm_code": pd.Series([], dtype="string"), + "geometry": gpd.GeoSeries([], dtype=GeometryDtype), + "osm_coastline_length": pd.Series([], dtype="f8"), + } + ) + + # NOTE: check how to handle the group keys with Pandas > 2.2.2 + coastlines = coastlines.map_partitions( + lambda partition: partition.groupby("epsg", group_keys=False).apply( + lambda gr: add_lengths(gr, gr.name) + ), + meta=META, + 
).set_crs(coastlines.crs) + # the coastlines have been clipped to a utm grid, so add a new name id + + def add_coastline_names(df): + segment_ids = df.groupby("FID_osm").cumcount() + names = [ + f"cl{fid}s{seg}" for fid, seg in zip(df.FID_osm, segment_ids, strict=False) + ] + df["osm_coastline_id"] = names + return df + + META = gpd.GeoDataFrame( + { + "FID_osm": pd.Series([], dtype="i4"), + "osm_coastline_is_closed": pd.Series([], dtype="bool"), + "epsg": pd.Series([], dtype="i4"), + "utm_code": pd.Series([], dtype="string"), + "geometry": gpd.GeoSeries([], dtype=GeometryDtype), + "osm_coastline_length": pd.Series([], dtype="f8"), + "osm_coastline_id": pd.Series([], dtype="string"), + } + ) + coastlines = coastlines.map_partitions(add_coastline_names, meta=META).set_crs( + coastlines.crs + ) + + # drop coastlines that are too short + coastlines = coastlines.loc[ + coastlines.osm_coastline_length > MIN_COASTLINE_LENGTH + ].persist() + + def buffer_geometry(df, utm_crs, buffer_size): + silence_shapely_warnings() + df["buffer"] = gpd.GeoDataFrame( + geometry=df.to_crs(utm_crs).buffer(buffer_size), crs=utm_crs + ).to_crs(df.crs) + return df + + def fix_self_intersecting_polygons(row): + """ + Corrects geometries where buffering creates self-intersecting or invalid polygons. + Ensures proper winding (CCW) for exteriors and removes invalid geometries. + + Args: + row: A DataFrame row with a 'buffer' geometry to correct. + + Returns: + The corrected row with a valid and properly oriented geometry. + """ + silence_shapely_warnings() + with warnings.catch_warnings(): + warnings.simplefilter("ignore", category=RuntimeWarning) + row = row.copy() + + try: + if row.buffer.geom_type == "Polygon": + # Fix winding of the exterior ring + if not row.geometry.within(row.buffer): + row.buffer = orient(Polygon(row.buffer.exterior), sign=1.0) + + elif row.buffer.geom_type == "MultiPolygon": + # Process each polygon in the multipolygon + parts = [] + for polygon in row.buffer.geoms: + if not row.geometry.within(polygon): + polygon = orient(Polygon(polygon.exterior), sign=1.0) # noqa + parts.append(polygon) + row.buffer = MultiPolygon(parts) + + except Exception as e: + logging.error(f"Failed to fix self-intersecting polygon: {e}") + return None + + # Simplify geometry to remove unnecessary complexity + row.buffer = row.buffer.simplify( + tolerance=TOLERANCE, preserve_topology=True + ) + + return row + + def compute_buffers(df): + silence_shapely_warnings() + df = df.groupby("epsg", group_keys=False).apply( + lambda p: buffer_geometry(p, p.name, BUFFER_SIZE) + ) + df = df.apply(fix_self_intersecting_polygons, axis=1) + df = df.dropna(subset=["geometry"]) + df = ( + df.drop(columns="geometry") + .rename(columns={"buffer": "geometry"}) + .set_geometry("geometry", crs=df.crs) + ) + columns_to_drop = ["osm_coastline_id", "osm_coastline_is_closed", "utm_code"] + + df = df.drop(columns=columns_to_drop) + df = df.rename(columns={"FID_osm": "FID", "osm_coastline_length": "FID_length"}) + return df + + META = gpd.GeoDataFrame( + { + "FID": pd.Series([], dtype="i4"), + "epsg": pd.Series([], dtype="i4"), + "FID_length": pd.Series([], dtype="f8"), + "geometry": gpd.GeoSeries([], dtype=GeometryDtype), + } + ) + + buffer = ( + coastlines.map_partitions(compute_buffers, meta=META) + .set_crs(coastlines.crs) + .persist() + ) + + def crosses_antimeridian(geometry): + """ + Check if a geometry crosses the antimeridian. + + Args: + geometry (shapely.geometry.base.BaseGeometry): The input geometry. 
+ + Returns: + bool: True if the geometry crosses the antimeridian, False otherwise. + """ + minx, miny, maxx, maxy = geometry.bounds + return maxx - minx > 180 + + def map_crosses_antimeridian(df): + src_crs = df.crs + df = df.to_crs(4326) + df["crosses_antimeridian"] = df["geometry"].apply(crosses_antimeridian) + df = df.to_crs(src_crs) + return df + + def correct_antimeridian_cross(row): + """ + Correct geometries crossing the antimeridian using the antimeridian library. + + Args: + row (pd.Series): Row containing the geometry to correct. + + Returns: + shapely.geometry.base.BaseGeometry: Corrected geometry. + """ + geom = row.geometry + + try: + # Fix geometry using antimeridian library + import antimeridian + + if geom.geom_type == "Polygon": + fixed = antimeridian.fix_polygon(geom, fix_winding=True) + fixed = make_valid(fixed) + return fixed + elif geom.geom_type == "MultiPolygon": + fixed = antimeridian.fix_multi_polygon(geom, fix_winding=True) + fixed = make_valid(fixed) + return fixed + except Exception as e: + logging.error(f"Failed to correct antimeridian crossing: {e}") + return geom + + def correct_antimeridian_crosses_in_df(df): + """ + Correct geometries that cross the antimeridian. + + Args: + df (gpd.GeoDataFrame): Input GeoDataFrame with `crosses_antimeridian` column. + utm_grid (gpd.GeoDataFrame): UTM grid for overlay. + + Returns: + gpd.GeoDataFrame: Updated GeoDataFrame with corrected geometries. + """ + df = df.copy() + crs = df.crs + df = df.to_crs(4326) + + # Create a boolean mask for rows to correct + rows_to_correct = df["crosses_antimeridian"] + + # Apply the correction only to rows where `crosses_antimeridian` is True + df.loc[rows_to_correct, "geometry"] = df.loc[rows_to_correct].apply( + lambda row: correct_antimeridian_cross(row), axis=1 + ) + df = df.to_crs(crs) + return df + + META = gpd.GeoDataFrame( + { + "FID": pd.Series([], dtype="i4"), + "epsg": pd.Series([], dtype="i4"), + "FID_length": pd.Series([], dtype="f8"), + "geometry": gpd.GeoSeries([], dtype=GeometryDtype), + "crosses_antimeridian": pd.Series([], dtype=bool), + } + ) + + lazy_values = [] + for partition in buffer.to_delayed(): + t = dask.delayed(map_crosses_antimeridian)(partition) + t = dask.delayed(correct_antimeridian_crosses_in_df)(t) + lazy_values.append(t) + buffer = dd.from_delayed(lazy_values, meta=META).set_crs(buffer.crs) + + # NOTE: this is useful + union = (buffer.unary_union).compute() + + # NOTE: maybe add datetime and other metadata? 
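+    # Wrap the dissolved union in a GeoDataFrame and explode it so that each
+    # connected coastal-zone polygon becomes its own row.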
+ buffer = ( + gpd.GeoDataFrame(geometry=[union], crs=buffer.crs) + .explode(index_parts=False) + .reset_index(drop=True) + ).set_geometry("geometry", crs=buffer.crs) + + # NOTE: There will be 1 remaining LineString exactly at the antimeridian + buffer = buffer[ + (buffer.geom_type == "Polygon") | (buffer.geom_type == "MultiPolygon") + ] + buffer = buffer.to_crs(DST_CRS) + + logging.info(f"Writing buffer to {STORAGE_URLPATH}..") + with fsspec.open(STORAGE_URLPATH, mode="wb", **storage_options) as f: + buffer.to_parquet(f) diff --git a/scripts/python/stac_coastal_grid.py b/scripts/python/stac_coastal_grid.py new file mode 100644 index 0000000..8c3b0f7 --- /dev/null +++ b/scripts/python/stac_coastal_grid.py @@ -0,0 +1,382 @@ +import datetime +import os +import pathlib +import re +from typing import Any + +import fsspec +import pystac +import stac_geoparquet +import tqdm +from dotenv import load_dotenv +from pystac.extensions.item_assets import ItemAssetsExtension +from pystac.extensions.scientific import Publication, ScientificExtension +from pystac.extensions.version import VersionExtension +from pystac.provider import ProviderRole +from pystac.stac_io import DefaultStacIO + +from coastpy.io.utils import PathParser +from coastpy.libs import stac_table +from coastpy.stac import ParquetLayout + +# Load the environment variables from the .env file +load_dotenv() + +# Get the SAS token and storage account name from environment variables +sas_token = os.getenv("AZURE_STORAGE_SAS_TOKEN") +STORAGE_ACCOUNT_NAME = "coclico" +storage_options = {"account_name": STORAGE_ACCOUNT_NAME, "credential": sas_token} + +# Container and URI configuration +VERSION = "2024-12-10" +DATETIME_STAC_CREATED = datetime.datetime.now(datetime.UTC) +DATETIME_DATA_CREATED = datetime.datetime(2023, 2, 9) +CONTAINER_NAME = "coastal-grid" +PREFIX = f"release/{VERSION}" +CONTAINER_URI = f"az://{CONTAINER_NAME}/{PREFIX}" +PARQUET_MEDIA_TYPE = "application/vnd.apache.parquet" +LICENSE = "CC-BY-4.0" + +# Collection information +COLLECTION_ID = "coastal-grid" +COLLECTION_TITLE = "Coastal Grid" + +DESCRIPTION = """ +The Coastal Grid dataset provides a global tiling system for geospatial analytics in coastal areas. +It supports scalable data processing workflows by offering structured grids at varying zoom levels +(5, 6, 7) and buffer sizes (500m, 1000m, 2000m, 5000m, 10000m, 15000m). + +Each tile contains information on intersecting countries, continents, and Sentinel-2 MGRS tiles +as nested JSON lists. The dataset is particularly suited for applications requiring global coastal +coverage, such as satellite-based coastal monitoring, spatial analytics, and large-scale data processing. + +Key Features: +- Global coverage of the coastal zone, derived from OpenStreetMap's generalized coastline (2023-02). +- Precomputed intersections with countries, continents, and MGRS tiles. +- Designed for use in scalable geospatial workflows. + +This dataset is structured as a STAC collection, with individual items for each zoom level and buffer +size combination. Users can filter items by the `zoom` and `buffer_size` fields in the STAC metadata. + +Please consider the following citation when using this dataset: + +Floris Reinier Calkoen, Arjen Pieter Luijendijk, Kilian Vos, Etiënne Kras, Fedor Baart, +Enabling coastal analytics at planetary scale, Environmental Modelling & Software, 2024, +106257, ISSN 1364-8152, https://doi.org/10.1016/j.envsoft.2024.106257. 
+""" + +ASSET_TITLE = "Coastal Grid" +ASSET_DESCRIPTION = ( + "Parquet dataset providing a global structured coastal grid for coastal analytics" +) + +GEOPARQUET_STAC_ITEMS_HREF = f"az://items/{COLLECTION_ID}.parquet" + +COLUMN_DESCRIPTIONS = [ + { + "name": "coastal_grid:id", + "type": "string", + "description": "Unique identifier for each tile, derived from bounds and a deterministic hex suffix.", + }, + { + "name": "coastal_grid:quadkey", + "type": "string", + "description": "Mercator quadkey for each tile, indicating its spatial location and zoom level.", + }, + { + "name": "coastal_grid:bbox", + "type": "object", + "description": "Bounding box of the tile in WGS84 coordinates, represented as a dictionary.", + }, + { + "name": "admin:countries", + "type": "string", + "description": """JSON list of countries intersecting the tile (e.g., '["CA", "US"]').""", + }, + { + "name": "admin:continents", + "type": "string", + "description": """JSON list of continents intersecting the tile (e.g., '["NA", "SA"]').""", + }, + { + "name": "s2:mgrs_tile", + "type": "string", + "description": """JSON list of Sentinel-2 MGRS tiles intersecting the tile (e.g., '["15XWL", "16XDR"]').""", + }, + { + "name": "geometry", + "type": "geometry", + "description": "Polygon geometry defining the spatial extent of the tile.", + }, +] + + +ASSET_EXTRA_FIELDS = { + "table:storage_options": {"account_name": "coclico"}, + "table:columns": COLUMN_DESCRIPTIONS, +} + + +def add_citation_extension(collection): + """ + Add citation-related metadata to the STAC collection using the Citation Extension. + """ + # Add the Scientific Extension to the collection + ScientificExtension.add_to(collection) + + # Define the DOI and citation + DOI = "10.1016/j.envsoft.2024.106257" + CITATION = ( + "Floris Reinier Calkoen, Arjen Pieter Luijendijk, Kilian Vos, Etiënne Kras, Fedor Baart, " + "Enabling coastal analytics at planetary scale, Environmental Modelling & Software, 2024, " + "106257, ISSN 1364-8152, https://doi.org/10.1016/j.envsoft.2024.106257." 
+ ) + + # Add the DOI and citation to the collection's extra fields + sci_ext = ScientificExtension.ext(collection, add_if_missing=True) + sci_ext.doi = DOI + sci_ext.citation = CITATION + + # Optionally add publications (if applicable) + sci_ext.publications = [Publication(doi=DOI, citation=CITATION)] + + return collection + + +def create_collection( + description: str | None = None, extra_fields: dict[str, Any] | None = None +) -> pystac.Collection: + providers = [ + pystac.Provider( + name="Deltares", + roles=[ + ProviderRole.PRODUCER, + ProviderRole.PROCESSOR, + ProviderRole.HOST, + ProviderRole.LICENSOR, + ], + url="https://deltares.nl", + ), + ] + + extent = pystac.Extent( + pystac.SpatialExtent([[-180.0, 90.0, 180.0, -90.0]]), + pystac.TemporalExtent([[DATETIME_DATA_CREATED, None]]), + ) + + links = [ + pystac.Link( + pystac.RelType.LICENSE, + target="https://creativecommons.org/licenses/by/4.0/", + media_type="text/html", + title="CC BY 4.0 ", + ), + ] + + keywords = [ + "Coastal analytics", + "Coastal change", + "Coastal monitoring", + "Satellite-derived shorelines", + "Coastal zone", + "Deltares", + "CoCliCo", + "GeoParquet", + ] + if description is None: + description = DESCRIPTION + + collection = pystac.Collection( + id=COLLECTION_ID, + title=COLLECTION_TITLE, + description=description, + license=LICENSE, + providers=providers, + extent=extent, + catalog_type=pystac.CatalogType.RELATIVE_PUBLISHED, + ) + + collection.add_asset( + "thumbnail", + pystac.Asset( + "https://coclico.blob.core.windows.net/assets/thumbnails/coastal-grid-thumbnail.jpeg", + title="Thumbnail", + media_type=pystac.MediaType.JPEG, + ), + ) + collection.links = links + collection.keywords = keywords + + ItemAssetsExtension.add_to(collection) + + collection.extra_fields["item_assets"] = { + "data": { + "title": ASSET_TITLE, + "description": ASSET_DESCRIPTION, + "roles": ["data"], + "type": stac_table.PARQUET_MEDIA_TYPE, + **ASSET_EXTRA_FIELDS, + } + } + + if extra_fields: + collection.extra_fields.update(extra_fields) + + collection = add_citation_extension(collection) + + version_ext = VersionExtension.ext(collection, add_if_missing=True) + version_ext.version = VERSION + + # NOTE: Add schema validation after making a PR to the stac-table repo + # collection.stac_extensions.append(stac_table.SCHEMA_URI) + + return collection + + +def create_item( + urlpath: str, + storage_options: dict[str, Any] | None = None, + properties: dict[str, Any] | None = None, + item_extra_fields: dict[str, Any] | None = None, + asset_extra_fields: dict[str, Any] | None = None, +) -> pystac.Item: + """Create a STAC Item""" + + if item_extra_fields is None: + item_extra_fields = {} + + if properties is None: + properties = {} + + pp = PathParser(urlpath, account_name=STORAGE_ACCOUNT_NAME) + + template = pystac.Item( + id=pp.stac_item_id, + properties=properties, + geometry=None, + bbox=None, + datetime=DATETIME_DATA_CREATED, + stac_extensions=[], + ) + template.common_metadata.created = DATETIME_STAC_CREATED + + item = stac_table.generate( + uri=pp.to_cloud_uri(), + template=template, + infer_bbox=True, + proj=True, + infer_geometry=False, + infer_datetime=stac_table.InferDatetimeOptions.no, + datetime_column=None, + metadata_created=DATETIME_STAC_CREATED, + data_created=DATETIME_DATA_CREATED, + count_rows=True, + asset_key="data", + asset_href=pp.to_cloud_uri(), + asset_title=ASSET_TITLE, + asset_description=ASSET_DESCRIPTION, + asset_media_type=PARQUET_MEDIA_TYPE, + asset_roles=["data"], + asset_extra_fields=asset_extra_fields, + 
storage_options=storage_options, + validate=False, + ) + assert isinstance(item, pystac.Item) + + # add descriptions to item properties + if "table:columns" in ASSET_EXTRA_FIELDS and "table:columns" in item.properties: + source_lookup = { + col["name"]: col for col in ASSET_EXTRA_FIELDS["table:columns"] + } + + for target_col in item.properties["table:columns"]: + source_col = source_lookup.get(target_col["name"]) + if source_col: + target_col.setdefault("description", source_col.get("description")) + + # Optionally add an HTTPS link if the URI uses a 'cloud protocol' + if not item.assets["data"].href.startswith("https://"): + item.add_link( + pystac.Link( + rel="alternate", + target=pp.to_https_url(), + title="HTTPS access", + media_type=PARQUET_MEDIA_TYPE, + ) + ) + return item + + +if __name__ == "__main__": + storage_options = {"account_name": "coclico", "credential": sas_token} + fs, token, [root] = fsspec.get_fs_token_paths( + CONTAINER_URI, storage_options=storage_options + ) + paths = fs.glob(CONTAINER_URI + "/**/*.parquet") + uris = ["az://" + p for p in paths] + + STAC_DIR = pathlib.Path.home() / "dev" / "coclicodata" / "current" + catalog = pystac.Catalog.from_file(str(STAC_DIR / "catalog.json")) + + stac_io = DefaultStacIO() + layout = ParquetLayout() + + collection = create_collection( + extra_fields={"storage_pattern": CONTAINER_URI + "/*.parquet"} + ) + collection.validate_all() + + for uri in tqdm.tqdm(uris, desc="Processing files"): + match = re.search(r"_z([0-9]+)_([0-9]+m)\.parquet$", uri) + if match: + zoom_level = match.group(1) # Capture the zoom level + buffer_size = match.group(2) # Capture the buffer size + item_extra_fields = {"zoom_level": zoom_level, "buffer_size": buffer_size} + + item = create_item( + uri, + storage_options=storage_options, + asset_extra_fields=ASSET_EXTRA_FIELDS, + item_extra_fields=item_extra_fields, + ) + item.validate() + collection.add_item(item) + + collection.update_extent_from_items() + + items = list(collection.get_all_items()) + items_as_json = [i.to_dict() for i in items] + item_extents = stac_geoparquet.to_geodataframe(items_as_json) + + with fsspec.open(GEOPARQUET_STAC_ITEMS_HREF, mode="wb", **storage_options) as f: + item_extents.to_parquet(f) + + gpq_items_asset = pystac.Asset( + GEOPARQUET_STAC_ITEMS_HREF, + title="GeoParquet STAC items", + description="Snapshot of the collection's STAC items exported to GeoParquet format.", + media_type=PARQUET_MEDIA_TYPE, + roles=["metadata"], + ) + gpq_items_asset.common_metadata.created = DATETIME_DATA_CREATED + collection.add_asset("geoparquet-stac-items", gpq_items_asset) + + # TODO: there should be a cleaner method to remove the previous stac catalog and its items + try: + if catalog.get_child(collection.id): + catalog.remove_child(collection.id) + print(f"Removed child: {collection.id}.") + except Exception: + pass + + catalog.add_child(collection) + + collection.normalize_hrefs(str(STAC_DIR / collection.id), layout) + + collection.validate_all() + + catalog.save( + catalog_type=pystac.CatalogType.SELF_CONTAINED, + dest_href=str(STAC_DIR), + stac_io=stac_io, + ) diff --git a/scripts/python/stac_coastal_zone.py b/scripts/python/stac_coastal_zone.py new file mode 100644 index 0000000..77ab8da --- /dev/null +++ b/scripts/python/stac_coastal_zone.py @@ -0,0 +1,360 @@ +import datetime +import os +import pathlib +import re +from typing import Any + +import fsspec +import pystac +import stac_geoparquet +import tqdm +from dotenv import load_dotenv +from pystac.extensions.item_assets import 
ItemAssetsExtension +from pystac.extensions.scientific import Publication, ScientificExtension +from pystac.extensions.version import VersionExtension +from pystac.provider import ProviderRole +from pystac.stac_io import DefaultStacIO + +from coastpy.io.utils import PathParser +from coastpy.libs import stac_table +from coastpy.stac import ParquetLayout + +# Load the environment variables from the .env file +load_dotenv() + +# Get the SAS token and storage account name from environment variables +sas_token = os.getenv("AZURE_STORAGE_SAS_TOKEN") +STORAGE_ACCOUNT_NAME = "coclico" +storage_options = {"account_name": STORAGE_ACCOUNT_NAME, "credential": sas_token} + + +# Container and URI configuration +VERSION = "2024-12-08" +DATETIME_STAC_CREATED = datetime.datetime.now(datetime.UTC) +DATETIME_DATA_CREATED = datetime.datetime(2023, 2, 9) +CONTAINER_NAME = "coastal-zone" +PREFIX = f"release/{VERSION}" +CONTAINER_URI = f"az://{CONTAINER_NAME}/{PREFIX}" +PARQUET_MEDIA_TYPE = "application/vnd.apache.parquet" +LICENSE = "CC-BY-4.0" + +# Collection information +COLLECTION_ID = "coastal-zone" +COLLECTION_TITLE = "Coastal Zone" + +DESCRIPTION = """ +The Coastal Zone dataset provides a vectorized representation of coastal zones at multiple buffer distances. +It is derived from a generalized version of the OpenStreetMap coastline (2023-02) and serves as a valuable +tool for masking other datasets or for spatial analysis in coastal regions. + +This STAC collection includes multiple layers, each corresponding to a specific buffer distance: +500m, 1000m, 2000m, 5000m, 10000m, and 15000m. The buffer distance defines the zone's extent, with the +total width being twice the buffer distance (e.g., a 5000m buffer results in a zone 10km wide). + +Each layer in the collection is stored as a separate item and can be filtered using the `buffer_size` +field in the item's properties. These layers contain only the geometry, enabling seamless integration with +other geospatial data. + +Please consider the following citation when using this dataset: + +Floris Reinier Calkoen, Arjen Pieter Luijendijk, Kilian Vos, Etiënne Kras, Fedor Baart, +Enabling coastal analytics at planetary scale, Environmental Modelling & Software, 2024, +106257, ISSN 1364-8152, https://doi.org/10.1016/j.envsoft.2024.106257. +(https://www.sciencedirect.com/science/article/pii/S1364815224003189) + +""" + +ASSET_TITLE = "Coastal Zone" +ASSET_DESCRIPTION = ( + "Parquet dataset with coastal zone geometries for multiple buffer distances." +) + +GEOPARQUET_STAC_ITEMS_HREF = f"az://items/{COLLECTION_ID}.parquet" + +COLUMN_DESCRIPTIONS = [ + { + "name": "geometry", + "type": "byte_array", + "description": "Well-Known Binary (WKB) representation of the transect as a linestring geometry.", + }, +] + + +ASSET_EXTRA_FIELDS = { + "table:storage_options": {"account_name": "coclico"}, + "table:columns": COLUMN_DESCRIPTIONS, +} + + +def add_citation_extension(collection): + """ + Add citation-related metadata to the STAC collection using the Citation Extension. + """ + # Add the Scientific Extension to the collection + ScientificExtension.add_to(collection) + + # Define the DOI and citation + DOI = "10.1016/j.envsoft.2024.106257" + CITATION = ( + "Floris Reinier Calkoen, Arjen Pieter Luijendijk, Kilian Vos, Etiënne Kras, Fedor Baart, " + "Enabling coastal analytics at planetary scale, Environmental Modelling & Software, 2024, " + "106257, ISSN 1364-8152, https://doi.org/10.1016/j.envsoft.2024.106257." 
+ ) + + # Add the DOI and citation to the collection's extra fields + sci_ext = ScientificExtension.ext(collection, add_if_missing=True) + sci_ext.doi = DOI + sci_ext.citation = CITATION + + # Optionally add publications (if applicable) + sci_ext.publications = [Publication(doi=DOI, citation=CITATION)] + + # Add a link to the DOI + collection.add_link( + pystac.Link( + rel="cite-as", + target=f"https://doi.org/{DOI}", + title="Citation DOI", + ) + ) + + return collection + + +def create_collection( + description: str | None = None, extra_fields: dict[str, Any] | None = None +) -> pystac.Collection: + providers = [ + pystac.Provider( + name="Deltares", + roles=[ + ProviderRole.PRODUCER, + ProviderRole.PROCESSOR, + ProviderRole.HOST, + ProviderRole.LICENSOR, + ], + url="https://deltares.nl", + ), + ] + + extent = pystac.Extent( + pystac.SpatialExtent([[-180.0, 90.0, 180.0, -90.0]]), + pystac.TemporalExtent([[DATETIME_DATA_CREATED, None]]), + ) + + links = [ + pystac.Link( + pystac.RelType.LICENSE, + target="https://creativecommons.org/licenses/by/4.0/", + media_type="text/html", + title="CC BY 4.0 ", + ), + ] + + keywords = [ + "Coastal analytics", + "Cloud technology", + "Coastal change", + "Coastal monitoring", + "Satellite-derived shorelines", + "Coastal zone", + "Deltares", + "CoCliCo", + "GeoParquet", + ] + if description is None: + description = DESCRIPTION + + collection = pystac.Collection( + id=COLLECTION_ID, + title=COLLECTION_TITLE, + description=description, + license=LICENSE, + providers=providers, + extent=extent, + catalog_type=pystac.CatalogType.RELATIVE_PUBLISHED, + ) + + collection.add_asset( + "thumbnail", + pystac.Asset( + "https://coclico.blob.core.windows.net/assets/thumbnails/coastal-zone-thumbnail.jpeg", + title="Thumbnail", + media_type=pystac.MediaType.JPEG, + roles=["thumbnail"], + ), + ) + collection.links = links + collection.keywords = keywords + + ItemAssetsExtension.add_to(collection) + + collection.extra_fields["item_assets"] = { + "data": { + "title": ASSET_TITLE, + "description": ASSET_DESCRIPTION, + "roles": ["data"], + "type": stac_table.PARQUET_MEDIA_TYPE, + **ASSET_EXTRA_FIELDS, + } + } + + if extra_fields: + collection.extra_fields.update(extra_fields) + + collection = add_citation_extension(collection) + version_ext = VersionExtension.ext(collection, add_if_missing=True) + version_ext.version = VERSION + + # NOTE: Add schema validation after making a PR to the stac-table repo + # collection.stac_extensions.append(stac_table.SCHEMA_URI) + + return collection + + +def create_item( + urlpath: str, + storage_options: dict[str, Any] | None = None, + properties: dict[str, Any] | None = None, + item_extra_fields: dict[str, Any] | None = None, + asset_extra_fields: dict[str, Any] | None = None, +) -> pystac.Item: + """Create a STAC Item""" + + if item_extra_fields is None: + item_extra_fields = {} + + if properties is None: + properties = {} + + pp = PathParser(urlpath, account_name=STORAGE_ACCOUNT_NAME) + + template = pystac.Item( + id=pp.stac_item_id, + properties=properties, + geometry=None, + bbox=None, + datetime=DATETIME_DATA_CREATED, + stac_extensions=[], + ) + template.common_metadata.created = DATETIME_STAC_CREATED + + item = stac_table.generate( + uri=pp.to_cloud_uri(), + template=template, + infer_bbox=True, + proj=True, + infer_geometry=False, + infer_datetime=stac_table.InferDatetimeOptions.no, + datetime_column=None, + metadata_created=DATETIME_STAC_CREATED, + data_created=DATETIME_DATA_CREATED, + count_rows=True, + asset_key="data", + 
asset_href=pp.to_cloud_uri(), + asset_title=ASSET_TITLE, + asset_description=ASSET_DESCRIPTION, + asset_media_type=PARQUET_MEDIA_TYPE, + asset_roles=["data"], + asset_extra_fields=asset_extra_fields, + storage_options=storage_options, + validate=False, + ) + assert isinstance(item, pystac.Item) + + # add descriptions to item properties + if "table:columns" in ASSET_EXTRA_FIELDS and "table:columns" in item.properties: + source_lookup = { + col["name"]: col for col in ASSET_EXTRA_FIELDS["table:columns"] + } + + for target_col in item.properties["table:columns"]: + source_col = source_lookup.get(target_col["name"]) + if source_col: + target_col.setdefault("description", source_col.get("description")) + + # Optionally add an HTTPS link if the URI uses a 'cloud protocol' + if not item.assets["data"].href.startswith("https://"): + item.add_link( + pystac.Link( + rel="alternate", + target=pp.to_https_url(), + title="HTTPS access", + media_type=PARQUET_MEDIA_TYPE, + ) + ) + return item + + +if __name__ == "__main__": + storage_options = {"account_name": "coclico", "credential": sas_token} + fs, token, [root] = fsspec.get_fs_token_paths( + CONTAINER_URI, storage_options=storage_options + ) + paths = fs.glob(CONTAINER_URI + "/**/*.parquet") + uris = ["az://" + p for p in paths] + + STAC_DIR = pathlib.Path.home() / "dev" / "coclicodata" / "current" + catalog = pystac.Catalog.from_file(str(STAC_DIR / "catalog.json")) + + stac_io = DefaultStacIO() + layout = ParquetLayout() + + collection = create_collection( + extra_fields={"storage_pattern": CONTAINER_URI + "/*.parquet"} + ) + collection.validate_all() + + for uri in tqdm.tqdm(uris, desc="Processing files"): + match = re.search(r"_([0-9]+m)\.parquet$", uri) + if match: + buffer_size = match.group(1) + item_extra_fields = {"buffer_size": buffer_size} + + item = create_item( + uri, + storage_options=storage_options, + asset_extra_fields=ASSET_EXTRA_FIELDS, + item_extra_fields=item_extra_fields, + ) + item.validate() + collection.add_item(item) + + collection.update_extent_from_items() + + items = list(collection.get_all_items()) + items_as_json = [i.to_dict() for i in items] + item_extents = stac_geoparquet.to_geodataframe(items_as_json) + + with fsspec.open(GEOPARQUET_STAC_ITEMS_HREF, mode="wb", **storage_options) as f: + item_extents.to_parquet(f) + + gpq_items_asset = pystac.Asset( + GEOPARQUET_STAC_ITEMS_HREF, + title="GeoParquet STAC items", + description="Snapshot of the collection's STAC items exported to GeoParquet format.", + media_type=PARQUET_MEDIA_TYPE, + roles=["metadata"], + ) + gpq_items_asset.common_metadata.created = DATETIME_DATA_CREATED + collection.add_asset("geoparquet-stac-items", gpq_items_asset) + + # TODO: there should be a cleaner method to remove the previous stac catalog and its items + try: + if catalog.get_child(collection.id): + catalog.remove_child(collection.id) + print(f"Removed child: {collection.id}.") + except Exception: + pass + + catalog.add_child(collection) + + collection.normalize_hrefs(str(STAC_DIR / collection.id), layout) + + collection.validate_all() + + catalog.save( + catalog_type=pystac.CatalogType.SELF_CONTAINED, + dest_href=str(STAC_DIR), + stac_io=stac_io, + ) diff --git a/scripts/python/stac_deltadtm.py b/scripts/python/stac_deltadtm.py index 4446127..c354d79 100644 --- a/scripts/python/stac_deltadtm.py +++ b/scripts/python/stac_deltadtm.py @@ -1,9 +1,7 @@ -import dataclasses import datetime import logging import os import pathlib -import re from typing import Any import fsspec @@ -24,10 +22,18 @@ 
from coclicodata.coclico_stac.layouts import CoCliCoCOGLayout from dotenv import load_dotenv from pystac.extensions import raster +from pystac.extensions.file import FileExtension +from pystac.extensions.item_assets import ItemAssetsExtension +from pystac.extensions.projection import ProjectionExtension +from pystac.extensions.scientific import ScientificExtension +from pystac.extensions.version import VersionExtension +from pystac.provider import ProviderRole from pystac.stac_io import DefaultStacIO from stactools.core.utils import antimeridian from tqdm import tqdm +from coastpy.io.utils import PathParser + # Load the environment variables from the .env file load_dotenv(override=True) @@ -50,47 +56,27 @@ "xarray:storage_options": {"account_name": "coclico"}, } -DATE = datetime.datetime(2023, 10, 30, tzinfo=datetime.UTC) +# Container and URI configuration +VERSION = "v1.1" +DATETIME_STAC_CREATED = datetime.datetime.now(datetime.UTC) +DATETIME_DATA_CREATED = datetime.datetime(2023, 10, 30, tzinfo=datetime.UTC) +CONTAINER_NAME = "deltares-delta-dtm" +CONTAINER_URI = f"az://{CONTAINER_NAME}/{VERSION}" +GEOPARQUET_STAC_ITEMS_HREF = f"az://items/{COLLECTION_ID}.parquet" +LICENSE = "CC-BY-4.0" +# Data specifics NODATA_VALUE = -9999 RESOLUTION = 30 +DATA_TYPE = raster.DataType.FLOAT32 +UNIT = "m" PARQUET_MEDIA_TYPE = "application/vnd.apache.parquet" -CONTAINER_NAME = "deltares-delta-dtm" -PREFIX = "v1.1" -CONTAINER_URI = f"az://{CONTAINER_NAME}/{PREFIX}" -GEOPARQUET_STAC_ITEMS_HREF = f"az://items/{COLLECTION_ID}.parquet" - +# NOTE: Since December 2024 we have changed to https hrefs instead of az blob storage paths EXAMPLE_HREF = "https://coclico.blob.core.windows.net/deltares-delta-dtm/v1.1/DeltaDTM_v1_1_N03W052.tif" -@dataclasses.dataclass -class PathParser: - """ - Parses a cloud storage path into its component parts, specifically designed for Azure Blob Storage and COG data. 
- This class assumes paths are formatted like "az:////.tif" - """ - - path: str - container: str | None = None - prefix: str | None = None - name: str | None = None - stac_item_id: str | None = None - - _base_href = f"https://{STORAGE_ACCOUNT_NAME}.blob.core.windows.net" - - def __post_init__(self) -> None: - stripped_path = re.sub(r"^\w+://", "", self.path) - split_path = stripped_path.rstrip("/").split("/") - - self.container = split_path[0] - self.name = split_path[-1] - self.prefix = "/".join(split_path[1:-1]) - self.stac_item_id = self.name.rsplit(".", 1)[0] - self.href = f"{self._base_href}/{self.container}/{self.prefix}/{self.name}" - - def create_collection( description: str | None = None, extra_fields: dict[str, Any] | None = None ) -> pystac.Collection: @@ -98,10 +84,10 @@ def create_collection( pystac.Provider( name="Deltares", roles=[ - pystac.provider.ProviderRole.PRODUCER, - pystac.provider.ProviderRole.PROCESSOR, - pystac.provider.ProviderRole.HOST, - pystac.provider.ProviderRole.LICENSOR, + ProviderRole.PRODUCER, + ProviderRole.PROCESSOR, + ProviderRole.HOST, + ProviderRole.LICENSOR, ], url="https://deltares.nl", ), @@ -109,7 +95,7 @@ def create_collection( extent = pystac.Extent( pystac.SpatialExtent([[-180.0, 90.0, 180.0, -90.0]]), - pystac.TemporalExtent([[DATE, None]]), + pystac.TemporalExtent([[DATETIME_DATA_CREATED, None]]), ) links = [ @@ -147,12 +133,13 @@ def create_collection( "https://coclico.blob.core.windows.net/assets/thumbnails/deltares-delta-dtm-thumbnail.jpeg", title="Thumbnail", media_type=pystac.MediaType.JPEG, + roles=["thumbnail"], ), ) collection.links = links collection.keywords = keywords - pystac.extensions.item_assets.ItemAssetsExtension.add_to(collection) + ItemAssetsExtension.add_to(collection) collection.extra_fields["item_assets"] = { "data": { @@ -167,7 +154,7 @@ def create_collection( if extra_fields: collection.extra_fields.update(extra_fields) - pystac.extensions.scientific.ScientificExtension.add_to(collection) + ScientificExtension.add_to(collection) collection.extra_fields["sci:citation"] = ( """Pronk, Maarten. 2024. “DeltaDTM v1.1: A Global Coastal Digital Terrain Model.” 4TU.ResearchData. 
https://doi.org/10.4121/21997565.V3.""" ) @@ -179,7 +166,7 @@ def create_collection( } ] - pystac.extensions.version.VersionExtension.add_to(collection) + VersionExtension.add_to(collection) collection.extra_fields["version"] = "1.1" return collection @@ -194,19 +181,16 @@ def create_item(block, item_id, antimeridian_strategy=antimeridian.Strategy.SPLI id=item_id, geometry=geometry, bbox=bbox, - datetime=DATE, + datetime=DATETIME_DATA_CREATED, properties={}, ) + item.common_metadata.created = DATETIME_STAC_CREATED antimeridian.fix_item(item, antimeridian_strategy) - item.common_metadata.created = datetime.datetime.now(datetime.UTC) - - ext = pystac.extensions.projection.ProjectionExtension.ext( - item, add_if_missing=True - ) + ext = ProjectionExtension.ext(item, add_if_missing=True) ext.bbox = block.rio.bounds() # these are provided in the crs of the data - ext.shape = tuple(v for k, v in block.sizes.items() if k in ["y", "x"]) + ext.shape = [v for k, v in block.sizes.items() if k in ["y", "x"]] ext.epsg = block.rio.crs.to_epsg() ext.geometry = shapely.geometry.mapping(shapely.geometry.box(*ext.bbox)) ext.transform = list(block.rio.transform())[:6] @@ -216,7 +200,7 @@ def create_item(block, item_id, antimeridian_strategy=antimeridian.Strategy.SPLI def create_asset( - item, asset_title, asset_href, nodata, resolution, data_type, nbytes=None + item, asset_title, asset_href, nodata, resolution, data_type, nbytes=None, unit=None ): asset = pystac.Asset( href=asset_href, @@ -224,10 +208,11 @@ def create_asset( title=asset_title, roles=["data"], ) + asset.common_metadata.created = DATETIME_DATA_CREATED item.add_asset("data", asset) - pystac.extensions.file.FileExtension.ext(asset, add_if_missing=True) + FileExtension.ext(asset, add_if_missing=True) if nbytes: asset.extra_fields["file:size"] = nbytes @@ -236,7 +221,8 @@ def create_asset( raster.RasterBand.create( nodata=nodata, spatial_resolution=resolution, - data_type=data_type, # e.g., raster.DataType.INT8 + data_type=data_type, + unit=unit, ) ] @@ -262,19 +248,20 @@ def create_asset( for fp in tqdm(fps, desc="Processing file paths"): href = "az://" + fp - pp = PathParser(href) + urlpath = PathParser(href, account_name=STORAGE_ACCOUNT_NAME) with fs.open(fp, "rb") as f: block = xr.open_dataset(f, engine="rasterio")["band_data"].squeeze() - item = create_item(block, pp.stac_item_id) + item = create_item(block, urlpath.stac_item_id) item = create_asset( item, ASSET_TITLE, - pp.href, + urlpath.to_https_url(), nodata=NODATA_VALUE, resolution=RESOLUTION, - data_type=raster.DataType.FLOAT32, + data_type=DATA_TYPE, + unit=UNIT, ) collection.add_item(item) @@ -287,25 +274,25 @@ def create_asset( with fsspec.open(GEOPARQUET_STAC_ITEMS_HREF, mode="wb", **storage_options) as f: item_extents.to_parquet(f) - collection.add_asset( - "geoparquet-stac-items", - pystac.Asset( - GEOPARQUET_STAC_ITEMS_HREF, - title="GeoParquet STAC items", - description="Snapshot of the collection's STAC items exported to GeoParquet format.", - media_type=PARQUET_MEDIA_TYPE, - roles=["data"], - ), + gpq_items_asset = pystac.Asset( + GEOPARQUET_STAC_ITEMS_HREF, + title="GeoParquet STAC items", + description="Snapshot of the collection's STAC items exported to GeoParquet format.", + media_type=PARQUET_MEDIA_TYPE, + roles=["metadata"], ) + gpq_items_asset.common_metadata.created = DATETIME_DATA_CREATED + collection.add_asset("geoparquet-stac-items", gpq_items_asset) - collection.add_asset( - "geoserver_link", - pystac.Asset( - # 
https://coclico.avi.deltares.nl/geoserver/%s/wms?bbox={bbox-epsg-3857}&format=image/png&service=WMS&version=1.1.1&request=GetMap&srs=EPSG:3857&transparent=true&width=256&height=256&layers=%s"%(COLLECTION_ID, COLLECTION_ID + ":" + ASSET_TITLE), - "https://coclico.avi.deltares.nl/geoserver/cfhp/wms?bbox={bbox-epsg-3857}&format=image/png&service=WMS&version=1.1.1&request=GetMap&srs=EPSG:3857&transparent=true&width=256&height=256&layers=cfhp:HIGH_DEFENDED_MAPS_Mean_spring_tide", # test - title="Geoserver Mosaic link", - media_type=pystac.media_type.MediaType.COG, - ), + # Currently this is a link for testing purpose. When the data is available on the WMS + # server it will be updated here. + collection.add_link( + pystac.Link( + rel="service", + target="https://coclico.avi.deltares.nl/geoserver/cfhp/wms?bbox={bbox-epsg-3857}&format=image/png&service=WMS&version=1.1.1&request=GetMap&srs=EPSG:3857&transparent=true&width=256&height=256&layers=cfhp:HIGH_DEFENDED_MAPS_Mean_spring_tide", + title="Geoserver WMS Link", + media_type="application/vnd.ogc.wms_xml", + ) ) catalog = pystac.Catalog.from_file(str(STAC_DIR / "catalog.json")) diff --git a/scripts/python/stac_gcts.py b/scripts/python/stac_gcts.py index f99a6f3..20ed669 100644 --- a/scripts/python/stac_gcts.py +++ b/scripts/python/stac_gcts.py @@ -1,20 +1,20 @@ -import dataclasses import datetime import os import pathlib -import re from typing import Any import fsspec import pystac import stac_geoparquet +import tqdm from dotenv import load_dotenv from pystac.extensions.item_assets import ItemAssetsExtension -from pystac.extensions.scientific import ScientificExtension +from pystac.extensions.scientific import Publication, ScientificExtension from pystac.extensions.version import VersionExtension from pystac.provider import ProviderRole from pystac.stac_io import DefaultStacIO +from coastpy.io.utils import PathParser from coastpy.libs import stac_table from coastpy.stac import ParquetLayout @@ -23,16 +23,15 @@ # Get the SAS token and storage account name from environment variables sas_token = os.getenv("AZURE_STORAGE_SAS_TOKEN") -storage_account_name = "coclico" -storage_options = {"account_name": storage_account_name, "credential": sas_token} - -# NOTE: -TEST_RELEASE = True +STORAGE_ACCOUNT_NAME = "coclico" +storage_options = {"account_name": STORAGE_ACCOUNT_NAME, "credential": sas_token} # Container and URI configuration +VERSION = "2024-08-02" +DATETIME_STAC_CREATED = datetime.datetime.now(datetime.UTC) +DATETIME_DATA_CREATED = datetime.datetime(2023, 2, 9) CONTAINER_NAME = "gcts" -RELEASE_DATE = "2024-08-02" -PREFIX = f"release/{RELEASE_DATE}" +PREFIX = f"release/{VERSION}" CONTAINER_URI = f"az://{CONTAINER_NAME}/{PREFIX}" PARQUET_MEDIA_TYPE = "application/vnd.apache.parquet" LICENSE = "CC-BY-4.0" @@ -58,11 +57,7 @@ ASSET_TITLE = "GCTS" ASSET_DESCRIPTION = f"Parquet dataset with coastal transects ({TRANSECT_LENGTH} m) at 100 m alongshore resolution for this region." -# GeoParquet STAC items -if TEST_RELEASE: - GEOPARQUET_STAC_ITEMS_HREF = f"az://items-test/{COLLECTION_ID}.parquet" -else: - GEOPARQUET_STAC_ITEMS_HREF = f"az://items/{COLLECTION_ID}.parquet" +GEOPARQUET_STAC_ITEMS_HREF = f"az://items/{COLLECTION_ID}.parquet" COLUMN_DESCRIPTIONS = [ { @@ -144,42 +139,39 @@ } -@dataclasses.dataclass -class PathParts: +def add_citation_extension(collection): """ - Parses a path into its component parts, supporting variations with and without hive partitioning, - and with and without geographical bounds. 
- - Attributes: - path (str): The full path to parse. - container (str | None): The storage container or bucket name. - prefix (str | None): The path prefix between the container and the file name. - name (str | None): The file name including its extension. - stac_item_id (str | None): The identifier used for STAC, which is the file name without the .parquet extension. + Add citation-related metadata to the STAC collection using the Citation Extension. """ + # Add the Scientific Extension to the collection + ScientificExtension.add_to(collection) - path: str - container: str | None = None - prefix: str | None = None - name: str | None = None - stac_item_id: str | None = None - - def __post_init__(self) -> None: - # Strip any protocol pattern like "az://" - stripped_path = re.sub(r"^\w+://", "", self.path) - split = stripped_path.rstrip("/").split("/") + # Define the DOI and citation + DOI = "10.1016/j.envsoft.2024.106257" + CITATION = ( + "Floris Reinier Calkoen, Arjen Pieter Luijendijk, Kilian Vos, Etiënne Kras, Fedor Baart, " + "Enabling coastal analytics at planetary scale, Environmental Modelling & Software, 2024, " + "106257, ISSN 1364-8152, https://doi.org/10.1016/j.envsoft.2024.106257." + ) - # Extract the container which is the first part of the path - self.container = split[0] + # Add the DOI and citation to the collection's extra fields + sci_ext = ScientificExtension.ext(collection, add_if_missing=True) + sci_ext.doi = DOI + sci_ext.citation = CITATION - # The file name is the last element in the split path - self.name = split[-1] + # Optionally add publications (if applicable) + sci_ext.publications = [Publication(doi=DOI, citation=CITATION)] - # The prefix is everything between the container and the filename - self.prefix = "/".join(split[1:-1]) if len(split) > 2 else None + # Add a link to the DOI + collection.add_link( + pystac.Link( + rel="cite-as", + target=f"https://doi.org/{DOI}", + title="Citation DOI", + ) + ) - # stac_item_id is the filename without the .parquet extension - self.stac_item_id = self.name.replace(".parquet", "") + return collection def create_collection( @@ -198,11 +190,9 @@ def create_collection( ), ] - start_datetime = datetime.datetime.strptime(RELEASE_DATE, "%Y-%m-%d") - extent = pystac.Extent( pystac.SpatialExtent([[-180.0, 90.0, 180.0, -90.0]]), - pystac.TemporalExtent([[start_datetime, None]]), + pystac.TemporalExtent([[DATETIME_DATA_CREATED, None]]), ) links = [ @@ -247,6 +237,7 @@ def create_collection( "https://coclico.blob.core.windows.net/assets/thumbnails/gcts-thumbnail.jpeg", title="Thumbnail", media_type=pystac.MediaType.JPEG, + roles=["thumbnail"], ), ) collection.links = links @@ -267,82 +258,66 @@ def create_collection( if extra_fields: collection.extra_fields.update(extra_fields) - ScientificExtension.add_to(collection) - # TODO: revise citation upon publication - - CITATION = """ - Floris Reinier Calkoen, Arjen Pieter Luijendijk, Kilian Vos, Etiënne Kras, Fedor Baart, - Enabling coastal analytics at planetary scale, Environmental Modelling & Software, 2024, - 106257, ISSN 1364-8152, https://doi.org/10.1016/j.envsoft.2024.106257. - (https://www.sciencedirect.com/science/article/pii/S1364815224003189) - """ + collection = add_citation_extension(collection) + version_ext = VersionExtension.ext(collection, add_if_missing=True) + version_ext.version = VERSION - # NOTE: we could make a separate DOI for the transects and then link the paper in - # sci:citations as a feature. However, for now I (Floris) prefer to use the same DOI. 
- collection.extra_fields["sci:citation"] = CITATION - # collection.extra_fields["sci:doi"] = "https://doi.org/10.1016/j.envsoft.2024.106257" - - collection.stac_extensions.append(stac_table.SCHEMA_URI) - - VersionExtension.add_to(collection) - collection.extra_fields["version"] = RELEASE_DATE + # NOTE: Add schema validation after making a PR to the stac-table repo + # collection.stac_extensions.append(stac_table.SCHEMA_URI) return collection def create_item( - asset_href: str, + urlpath: str, storage_options: dict[str, Any] | None = None, + properties: dict[str, Any] | None = None, + item_extra_fields: dict[str, Any] | None = None, asset_extra_fields: dict[str, Any] | None = None, ) -> pystac.Item: - """Create a STAC Item - - For + """Create a STAC Item""" - Args: - asset_href (str): The HREF pointing to an asset associated with the item + if item_extra_fields is None: + item_extra_fields = {} - Returns: - Item: STAC Item object - """ - - parts = PathParts(asset_href) + if properties is None: + properties = {} - properties = { - "title": ASSET_TITLE, - "description": ASSET_DESCRIPTION, - } + pp = PathParser(urlpath, account_name=STORAGE_ACCOUNT_NAME) - dt = datetime.datetime.strptime(RELEASE_DATE, "%Y-%m-%d") - # shape = shapely.box(*bbox) - # geometry = shapely.geometry.mapping(shape) template = pystac.Item( - id=parts.stac_item_id, + id=pp.stac_item_id, properties=properties, geometry=None, bbox=None, - datetime=dt, + datetime=DATETIME_DATA_CREATED, stac_extensions=[], ) + template.common_metadata.created = DATETIME_STAC_CREATED item = stac_table.generate( - uri=asset_href, + uri=pp.to_cloud_uri(), template=template, infer_bbox=True, - infer_geometry=None, - datetime_column=None, + proj=True, + infer_geometry=False, infer_datetime=stac_table.InferDatetimeOptions.no, + datetime_column=None, + metadata_created=DATETIME_STAC_CREATED, + data_created=DATETIME_DATA_CREATED, count_rows=True, asset_key="data", + asset_href=pp.to_cloud_uri(), + asset_title=ASSET_TITLE, + asset_description=ASSET_DESCRIPTION, + asset_media_type=PARQUET_MEDIA_TYPE, + asset_roles=["data"], asset_extra_fields=asset_extra_fields, - proj=True, storage_options=storage_options, validate=False, ) assert isinstance(item, pystac.Item) - item.common_metadata.created = datetime.datetime.now(datetime.UTC) - # add descriptions to item properties if "table:columns" in ASSET_EXTRA_FIELDS and "table:columns" in item.properties: source_lookup = { @@ -354,10 +329,16 @@ def create_item( if source_col: target_col.setdefault("description", source_col.get("description")) - # TODO: make configurable upstream - item.assets["data"].title = ASSET_TITLE - item.assets["data"].description = ASSET_DESCRIPTION - + # Optionally add an HTTPS link if the URI uses a 'cloud protocol' + if not item.assets["data"].href.startswith("https://"): + item.add_link( + pystac.Link( + rel="alternate", + target=pp.to_https_url(), + title="HTTPS access", + media_type=PARQUET_MEDIA_TYPE, + ) + ) return item @@ -374,11 +355,12 @@ def create_item( stac_io = DefaultStacIO() layout = ParquetLayout() - - collection = create_collection(extra_fields={"container_uri": CONTAINER_URI}) + collection = create_collection( + extra_fields={"storage_pattern": CONTAINER_URI + "/*.parquet"} + ) collection.validate_all() - for uri in uris: + for uri in tqdm.tqdm(uris, desc="Processing files"): item = create_item(uri, storage_options=storage_options) item.validate() collection.add_item(item) @@ -392,16 +374,15 @@ def create_item( with fsspec.open(GEOPARQUET_STAC_ITEMS_HREF, 
mode="wb", **storage_options) as f: item_extents.to_parquet(f) - collection.add_asset( - "geoparquet-stac-items", - pystac.Asset( - GEOPARQUET_STAC_ITEMS_HREF, - title="GeoParquet STAC items", - description="Snapshot of the collection's STAC items exported to GeoParquet format.", - media_type=PARQUET_MEDIA_TYPE, - roles=["data"], - ), + gpq_items_asset = pystac.Asset( + GEOPARQUET_STAC_ITEMS_HREF, + title="GeoParquet STAC items", + description="Snapshot of the collection's STAC items exported to GeoParquet format.", + media_type=PARQUET_MEDIA_TYPE, + roles=["metadata"], ) + gpq_items_asset.common_metadata.created = DATETIME_DATA_CREATED + collection.add_asset("geoparquet-stac-items", gpq_items_asset) # TODO: there should be a cleaner method to remove the previous stac catalog and its items try: diff --git a/src/coastpy/geo/utils.py b/src/coastpy/geo/utils.py deleted file mode 100644 index e69de29..0000000 diff --git a/src/coastpy/io/cloud.py b/src/coastpy/io/cloud.py index 9bbe7a9..4045cc8 100644 --- a/src/coastpy/io/cloud.py +++ b/src/coastpy/io/cloud.py @@ -1,5 +1,4 @@ import logging -import pathlib from typing import Any import fsspec @@ -7,145 +6,6 @@ import pandas as pd import xarray as xr -from coastpy.io.utils import is_local_file_path - - -def to_https_url(href: str, storage_options: dict | None = None) -> str: - """Converts a cloud storage href to its corresponding HTTPS URL. - - Args: - href (str): The href string to be converted. - storage_options (dict, optional): Dictionary of storage options that can contain - platform-specific settings. For Azure Blob Storage, it should contain - the 'account_name' key. - - Returns: - str: The corresponding HTTPS URL. - - Raises: - ValueError: If the protocol in href is unknown or unsupported or if required - options are missing in 'storage_options'. - """ - - # Local file system - if storage_options is None: - storage_options = {} - if pathlib.Path(href).exists(): - return f"file://{href}" - - # Google Cloud Storage - for protocol in ["gs://", "gcs://"]: - if href.startswith(protocol): - return href.replace(protocol, "https://storage.googleapis.com/") - - # Azure Blob Storage - if href.startswith(("az://", "azure://", "abfs://")): - if "account_name" not in storage_options: - msg = ( - "For Azure Blob Storage hrefs, the 'account_name' is required in" - " 'storage_options'." - ) - raise ValueError(msg) - - _, path = href.split("://", 1) - account_name = storage_options["account_name"] - return f"https://{account_name}.blob.core.windows.net/{path}" - - # AWS S3 - if href.startswith("s3://"): - _, rest = href.split("://", 1) - bucket, key = rest.split("/", 1) - return f"https://{bucket}.s3.amazonaws.com/{key}" - - msg = f"Unknown or unsupported protocol in href: {href}" - raise ValueError(msg) - - -def to_uri(href_or_uri: str, protocol: str | None = None) -> str: - """ - Converts an HTTPS URL to its corresponding cloud storage URI based on the specified protocol. - If the input is already a valid cloud URI, it adjusts the URI based on the provided protocol or returns it as is. - - Args: - href_or_uri (str): The HTTPS URL or cloud URI to be converted or validated. - protocol (Optional[str]): The desired cloud protocol ("gs", "gcs", "az", "azure", "abfs", "s3"). - - Returns: - str: The corresponding URI in the desired protocol. - - Raises: - ValueError: If the protocol is unknown, unsupported, or href_or_uri does not match the expected format. 
- - Example: - >>> to_uri_protocol("https://storage.googleapis.com/my-bucket/my-file.txt", protocol="gs") - 'gs://my-bucket/my-file.txt' - """ - valid_protocols = ["gs", "gcs", "az", "azure", "abfs", "s3"] - href_or_uri = str(href_or_uri) - - # Google Cloud Storage - gcs_formats = [ - "https://storage.googleapis.com/", - "https://storage.cloud.google.com/", - ] - - for fmt in gcs_formats: - if href_or_uri.startswith(fmt): - prefix = href_or_uri.replace(fmt, "") - if protocol in [None, "gs"]: - return f"gs://{prefix}" - elif protocol == "gcs": - return f"gcs://{prefix}" - else: - msg = ( - "For Google Cloud Storage href, valid protocols are 'gs' and" - f" 'gcs', but got '{protocol}'." - ) - raise ValueError(msg) - - # Azure Blob Storage - if ".blob.core.windows.net/" in href_or_uri: - # Split the href to get the container and path - container_and_path = href_or_uri.split(".blob.core.windows.net/", 1)[1] - if protocol in [None, "az", "azure"]: - return f"az://{container_and_path}" - elif protocol == "abfs": - return f"abfs://{container_and_path}" - msg = ( - "For Azure Blob Storage href, valid protocols are 'az', 'azure', and" - f" 'abfs', but got '{protocol}'." - ) - raise ValueError(msg) - - # AWS S3 - if href_or_uri.startswith("https://s3.") and ".amazonaws.com/" in href_or_uri: - parts = href_or_uri.replace("https://", "").split(".amazonaws.com/") - bucket = parts[0].replace("s3.", "") - key = parts[1] - if protocol in [None, "s3"]: - return f"s3://{bucket}/{key}" - msg = f"For AWS S3 href, the valid protocol is 's3', but got '{protocol}'." - raise ValueError(msg) - - # If the input is already a valid cloud URI - for uri_prefix in ["gs://", "gcs://", "az://", "azure://", "abfs://", "s3://"]: - if href_or_uri.startswith(uri_prefix): - if protocol is None: - return href_or_uri - else: - return to_uri(href_or_uri.replace(uri_prefix, "https://", 1), protocol) - - if is_local_file_path(href_or_uri): - return href_or_uri - - msg = ( - f"Unknown or unsupported protocol: {protocol}. Valid protocols are" - f" {', '.join(valid_protocols)}.\nOr href_or_uri does not match expected" - f" format: {href_or_uri}" - ) - - raise ValueError(msg) - def write_block( block: xr.DataArray, diff --git a/src/coastpy/io/utils.py b/src/coastpy/io/utils.py index 8f86172..36bc8ec 100644 --- a/src/coastpy/io/utils.py +++ b/src/coastpy/io/utils.py @@ -1,3 +1,4 @@ +import dataclasses import json import logging import pathlib @@ -5,12 +6,12 @@ import warnings from datetime import datetime from posixpath import join as urljoin -from urllib.parse import urlsplit +from typing import Any +from urllib.parse import urlparse, urlsplit import fsspec import geopandas as gpd import pandas as pd -import xarray import xarray as xr from pyproj import Transformer from shapely.geometry import box @@ -19,9 +20,9 @@ logger = logging.getLogger(__name__) -def is_local_file_path(path: str | pathlib.Path) -> bool: +def is_file(urlpath: str | pathlib.Path) -> bool: """ - Determine if a given path is a local filesystem path using urlsplit. + Determine if a urlpath is a filesystem path by using urlsplit. Args: path (str): The path to check. @@ -29,11 +30,185 @@ def is_local_file_path(path: str | pathlib.Path) -> bool: Returns: bool: True if it's a local file path, False otherwise. """ - parsed = urlsplit(str(path)) + parsed = urlsplit(str(urlpath)) return parsed.scheme in ["", "file"] +@dataclasses.dataclass +class PathParser: + """ + Parses cloud storage paths into components, supporting multiple protocols. 
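+
+    Example (illustrative; the release prefix and file name below are placeholders
+    following the container layout used elsewhere in this repo):
+        >>> pp = PathParser(
+        ...     "az://gcts/release/2024-08-02/n52e004.parquet",
+        ...     account_name="coclico",
+        ... )
+        >>> pp.stac_item_id
+        'n52e004'
+        >>> pp.to_https_url()
+        'https://coclico.blob.core.windows.net/gcts/release/2024-08-02/n52e004.parquet'
+        >>> pp.to_cloud_uri()
+        'az://gcts/release/2024-08-02/n52e004.parquet'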
+ + Attributes: + urlpath (str): Full URL or URI to parse. + scheme (str): Protocol from the URL (e.g., "https", "az", "gs", "s3"). + container (str): Storage container, bucket, or top-level directory. + prefix (str | None): Path prefix inside the container/bucket. + name (str): File name including its extension. + suffix (str): File extension (e.g., ".parquet", ".tif"). + stac_item_id (str): Identifier for STAC, derived from the file name. + base_url (str): Base HTTPS URL for accessing the resource. + base_uri (str): Base URI for the cloud storage provider. + href (str): Full HTTPS URL for accessing the resource. + uri (str): Full URI for cloud-specific access. + account_name (str | None): Account name for cloud storage (required for Azure). + path (str): Full path within the container, excluding the container name. + """ + + urlpath: str + scheme: str = "" + container: str = "" + path: str = "" + name: str = "" + suffix: str = "" + stac_item_id: str = "" + https_url: str = "" + cloud_uri: str = "" + account_name: str | None = None + cloud_protocol: str = "" + base_dir: str | pathlib.Path = "" + _base_https_url: str = "" + _base_cloud_uri: str = "" + + SUPPORTED_PROTOCOLS = {"https", "az", "gs", "s3"} + DEFAULT_BASE_DIR = pathlib.Path.home() / "data" / "tmp" + + def __post_init__(self): + if is_file(self.urlpath): + self._parse_path() + else: + self._parse_url() + + def _parse_url(self): + parsed = urlparse(self.urlpath) + + # Basic parsing + self.scheme = parsed.scheme + self.name = parsed.path.split("/")[-1] + self.suffix = f".{self.name.split('.')[-1]}" + self.stac_item_id = self.name.split(".")[0] + + if not self.base_dir: + self.base_dir = self.DEFAULT_BASE_DIR + + # Check for supported protocols + if self.scheme not in self.SUPPORTED_PROTOCOLS: + msg = f"Unsupported protocol: {self.scheme}" + raise ValueError(msg) + + # Protocol-specific parsing + if self.scheme == "https": + self.container = parsed.path.split("/")[1] + self._base_https_url = ( + self.scheme + "://" + parsed.netloc + "/" + self.container + ) + self.path = "/".join(parsed.path.split("/")[2:]) + + if "windows" in self._base_https_url: + if not self.cloud_protocol: + self.cloud_protocol = "az" + elif "google" in self._base_https_url: + if not self.cloud_protocol: + self.cloud_protocol = "gs" + elif "amazon" in self._base_https_url: # noqa: SIM102 + if not self.cloud_protocol: + self.cloud_protocol = "s3" + + elif self.scheme in {"az", "gs", "s3"}: + self.path = parsed.path.lstrip("/") # Remove leading slash + self.container = parsed.netloc + self.cloud_protocol = self.scheme + + if self.scheme == "az": + if not self.account_name: + msg = "For 'az://' URIs, 'account_name' must be provided." + raise ValueError(msg) + self._base_https_url = f"https://{self.account_name}.blob.core.windows.net/{self.container}" + elif self.scheme == "gs": + self._base_https_url = ( + f"https://storage.googleapis.com/{self.container}" + ) + elif self.scheme == "s3": + self._base_https_url = f"https://{self.container}.s3.amazonaws.com" + + # Common attributes + self.https_url = f"{self._base_https_url}/{self.path}" + self.cloud_uri = f"{self.cloud_protocol}://{self.container}/{self.path}" + + def _parse_path(self): + path = pathlib.Path(self.urlpath) + + if not self.base_dir: + msg = "For local file paths, 'base_dir' must be provided." + raise ValueError(msg) + + if not self.cloud_protocol: + msg = "For local file paths, 'cloud_protocol' must be provided." 
+ raise ValueError(msg) + + path = path.relative_to(self.base_dir) + parts = path.parts + if len(parts) < 2: + msg = "Local file paths must have at least two components." + raise ValueError(msg) + self.container = parts[0] + self.path = "/".join(parts[1:]) + self.name = path.name + self.suffix = f".{path.suffix}" + self.stac_item_id = path.stem + + if self.cloud_protocol == "az": + if not self.account_name: + msg = "For 'az://' URIs, 'account_name' must be provided." + raise ValueError(msg) + self._base_https_url = ( + f"https://{self.account_name}.blob.core.windows.net/{self.container}" + ) + + elif self.cloud_protocol == "gs": + self._base_https_url = f"https://storage.googleapis.com/{self.container}" + + elif self.cloud_protocol == "s3": + self._base_https_url = f"https://{self.container}.s3.amazonaws.com" + + # Common attributes + self.https_url = f"{self._base_https_url}/{self.path}" + self.cloud_uri = f"{self.cloud_protocol}://{self.container}/{self.path}" + + def to_filepath(self, base_dir: pathlib.Path | str | None = None) -> pathlib.Path: + """ + Convert the parsed path to a local file path. + + Args: + base_dir (pathlib.Path | str | None): Base directory for constructing the path. + + Returns: + pathlib.Path: The constructed local file path. + """ + if base_dir is None: + base_dir = self.base_dir + return pathlib.Path(base_dir) / self.container / self.path + + def to_https_url(self) -> str: + """ + Convert the parsed path to an HTTPS URL. + + Returns: + str: The corresponding HTTPS URL. + """ + return self.https_url + + def to_cloud_uri(self) -> str: + """ + Convert the parsed path to a cloud storage URI. + + Returns: + str: The corresponding cloud storage URI. + """ + return self.cloud_uri + + def name_block( block: xr.DataArray, storage_prefix: str = "", @@ -103,7 +278,7 @@ def name_block( name = f"{'_'.join(components)}.tif" - if not is_local_file_path(storage_prefix): # cloud storage + if not is_file(storage_prefix): # cloud storage return str(urljoin(storage_prefix, name)) else: # local storage return str(pathlib.Path(storage_prefix) / name) @@ -165,14 +340,37 @@ def name_table( name = f"{'_'.join(components)}.parquet" - if not is_local_file_path(storage_prefix): + if not is_file(storage_prefix): return str(urljoin(storage_prefix, name)) else: return str(pathlib.Path(storage_prefix) / name) +def name_bounds(bounds: tuple, crs: Any): + """ + Generate a location-based name for bounding box coordinates. + + Args: + bounds (tuple): Bounding box as (minx, miny, maxx, maxy). + crs (str): Coordinate reference system of the bounds. + + Returns: + str: Formatted name, e.g., "n45e123". 
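+
+    Example:
+        Bounds already in EPSG:4326 are named from the lower-left corner:
+
+        >>> name_bounds((4.0, 52.0, 5.0, 53.0), "EPSG:4326")
+        'n52e004'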
+ """ + bounds_geometry = box(*bounds) + if crs != "EPSG:4326": + transformer = Transformer.from_crs(crs, "EPSG:4326", always_xy=True) + bounds_geometry = transform(transformer.transform, bounds_geometry) + minx, miny, _, _ = bounds_geometry.bounds + lon_prefix = "e" if minx >= 0 else "w" + lat_prefix = "n" if miny >= 0 else "s" + formatted_minx = f"{abs(minx):03.0f}" + formatted_miny = f"{abs(miny):02.0f}" + return f"{lat_prefix}{formatted_miny}{lon_prefix}{formatted_minx}" + + def name_data( - data: pd.DataFrame | gpd.GeoDataFrame | xarray.Dataset | xarray.DataArray, + data: pd.DataFrame | gpd.GeoDataFrame | xr.Dataset | xr.DataArray, prefix: str | None = None, filename_prefix: str | None = None, include_bounds: bool = True, @@ -208,7 +406,7 @@ def name_data( suffix = ".parquet" if include_bounds and isinstance( - data, gpd.GeoDataFrame | xarray.Dataset | xarray.DataArray + data, gpd.GeoDataFrame | xr.Dataset | xr.DataArray ): if isinstance(data, gpd.GeoDataFrame): bounds = data.total_bounds @@ -219,19 +417,7 @@ def name_data( bounds = data.rio.bounds() crs = data.rio.crs.to_string() - def name_bounds(bounds, crs): - bounds_geometry = box(*bounds) - if crs != "EPSG:4326": - transformer = Transformer.from_crs(crs, "EPSG:4326", always_xy=True) - bounds_geometry = transform(transformer.transform, bounds_geometry) - minx, miny, _, _ = bounds_geometry.bounds - lon_prefix = "e" if minx >= 0 else "w" - lat_prefix = "n" if miny >= 0 else "s" - formatted_minx = f"{abs(minx):03.0f}" - formatted_miny = f"{abs(miny):02.0f}" - return f"{lat_prefix}{formatted_miny}{lon_prefix}{formatted_minx}" - - bounds_part = name_bounds(bounds, crs) + bounds_part = name_bounds(tuple(bounds), crs) if bounds_part: parts.append(bounds_part) diff --git a/src/coastpy/libs/stac_table.py b/src/coastpy/libs/stac_table.py index 6a02e34..620e449 100644 --- a/src/coastpy/libs/stac_table.py +++ b/src/coastpy/libs/stac_table.py @@ -7,22 +7,24 @@ """ import copy +import datetime import enum -from typing import TypeVar +from typing import Any, TypeVar import dask +import pystac # NOTE: until query planning is enabled in Dask GeoPandas dask.config.set({"dataframe.query-planning": False}) - import dask_geopandas import fsspec import numpy as np import pandas as pd import pyarrow as pa import pyproj -import pystac import shapely.geometry +from pystac.asset import Asset +from pystac.extensions import projection from shapely.ops import transform T = TypeVar("T", pystac.Collection, pystac.Item) @@ -42,121 +44,71 @@ class InferDatetimeOptions(str, enum.Enum): def generate( uri: str, - template, - infer_bbox=None, - infer_geometry=False, - datetime_column=None, - infer_datetime=InferDatetimeOptions.no, - count_rows=True, - asset_key="data", - asset_extra_fields=None, - proj=True, - storage_options=None, - validate=True, -) -> T: + template: pystac.Item, + infer_bbox: bool = True, + proj: bool | dict | None = None, + infer_geometry: bool = False, + infer_datetime: str = "no", + datetime_column: str | None = None, + data_created: datetime.datetime | None = None, + metadata_created: datetime.datetime | None = None, + count_rows: bool = True, + asset_href: str | None = None, + asset_key: str = "data", + asset_title: str = "Dataset root", + asset_description: str = "Root of the dataset", + asset_media_type: str = "application/vnd.apache.parquet", + asset_roles: list[str] | None = None, + asset_extra_fields: dict[str, Any] | None = None, + storage_options: dict[str, Any] | None = None, + validate: bool = True, +) -> pystac.Item: """ - Generate 
a STAC Item from a Parquet Dataset. - - Parameters - ---------- - uri : str - The fsspec-compatible URI pointing to the input table to generate a - STAC item for. - template : pystac.Item - The template item. This will be cloned and new data will be filled in. - infer_bbox : str, optional - The column name to use setting the Item's bounding box. - - .. note:: - - If the dataset doesn't provide spatial partitions, this will - require computation. - - infer_geometry: bool, optional - Whether to fill the item's `geometry` field with the union of the - geometries in the `infer_bbox` column. - - datetime_column: str, optional - The column name to use when setting the Item's `datetime` or - `start_datetime` and `end_datetime` properties. The method used is - determined by `infer_datetime`. - - infer_datetime: str, optional. - The method used to find a datetime from the values in `datetime_column`. - Use the options in the `InferDatetimeOptions` enum. - - - no : do not infer a datetime - - midpoint : Set `datetime` to the midpoint of the highest and lowest values. - - unique : Set `datetime` to the unique value. Raises if more than one - unique value is found. - - range : Set `start_datetime` and `end_datetime` to the minimum and - maximum values. - - count_rows : bool, default True - Whether to add the row count to `table:row_count`. - - asset_key : str, default "data" - The asset key to use for the parquet dataset. The href will be the ``uri`` and - the roles will be ``["data"]``. - - asset_extra_fields : dict, optional - Additional fields to set in the asset's ``extra_fields``. - - proj : bool or dict, default True - Whether to extract projection information from the dataset and store it - using the `projection` extension. - - By default, just `proj:crs` is extracted. If `infer_bbox` or `infer_geometry` - are specified, those will be set as well. - - Alternatively, provide a dict of values to include. - - storage_options: mapping, optional - A dictionary of keywords to provide to :meth:`fsspec.get_fs_token_paths` - when creating an fsspec filesystem with a str ``ds``. - - validate : bool, default True - Whether to validate the returned pystac.Item. - - Returns - ------- - pystac.Item - The updated pystac.Item with the following fields set - - * stac_extensions : added `table` extension - * table:columns - - Examples - -------- - - This example generates a STAC item based on the "naturalearth_lowres" datset - from geopandas. There's a bit of setup. - - >>> import datetime, geopandas, pystac, stac_table - >>> gdf = geopandas.read_file(geopandas.datasets.get_path("naturalearth_lowres")) - >>> gdf.to_parquet("data.parquet") - - Now we can create the item. - - >>> # Create the template Item - >>> item = pystac.Item( - ... "naturalearth_lowres", - ... geometry=None, - ... bbox=None, - ... datetime=datetime.datetime(2021, 1, 1), - ... properties={}, - ... ) - >>> result = stac_table.generate("data.parquet", item) - >>> result - + Generate a STAC Item from a Parquet dataset. + + Args: + uri (str): URI of the dataset (e.g., "az://path/to/file.parquet"). + template (pystac.Item): Template STAC item to clone and populate. + infer_bbox (bool, optional): Whether to infer the bounding box. + proj (bool or dict, optional): Include projection information. + - Pass `True` for default projection metadata (e.g., `proj:crs`). + - Pass a dictionary for custom projection values. + infer_geometry (bool, optional): Whether to compute and add geometry information. 
+ infer_datetime (str, optional): Method to infer datetime: + - "no": Do not infer datetime. + - "midpoint": Use the midpoint of the datetime range. + - "unique": Use the unique datetime value (raises an error if not unique). + - "range": Add `start_datetime` and `end_datetime` based on the range. + datetime_column (str, optional): Name of the column containing datetime values. + Required if `infer_datetime` is not "no". + data_created (str, optional): ISO 8601 timestamp for when the data was created. + metadata_created (str, optional): ISO 8601 timestamp for when metadata was created. + count_rows (bool, optional): Add the row count to `table:row_count`. + asset_href (str, optional): Custom URI for the asset (overrides `uri` if provided). + asset_key (str, optional): Key for the asset in the STAC item (default: "data"). + asset_title (str, optional): Title of the asset. + asset_description (str, optional): Description of the asset. + asset_media_type (str, optional): Media type of the asset (e.g., Parquet MIME type). + asset_roles (str or list, optional): Roles for the asset (default: "data"). + asset_extra_fields (dict, optional): Additional metadata fields for the asset. + storage_options (dict, optional): fsspec storage options for accessing the dataset. + validate (bool, optional): Validate the generated STAC item (default: True). + + Returns: + pystac.Item: A STAC item populated with metadata and assets. """ - template = copy.deepcopy(template) + + if asset_roles is None: + asset_roles = ["data"] + + if proj is None or proj is False: + proj = {} + + # NOTE: Consider if its better to create from template or from scratch + item = copy.deepcopy(template) data = None storage_options = storage_options or {} - # data = dask_geopandas.read_parquet( - # ds, storage_options=storage_options - # ) ds = parquet_dataset_from_url(uri, storage_options) if ( @@ -170,107 +122,124 @@ def generate( uri, storage_options=storage_options, gather_spatial_partitions=False ) data.calculate_spatial_partitions() - # # # TODO: this doesn't actually work - # data = dask_geopandas.read_parquet( - # ds.files, filesystem=ds.filesystem, - # ) columns = get_columns(ds.schema) - template.properties["table:columns"] = columns + item.properties["table:columns"] = columns if proj is True: proj = get_proj(data) proj = proj or {} # TODO: Add schema when published - if SCHEMA_URI not in template.stac_extensions: - template.stac_extensions.append(SCHEMA_URI) - if proj and pystac.extensions.projection.SCHEMA_URI not in template.stac_extensions: - template.stac_extensions.append(pystac.extensions.projection.SCHEMA_URI) + if SCHEMA_URI not in item.stac_extensions: + item.stac_extensions.append(SCHEMA_URI) + if proj and projection.SCHEMA_URI not in item.stac_extensions: + item.stac_extensions.append(projection.SCHEMA_URI) extra_proj = {} if infer_bbox: - src_crs = data.spatial_partitions.crs.to_epsg() + spatial_partitions = data.spatial_partitions # type: ignore + + if spatial_partitions is None: + msg = "No spatial partitions found in the dataset." 
+ raise ValueError(msg) + + src_crs = spatial_partitions.crs.to_epsg() tf = pyproj.Transformer.from_crs(src_crs, 4326, always_xy=True) - bbox = data.spatial_partitions.unary_union.bounds + bbox = spatial_partitions.unary_union.bounds # NOTE: bbox of unary union will be stored under proj extension as projected extra_proj["proj:bbox"] = bbox # NOTE: bbox will be stored in pystsac.Item.bbox in EPSG:4326 bbox = transform(tf.transform, shapely.geometry.box(*bbox)) - template.bbox = bbox.bounds + item.bbox = list(bbox.bounds) if infer_geometry: # NOTE: geom under proj extension as projected - geometry = data.unary_union.compute() + geometry = data.unary_union.compute() # type: ignore extra_proj["proj:geometry"] = shapely.geometry.mapping(geometry) # NOTE: geometry will be stored in pystsac.Item.geometry in EPSG:4326 - src_crs = data.spatial_partitions.crs.to_epsg() + src_crs = data.spatial_partitions.crs.to_epsg() # type: ignore tf = pyproj.Transformer.from_crs(src_crs, 4326, always_xy=True) geometry = transform(tf.transform, geometry) - template.geometry = shapely.geometry.mapping(geometry) + item.geometry = shapely.geometry.mapping(geometry) - if infer_bbox and template.geometry is None: + if infer_bbox and item.geometry is None: # If bbox is set then geometry must be set as well. - template.geometry = shapely.geometry.mapping( - shapely.geometry.box(*template.bbox) + item.geometry = shapely.geometry.mapping( + shapely.geometry.box(*item.bbox, ccw=True) # type: ignore ) - if infer_geometry and template.bbox is None: - template.bbox = shapely.geometry.shape(template.geometry).bounds + if infer_geometry and item.bbox is None: + item.bbox = shapely.geometry.shape(item.geometry).bounds # type: ignore if proj or extra_proj: - template.properties.update(**extra_proj, **proj) + item.properties.update(**extra_proj, **proj) - if infer_datetime != InferDatetimeOptions.no and datetime_column is None: - msg = "Must specify 'datetime_column' when 'infer_datetime != no'." - raise ValueError(msg) + if infer_datetime == InferDatetimeOptions.no: + if data_created is None: + msg = "Must specify 'datetime_data' when 'infer_datetime == no'." + raise ValueError(msg) + if datetime_column is not None: + msg = "Leave 'datetime_column' empty when 'infer_datetime == no'." + raise ValueError(msg) + else: + if datetime_column is None: + msg = "Must specify 'datetime_column' when 'infer_datetime != no'." + raise ValueError(msg) + if data_created is not None: + msg = "Leave 'datetime_data' empty when inferring datetime." + raise ValueError(msg) + + if metadata_created is not None: + item.common_metadata.created = metadata_created + + if data_created is not None: + item.datetime = data_created if infer_datetime == InferDatetimeOptions.midpoint: - values = dask.compute(data[datetime_column].min(), data[datetime_column].max()) - template.properties["datetime"] = pd.Series(values).mean().to_pydatetime() + values = dask.compute(data[datetime_column].min(), data[datetime_column].max()) # type: ignore + item.datetime = pd.Timestamp(pd.Series(values).mean()).to_pydatetime() - if infer_datetime == InferDatetimeOptions.unique: - values = data[datetime_column].unique().compute() + if infer_datetime == InferDatetimeOptions.unique and data_created is not None: + values = data[datetime_column].unique().compute() # type: ignore n = len(values) if n > 1: msg = f"infer_datetime='unique', but {n} unique values found." 
raise ValueError(msg) - template.properties["datetime"] = values[0].to_pydatetime() + item.datetime = values[0].to_pydatetime() if infer_datetime == InferDatetimeOptions.range: - values = dask.compute(data[datetime_column].min(), data[datetime_column].max()) + values = dask.compute(data[datetime_column].min(), data[datetime_column].max()) # type: ignore values = np.array(pd.Series(values).dt.to_pydatetime()) - template.properties["start_datetime"] = values[0].isoformat() + "Z" - template.properties["end_datetime"] = values[1].isoformat() + "Z" + item.common_metadata.start_datetime = values[0] + item.common_metadata.end_datetime = values[1] if count_rows: - template.properties["table:row_count"] = sum( - x.count_rows() for x in ds.fragments - ) + item.properties["table:row_count"] = sum(x.count_rows() for x in ds.fragments) if asset_key: - asset = pystac.asset.Asset( - # NOTE: consider using the https protocol; makes it easier for user to download - # to_https_url(items[0].assets["data"].href, storage_options={"account_name": "coclico"}) - uri, - title="Dataset root", - media_type=PARQUET_MEDIA_TYPE, - roles=["data"], - # extra_fields={"table:storage_options": asset_extra_fields}, + href = asset_href if asset_href is not None else uri + asset = Asset( + href, + title=asset_title, + description=asset_description, + media_type=asset_media_type, + roles=asset_roles, extra_fields=asset_extra_fields, ) - template.add_asset(asset_key, asset) + + item.add_asset(asset_key, asset) if validate: - template.validate() + item.validate() - return template + return item -def get_proj(ds): +def get_proj(ds) -> dict: """ Read projection information from the dataset. """ @@ -310,11 +279,29 @@ def get_columns(schema: pa.Schema, prefix: str = "") -> list: return columns -def parquet_dataset_from_url(url: str, storage_options): - fs, _, _ = fsspec.get_fs_token_paths(url, storage_options=storage_options) +def parquet_dataset_from_url(url: str, storage_options: dict): + """ + Load a Parquet dataset from a URL, handling both `https://` and `az://` protocols. + + Args: + url (str): The URL of the dataset (e.g., `az://path/to/file.parquet` or `https://...`). + storage_options (dict): Options for cloud storage (e.g., account name, SAS tokens). + + Returns: + pyarrow.parquet.ParquetDataset: A ParquetDataset object for the given URL. + """ + protocol = url.split("://")[0] + _storage_options = {} if protocol == "https" else storage_options or {} + fs, _, _ = fsspec.get_fs_token_paths(url, storage_options=_storage_options) pa_fs = pa.fs.PyFileSystem(pa.fs.FSSpecHandler(fs)) - url2 = url.split("://", 1)[-1] # pyarrow doesn't auto-strip the prefix. - # NOTE: use_legacy_dataset will be deprecated, so test if it works without it. - # ds = pa.parquet.ParquetDataset(url2, filesystem=pa_fs, use_legacy_dataset=False) + url2 = url.split("://", 1)[-1] ds = pa.parquet.ParquetDataset(url2, filesystem=pa_fs) return ds + + +# def parquet_dataset_from_url(url: str, storage_options): +# fs, _, _ = fsspec.get_fs_token_paths(url, storage_options=storage_options) +# pa_fs = pa.fs.PyFileSystem(pa.fs.FSSpecHandler(fs)) +# url2 = url.split("://", 1)[-1] # pyarrow doesn't auto-strip the prefix. 
+# ds = pa.parquet.ParquetDataset(url2, filesystem=pa_fs) +# return ds diff --git a/tutorials/coastal_grid.ipynb b/tutorials/coastal_grid.ipynb new file mode 100644 index 0000000..15ec5b3 --- /dev/null +++ b/tutorials/coastal_grid.ipynb @@ -0,0 +1,214 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "id": "0", + "metadata": {}, + "source": [ + "# Coastal Zone\n", + "\n", + "The Coastal Grid dataset provides a global tiling system for geospatial analytics in coastal areas.\n", + "It supports scalable data processing workflows by offering structured grids at varying zoom levels\n", + "(5, 6, 7) and buffer sizes (500m, 1000m, 2000m, 5000m, 10000m, 15000m).\n", + "\n", + "Each tile contains information on intersecting countries, continents, and Sentinel-2 MGRS tiles\n", + "as nested JSON lists. The dataset is particularly suited for applications requiring global coastal\n", + "coverage, such as satellite-based coastal monitoring, spatial analytics, and large-scale data processing.\n", + "\n", + "Key Features:\n", + "- Global coverage of the coastal zone, derived from OpenStreetMap's generalized coastline (2023-02).\n", + "- Precomputed intersections with countries, continents, and MGRS tiles.\n", + "- Designed for use in scalable geospatial workflows.\n", + "\n", + "This dataset is structured as a STAC collection, with individual items for each zoom level and buffer\n", + "size combination. Users can filter items by the `zoom` and `buffer_size` fields in the STAC metadata.\n", + "\n", + "Please consider the following citation when using this dataset:\n", + "\n", + "Floris Reinier Calkoen, Arjen Pieter Luijendijk, Kilian Vos, Etiënne Kras, Fedor Baart,\n", + "Enabling coastal analytics at planetary scale, Environmental Modelling & Software, 2024,\n", + "106257, ISSN 1364-8152, https://doi.org/10.1016/j.envsoft.2024.106257." 
+ ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "1", + "metadata": {}, + "outputs": [], + "source": [ + "import os\n", + "\n", + "import dask\n", + "\n", + "dask.config.set({\"dataframe.query-planning\": False})\n", + "\n", + "import geopandas as gpd\n", + "import hvplot.pandas\n", + "import pandas as pd\n", + "import pystac\n", + "import shapely\n", + "from ipyleaflet import Map, basemaps\n", + "\n", + "storage_options = {\"account_name\": \"coclico\"}" + ] + }, + { + "cell_type": "markdown", + "id": "2", + "metadata": {}, + "source": [ + "### Connect to the CoCliCo STAC " + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "3", + "metadata": {}, + "outputs": [], + "source": [ + "coclico_catalog = pystac.Catalog.from_file(\n", + " \"https://coclico.blob.core.windows.net/stac/v1/catalog.json\"\n", + ")\n", + "coastal_zone_collection = coclico_catalog.get_child(\"coastal-grid\")\n", + "coastal_zone_collection" + ] + }, + { + "cell_type": "markdown", + "id": "4", + "metadata": {}, + "source": [ + "### The collection includes multiple buffer distances" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "5", + "metadata": {}, + "outputs": [], + "source": [ + "list(coastal_zone_collection.get_all_items())" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "6", + "metadata": {}, + "outputs": [], + "source": [ + "item = coastal_zone_collection.get_item(\"coastal_grid_z6_10000m\")\n", + "item" + ] + }, + { + "cell_type": "markdown", + "id": "7", + "metadata": {}, + "source": [ + "## The data is stored here" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "8", + "metadata": {}, + "outputs": [], + "source": [ + "href = item.assets[\"data\"].href\n", + "href" + ] + }, + { + "cell_type": "markdown", + "id": "9", + "metadata": {}, + "source": [ + "## Read it into GeoPandas" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "10", + "metadata": {}, + "outputs": [], + "source": [ + "import fsspec\n", + "with fsspec.open(href, mode=\"rb\", **storage_options) as f:\n", + " coastal_zone = gpd.read_parquet(f)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "11", + "metadata": {}, + "outputs": [], + "source": [ + "print(coastal_zone.shape)\n", + "coastal_zone.head()" + ] + }, + { + "cell_type": "markdown", + "id": "12", + "metadata": {}, + "source": [ + "## Show the data on an interactive map" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "13", + "metadata": {}, + "outputs": [], + "source": [ + "coastal_zone.hvplot.polygons(\n", + " geo=True,\n", + " tiles=\"ESRI\",\n", + " color=\"coastal_grid:id\", \n", + " cmap=\"Category10\", \n", + " line_color=\"orange\", \n", + " alpha=0.8, \n", + " width=600, \n", + " legend=False\n", + ")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "14", + "metadata": {}, + "outputs": [], + "source": [] + } + ], + "metadata": { + "kernelspec": { + "display_name": "Python [conda env:coastal]", + "language": "python", + "name": "conda-env-coastal-py" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.12.4" + } + }, + "nbformat": 4, + "nbformat_minor": 5 +} diff --git a/tutorials/coastal_zone.ipynb b/tutorials/coastal_zone.ipynb new file mode 100644 index 0000000..e6929cc --- /dev/null +++ 
b/tutorials/coastal_zone.ipynb @@ -0,0 +1,207 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "id": "0", + "metadata": {}, + "source": [ + "# Coastal Zone\n", + "\n", + "The Coastal Zone dataset provides a vectorized representation of coastal zones at multiple buffer distances. \n", + "It is derived from a generalized version of the OpenStreetMap coastline (2023-02) and serves as a valuable \n", + "tool for masking other datasets or for spatial analysis in coastal regions. \n", + "\n", + "This STAC collection includes multiple layers, each corresponding to a specific buffer distance:\n", + "500m, 1000m, 2000m, 5000m, 10000m, and 15000m. The buffer distance defines the zone's extent, with the \n", + "total width being twice the buffer distance (e.g., a 5000m buffer results in a zone 10km wide). \n", + "\n", + "Each layer in the collection is stored as a separate item and can be filtered using the `buffer_size` \n", + "field in the item's properties. These layers contain only the geometry and are stored in cloud-optimized\n", + "format to enable seamless integration with big geospatial data analytics products.\n", + "\n", + "Please consider the following citation when using this dataset:\n", + "\n", + "Floris Reinier Calkoen, Arjen Pieter Luijendijk, Kilian Vos, Etiënne Kras, Fedor Baart,\n", + "Enabling coastal analytics at planetary scale, Environmental Modelling & Software, 2024,\n", + "106257, ISSN 1364-8152, [https://doi.org/10.1016/j.envsoft.2024.106257](https://www.sciencedirect.com/science/article/pii/S1364815224003189)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "1", + "metadata": {}, + "outputs": [], + "source": [ + "import os\n", + "\n", + "import dask\n", + "\n", + "dask.config.set({\"dataframe.query-planning\": False})\n", + "\n", + "import geopandas as gpd\n", + "import hvplot.pandas\n", + "import pandas as pd\n", + "import pystac\n", + "import shapely\n", + "from ipyleaflet import Map, basemaps\n", + "\n", + "storage_options = {\"account_name\": \"coclico\"}" + ] + }, + { + "cell_type": "markdown", + "id": "2", + "metadata": {}, + "source": [ + "### Connect to the CoCliCo STAC " + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "3", + "metadata": {}, + "outputs": [], + "source": [ + "coclico_catalog = pystac.Catalog.from_file(\n", + " \"https://coclico.blob.core.windows.net/stac/v1/catalog.json\"\n", + ")\n", + "coastal_zone_collection = coclico_catalog.get_child(\"coastal-zone\")\n", + "coastal_zone_collection" + ] + }, + { + "cell_type": "markdown", + "id": "4", + "metadata": {}, + "source": [ + "### The collection includes multiple buffer distances" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "5", + "metadata": {}, + "outputs": [], + "source": [ + "list(coastal_zone_collection.get_all_items())" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "6", + "metadata": {}, + "outputs": [], + "source": [ + "item = coastal_zone_collection.get_item(\"coastal_zone_15000m\")\n", + "item" + ] + }, + { + "cell_type": "markdown", + "id": "7", + "metadata": {}, + "source": [ + "## The data is stored here" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "8", + "metadata": {}, + "outputs": [], + "source": [ + "href = item.assets[\"data\"].href\n", + "href" + ] + }, + { + "cell_type": "markdown", + "id": "9", + "metadata": {}, + "source": [ + "## Read it into GeoPandas" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "10", + "metadata": {}, + 
"outputs": [], + "source": [ + "import fsspec\n", + "with fsspec.open(href, mode=\"rb\", **storage_options) as f:\n", + " coastal_zone = gpd.read_parquet(f)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "11", + "metadata": {}, + "outputs": [], + "source": [ + "coastal_zone.head()" + ] + }, + { + "cell_type": "markdown", + "id": "12", + "metadata": {}, + "source": [ + "## Show the data on an interactive map" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "13", + "metadata": {}, + "outputs": [], + "source": [ + "coastal_zone.hvplot.polygons(\n", + " geo=True,\n", + " tiles=\"ESRI\",\n", + " color=\"green\", # Fill color\n", + " line_color=\"orange\", # Outline color\n", + " alpha=0.8, # Opacity for fill\n", + " width=600\n", + ")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "14", + "metadata": {}, + "outputs": [], + "source": [] + } + ], + "metadata": { + "kernelspec": { + "display_name": "Python [conda env:coastal]", + "language": "python", + "name": "conda-env-coastal-py" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.12.4" + } + }, + "nbformat": 4, + "nbformat_minor": 5 +} diff --git a/tutorials/deltadtm.ipynb b/tutorials/deltadtm.ipynb index 55f3509..7bd1a35 100644 --- a/tutorials/deltadtm.ipynb +++ b/tutorials/deltadtm.ipynb @@ -166,6 +166,22 @@ "metadata": {}, "outputs": [], "source": [] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "13", + "metadata": {}, + "outputs": [], + "source": [] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "14", + "metadata": {}, + "outputs": [], + "source": [] } ], "metadata": {