diff --git a/scripts/python/make_gcts.py b/scripts/python/make_gcts.py
index 450ef40..5b47ddd 100644
--- a/scripts/python/make_gcts.py
+++ b/scripts/python/make_gcts.py
@@ -1,41 +1,54 @@
+import dask
+
+# NOTE: explicitly set query-planning to False to avoid issues with dask-geopandas
+dask.config.set({"dataframe.query-planning": False})
+
+import datetime
 import logging
-import pathlib
+import os
 import time
 import warnings
 from functools import partial
 
-import dask
-
-# NOTE: explicitly set query-planning to False to avoid issues with dask-geopandas
-dask.config.set({"dataframe.query-planning": False})
-import dask.dataframe as dd
 import dask_geopandas
+import fsspec
 import geopandas as gpd
-import mercantile
 import pandas as pd
 import pyproj
+import shapely
 from distributed import Client
+from dotenv import load_dotenv
 from geopandas.array import GeometryDtype
-from shapely import LineString
-from shapely.geometry import Point
+from shapely.geometry import LineString, Point
 
 from coastpy.geo.ops import crosses_antimeridian
 from coastpy.geo.quadtiles_utils import add_geo_columns
-from coastpy.geo.transect import (
-    generate_transects_from_coastline,
-)
-from coastpy.io.partitioner import QuadKeyEqualSizePartitioner
-from coastpy.io.retrieve import retrieve_rois
+from coastpy.geo.transect import generate_transects_from_coastline
 from coastpy.utils.dask_utils import (
     silence_shapely_warnings,
 )
 
-DATA_DIR = pathlib.Path.home() / "data"
-TMP_DIR = DATA_DIR / "tmp"
-SRC_DIR = DATA_DIR / "src"
-PRC_DIR = DATA_DIR / "prc"
-RES_DIR = DATA_DIR / "res"
-LIVE_DIR = DATA_DIR / "live"
+load_dotenv(override=True)
+
+sas_token = os.getenv("AZURE_STORAGE_SAS_TOKEN")
+storage_options = {"account_name": "coclico", "credential": sas_token}
+
+utm_grid_url = "az://grid/utm.parquet"
+osm_url = "az://coastlines-osm/release/2023-02-09/coast_3857_gen9.parquet"
+countries_url = "az://public/countries.parquet"  # From overture maps 2024-04-16
+
+today = datetime.datetime.now().strftime("%Y-%m-%d")
+OUT_BASE_URI = f"az://gcts/release/{today}.parquet"
+TMP_BASE_URI = OUT_BASE_URI.replace("az://", "az://tmp/")
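+# NOTE: outputs are staged under az://tmp/ first; promotion to OUT_BASE_URI is assumed to happen in a separate validation step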
f"transects_{TRANSECT_LENGTH}_test" # ) -OUT_DIR = PRC_DIR / f"gcts-{TRANSECT_LENGTH}m.parquet" +# OUT_DIR = PRC_DIR / "gcts" / "release" / "2024-07-25" # To drop transects at meridonal boundary SPACING = 100 @@ -208,10 +221,10 @@ def sort_line_segments(segments, original_line): silence_warnings() logging.basicConfig(level=logging.INFO) - logging.info(f"Transects will be written to {OUT_DIR}") + logging.info(f"Transects will be written to {OUT_BASE_URI}") - if not OUT_DIR.exists(): - OUT_DIR.mkdir(exist_ok=True, parents=True) + # if not OUT_DIR.exists(): + # OUT_DIR.mkdir(exist_ok=True, parents=True) start_time = time.time() @@ -221,16 +234,23 @@ def sort_line_segments(segments, original_line): client.run(silence_shapely_warnings) logging.info(f"Client dashboard link: {client.dashboard_link}") - utm_grid = ( - gpd.read_parquet(UTM_GRID_FP).dissolve("epsg").to_crs(prc_epsg).reset_index() - ) + with fsspec.open(utm_grid_url, **storage_options) as f: + utm_grid = gpd.read_parquet(f) + + withr fsspec.open(countries_url, **storage_options) as f: + countries = gpd.read_parquet(f) + + utm_grid = utm_grid.dissolve("epsg").to_crs(prc_epsg).reset_index() + [utm_grid_scattered] = client.scatter( [utm_grid.loc[:, ["geometry", "epsg", "utm_code"]]], broadcast=True ) coastlines = ( - dask_geopandas.read_file(COASTLINES_DIR, npartitions=10) - # .sample(frac=0.01) + dask_geopandas.read_parquet(osm_url, storage_options=storage_options) + .repartition(npartitions=10) + .persist() + # .sample(frac=0.02) .to_crs(prc_epsg) ) @@ -254,13 +274,18 @@ def wrap_is_closed(df): coastlines.crs ) - # Apply the function to add the 'is_closed_island' column to the GeoDataFrame - rois = retrieve_rois() - utm_extent = ( - rois.loc[["GLOBAL_UTM_EXTENT"]] - .to_crs(prc_epsg) - .drop(columns=["processing_priority"]) - ) + utm_extent = gpd.GeoDataFrame( + geometry=[ + shapely.box( + xmin=-179.99998854, + ymin=-80.00000006, + xmax=179.99998854, + ymax=84.00000009, + ) + ], + crs="EPSG:4326", + ).to_crs(prc_epsg) + [utm_extent_scattered] = client.scatter([utm_extent], broadcast=True) def overlay_by_grid(df, grid): @@ -286,6 +311,8 @@ def overlay_by_grid(df, grid): lazy_value = dask.delayed(overlay_by_grid)(lazy_value, utm_grid_scattered) lazy_values.append(lazy_value) # Note the change here + import dask.dataframe as dd + coastlines = dd.from_delayed(lazy_values, meta=META).set_crs(coastlines.crs) # rename fid because they are no longer unique after overlay coastlines = coastlines.rename(columns={"FID": "FID_osm"}).astype( @@ -445,62 +472,75 @@ def generate_filtered_transects( transects, geo_columns=["bbox", "quadkey"], quadkey_zoom_level=12 ) - # NOTE: in next gcts release move this out of processing and add from countries (divisions) seperately - admin1 = ( - gpd.read_parquet(ADMIN1_FP) - .to_crs(transects.crs) - .drop(columns=["id"]) - .rename(columns={"primary_name": "admin_level_1_name"}) - ) - admin2 = ( - gpd.read_parquet(ADMIN2_FP) - .to_crs(transects.crs) - .drop(columns=["id", "isoCountryCodeAlpha2"]) - .rename(columns={"primary_name": "admin_level_2_name"}) - ) - - # NOTE: zoom level 5 is hard-coded here because I believe spatial join will be faster - quadkey_grouper = "quadkey_z5" - transects[quadkey_grouper] = transects.apply( - lambda r: mercantile.quadkey(mercantile.tile(r.lon, r.lat, 5)), axis=1 - ) - - def add_admin_bounds(df, admin_df, max_distance=20000): - points = gpd.GeoDataFrame( - df[["tr_name"]], geometry=gpd.GeoSeries.from_xy(df.lon, df.lat, crs=4326) - ).to_crs(3857) - joined = gpd.sjoin_nearest( - 
+    utm_extent = gpd.GeoDataFrame(
+        geometry=[
+            shapely.box(
+                xmin=-179.99998854,
+                ymin=-80.00000006,
+                xmax=179.99998854,
+                ymax=84.00000009,
+            )
+        ],
+        crs="EPSG:4326",
+    ).to_crs(prc_epsg)
+
     [utm_extent_scattered] = client.scatter([utm_extent], broadcast=True)
 
     def overlay_by_grid(df, grid):
@@ -286,6 +313,8 @@ def overlay_by_grid(df, grid):
             lazy_value = dask.delayed(overlay_by_grid)(lazy_value, utm_grid_scattered)
             lazy_values.append(lazy_value)  # Note the change here
 
+    import dask.dataframe as dd
+
     coastlines = dd.from_delayed(lazy_values, meta=META).set_crs(coastlines.crs)
     # rename fid because they are no longer unique after overlay
     coastlines = coastlines.rename(columns={"FID": "FID_osm"}).astype(
@@ -445,62 +474,76 @@ def generate_filtered_transects(
         transects, geo_columns=["bbox", "quadkey"], quadkey_zoom_level=12
     )
 
-    # NOTE: in next gcts release move this out of processing and add from countries (divisions) seperately
-    admin1 = (
-        gpd.read_parquet(ADMIN1_FP)
-        .to_crs(transects.crs)
-        .drop(columns=["id"])
-        .rename(columns={"primary_name": "admin_level_1_name"})
-    )
-    admin2 = (
-        gpd.read_parquet(ADMIN2_FP)
-        .to_crs(transects.crs)
-        .drop(columns=["id", "isoCountryCodeAlpha2"])
-        .rename(columns={"primary_name": "admin_level_2_name"})
-    )
-
-    # NOTE: zoom level 5 is hard-coded here because I believe spatial join will be faster
-    quadkey_grouper = "quadkey_z5"
-    transects[quadkey_grouper] = transects.apply(
-        lambda r: mercantile.quadkey(mercantile.tile(r.lon, r.lat, 5)), axis=1
-    )
-
-    def add_admin_bounds(df, admin_df, max_distance=20000):
-        points = gpd.GeoDataFrame(
-            df[["tr_name"]], geometry=gpd.GeoSeries.from_xy(df.lon, df.lat, crs=4326)
-        ).to_crs(3857)
-        joined = gpd.sjoin_nearest(
-            points, admin_df.to_crs(3857), max_distance=max_distance
-        ).drop(columns=["index_right", "geometry"])
-
-        df = pd.merge(df, joined, on="tr_name", how="left")
-        return df
-
-    transects = transects.groupby(quadkey_grouper, group_keys=False).apply(
-        lambda gr: add_admin_bounds(gr, admin1),
-    )
-
-    transects = transects.groupby(quadkey_grouper, group_keys=False).apply(
-        lambda gr: add_admin_bounds(gr, admin2),
-    )
-
-    transects = transects.drop(columns=[quadkey_grouper])
-    transects["tr_name"] = zero_pad_tr_name(transects["tr_name"])
-
-    partitioner = QuadKeyEqualSizePartitioner(
-        transects,
-        out_dir=OUT_DIR,
-        max_size=MAX_PARTITION_SIZE,
-        min_quadkey_zoom=MIN_ZOOM_QUADKEY,
-        sort_by="quadkey",
-        geo_columns=["bbox", "quadkey"],
-        column_order=list(DTYPES.keys()),
-        dtypes=DTYPES,
-    )
-    partitioner.process()
-
-    logging.info("Done!")
-    elapsed_time = time.time() - start_time
-    logging.info(
-        f"Time (H:M:S): {time.strftime('%H:%M:%S', time.gmtime(elapsed_time))}"
-    )
+    transects.to_parquet("/Users/calkoen/transects-test.parquet")
+
+    logging.info(f"Writing transects to {TMP_BASE_URI}")
+    with fsspec.open(TMP_BASE_URI, "wb", **storage_options) as f:
+        transects.to_parquet(f, index=False)
+
+    # transects.to_parquet(
+    #     TMP_BASE_URI,
+    #     index=False,
+    #     storage_options=storage_options,
+    # )
+
+    # # NOTE: in next gcts release move this out of processing and add from countries (divisions) separately
+    # admin1 = (
+    #     gpd.read_parquet(ADMIN1_FP)
+    #     .to_crs(transects.crs)
+    #     .drop(columns=["id"])
+    #     .rename(columns={"primary_name": "admin_level_1_name"})
+    # )
+    # admin2 = (
+    #     gpd.read_parquet(ADMIN2_FP)
+    #     .to_crs(transects.crs)
+    #     .drop(columns=["id", "isoCountryCodeAlpha2"])
+    #     .rename(columns={"primary_name": "admin_level_2_name"})
+    # )
+
+    # # NOTE: zoom level 5 is hard-coded here because I believe spatial join will be faster
+    # quadkey_grouper = "quadkey_z5"
+    # transects[quadkey_grouper] = transects.apply(
+    #     lambda r: mercantile.quadkey(mercantile.tile(r.lon, r.lat, 5)), axis=1
+    # )
+
+    # def add_admin_bounds(df, admin_df, max_distance=20000):
+    #     points = gpd.GeoDataFrame(
+    #         df[["tr_name"]], geometry=gpd.GeoSeries.from_xy(df.lon, df.lat, crs=4326)
+    #     ).to_crs(3857)
+    #     joined = gpd.sjoin_nearest(
+    #         points, admin_df.to_crs(3857), max_distance=max_distance
+    #     ).drop(columns=["index_right", "geometry"])
+
+    #     df = pd.merge(df, joined, on="tr_name", how="left")
+    #     return df
+
+    # transects = transects.groupby(quadkey_grouper, group_keys=False).apply(
+    #     lambda gr: add_admin_bounds(gr, admin1),
+    # )
+    # transects = transects.groupby(quadkey_grouper, group_keys=False).apply(
+    #     lambda gr: add_admin_bounds(gr, admin2),
+    # )
+
+    # transects = transects.drop(columns=[quadkey_grouper])
+
+    # transects.to_parquet(OUT_DIR, index=False)
+
+    # partitioner = QuadKeyEqualSizePartitioner(
+    #     transects,
+    #     out_dir=OUT_DIR,
+    #     max_size=MAX_PARTITION_SIZE,
+    #     min_quadkey_zoom=MIN_ZOOM_QUADKEY,
+    #     sort_by="quadkey",
+    #     geo_columns=["bbox", "quadkey"],
+    #     column_order=list(DTYPES.keys()),
+    #     dtypes=DTYPES,
+    # )
+    # partitioner.process()
+
+    # logging.info("Done!")
+    # elapsed_time = time.time() - start_time
+    # logging.info(
+    #     f"Time (H:M:S): {time.strftime('%H:%M:%S', time.gmtime(elapsed_time))}"
+    # )
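+    # NOTE: per the NOTE above, the commented-out admin-bounds join, quadkey
+    # partitioning, and timing are expected to move to a separate enrichment step
+    # in the next GCTS release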