Skip to content

Commit

Permalink
make gcts with data from cloud
Browse files Browse the repository at this point in the history
  • Loading branch information
floriscalkoen committed Jul 26, 2024
1 parent 451f9ac commit 937817e
Showing 1 changed file with 140 additions and 100 deletions.
240 changes: 140 additions & 100 deletions scripts/python/make_gcts.py
Original file line number Diff line number Diff line change
@@ -1,41 +1,54 @@
import dask

# NOTE: explicitly set query-planning to False to avoid issues with dask-geopandas
dask.config.set({"dataframe.query-planning": False})

import logging
import pathlib
import os
import time
import warnings
from functools import partial

import dask

# NOTE: explicitly set query-planning to False to avoid issues with dask-geopandas
dask.config.set({"dataframe.query-planning": False})
import dask.dataframe as dd
import dask_geopandas
import fsspec
import geopandas as gpd
import mercantile
import pandas as pd
import pyproj
import shapely
from distributed import Client
from dotenv import load_dotenv
from geopandas.array import GeometryDtype
from shapely import LineString
from shapely.geometry import Point
from shapely.geometry import LineString, Point

from coastpy.geo.ops import crosses_antimeridian
from coastpy.geo.quadtiles_utils import add_geo_columns
from coastpy.geo.transect import (
generate_transects_from_coastline,
)
from coastpy.io.partitioner import QuadKeyEqualSizePartitioner
from coastpy.io.retrieve import retrieve_rois
from coastpy.geo.transect import generate_transects_from_coastline
from coastpy.utils.dask_utils import (
silence_shapely_warnings,
)

DATA_DIR = pathlib.Path.home() / "data"
TMP_DIR = DATA_DIR / "tmp"
SRC_DIR = DATA_DIR / "src"
PRC_DIR = DATA_DIR / "prc"
RES_DIR = DATA_DIR / "res"
LIVE_DIR = DATA_DIR / "live"
load_dotenv(override=True)

sas_token = os.getenv("AZURE_STORAGE_SAS_TOKEN")
storage_options = {"account_name": "coclico", "credential": sas_token}

utm_grid_url = "az://grid/utm.parquet"
osm_url = "az://coastlines-osm/release/2023-02-09/coast_3857_gen9.parquet"
countries_url = "az://public/countries.parquet" # From overture maps 2024-04-16

import datetime

today = datetime.datetime.now().strftime("%Y-%m-%d")
OUT_BASE_URI = f"az://gcts/release/{today}.parquet"
TMP_BASE_URI = OUT_BASE_URI.replace("az://", "az://tmp/")

# DATA_DIR = pathlib.Path.home() / "data"
# TMP_DIR = DATA_DIR / "tmp"
# SRC_DIR = DATA_DIR / "src"
# PRC_DIR = DATA_DIR / "prc"
# RES_DIR = DATA_DIR / "res"
# LIVE_DIR = DATA_DIR / "live"


# TODO: make cli using argparse
# transect configuration settings
Expand All @@ -45,7 +58,7 @@
COASTLINE_SEGMENT_LENGTH = 1e4
TRANSECT_LENGTH = 2000

FILENAMER = "part.{number}.parquet"
# Template for partition file names; fill with FILENAMER.format(number=i).
# Fixed placeholder: "{numb2er}" was a typo and would raise KeyError on format().
FILENAMER = "part.{number}.parquet"
COASTLINE_ID_COLUMN = "FID" # FID (OSM) or OBJECTID (Sayre)
COLUMNS = [COASTLINE_ID_COLUMN, "geometry"]
COASTLINE_ID_RENAME = "FID"
Expand All @@ -56,17 +69,17 @@
prc_epsg = pyproj.CRS.from_user_input(PRC_CRS).to_epsg()
dst_epsg = pyproj.CRS.from_user_input(DST_CRS).to_epsg()

# dataset specific settings
COASTLINES_DIR = SRC_DIR / "coastlines_osm_generalized_v2023" / "coast_3857_gen9.shp"
# # dataset specific settings
# COASTLINES_DIR = SRC_DIR / "coastlines_osm_generalized_v2023" / "coast_3857_gen9.shp"

UTM_GRID_FP = LIVE_DIR / "tiles" / "utm.parquet"
ADMIN1_FP = LIVE_DIR / "overture" / "2024-02-15" / "admin_bounds_level_1.parquet"
ADMIN2_FP = LIVE_DIR / "overture" / "2024-02-15" / "admin_bounds_level_2.parquet"
# UTM_GRID_FP = LIVE_DIR / "tiles" / "utm.parquet"
# ADMIN1_FP = LIVE_DIR / "overture" / "2024-02-15" / "admin_bounds_level_1.parquet"
# ADMIN2_FP = LIVE_DIR / "overture" / "2024-02-15" / "admin_bounds_level_2.parquet"

# OUT_DIR = PRC_DIR / COASTLINES_DIR.stem.replace(
# "coast", f"transects_{TRANSECT_LENGTH}_test"
# )
OUT_DIR = PRC_DIR / f"gcts-{TRANSECT_LENGTH}m.parquet"
# OUT_DIR = PRC_DIR / "gcts" / "release" / "2024-07-25"

# To drop transects at meridonal boundary
SPACING = 100
Expand Down Expand Up @@ -208,10 +221,10 @@ def sort_line_segments(segments, original_line):
silence_warnings()

logging.basicConfig(level=logging.INFO)
logging.info(f"Transects will be written to {OUT_DIR}")
logging.info(f"Transects will be written to {OUT_BASE_URI}")

if not OUT_DIR.exists():
OUT_DIR.mkdir(exist_ok=True, parents=True)
# if not OUT_DIR.exists():
# OUT_DIR.mkdir(exist_ok=True, parents=True)

start_time = time.time()

Expand All @@ -221,16 +234,23 @@ def sort_line_segments(segments, original_line):
client.run(silence_shapely_warnings)
logging.info(f"Client dashboard link: {client.dashboard_link}")

utm_grid = (
gpd.read_parquet(UTM_GRID_FP).dissolve("epsg").to_crs(prc_epsg).reset_index()
)
with fsspec.open(utm_grid_url, **storage_options) as f:
utm_grid = gpd.read_parquet(f)

# Load the Overture-derived countries layer from Azure blob storage.
# Fixed "withr" -> "with": the original token was not a valid keyword (SyntaxError).
with fsspec.open(countries_url, **storage_options) as f:
    countries = gpd.read_parquet(f)

utm_grid = utm_grid.dissolve("epsg").to_crs(prc_epsg).reset_index()

[utm_grid_scattered] = client.scatter(
[utm_grid.loc[:, ["geometry", "epsg", "utm_code"]]], broadcast=True
)

coastlines = (
dask_geopandas.read_file(COASTLINES_DIR, npartitions=10)
# .sample(frac=0.01)
dask_geopandas.read_parquet(osm_url, storage_options=storage_options)
.repartition(npartitions=10)
.persist()
# .sample(frac=0.02)
.to_crs(prc_epsg)
)

Expand All @@ -254,13 +274,18 @@ def wrap_is_closed(df):
coastlines.crs
)

# Apply the function to add the 'is_closed_island' column to the GeoDataFrame
rois = retrieve_rois()
utm_extent = (
rois.loc[["GLOBAL_UTM_EXTENT"]]
.to_crs(prc_epsg)
.drop(columns=["processing_priority"])
)
utm_extent = gpd.GeoDataFrame(
geometry=[
shapely.box(
xmin=-179.99998854,
ymin=-80.00000006,
xmax=179.99998854,
ymax=84.00000009,
)
],
crs="EPSG:4326",
).to_crs(prc_epsg)

[utm_extent_scattered] = client.scatter([utm_extent], broadcast=True)

def overlay_by_grid(df, grid):
Expand All @@ -286,6 +311,8 @@ def overlay_by_grid(df, grid):
lazy_value = dask.delayed(overlay_by_grid)(lazy_value, utm_grid_scattered)
lazy_values.append(lazy_value) # Note the change here

import dask.dataframe as dd

coastlines = dd.from_delayed(lazy_values, meta=META).set_crs(coastlines.crs)
# rename fid because they are no longer unique after overlay
coastlines = coastlines.rename(columns={"FID": "FID_osm"}).astype(
Expand Down Expand Up @@ -445,62 +472,75 @@ def generate_filtered_transects(
transects, geo_columns=["bbox", "quadkey"], quadkey_zoom_level=12
)

# NOTE: in next gcts release move this out of processing and add from countries (divisions) seperately
admin1 = (
gpd.read_parquet(ADMIN1_FP)
.to_crs(transects.crs)
.drop(columns=["id"])
.rename(columns={"primary_name": "admin_level_1_name"})
)
admin2 = (
gpd.read_parquet(ADMIN2_FP)
.to_crs(transects.crs)
.drop(columns=["id", "isoCountryCodeAlpha2"])
.rename(columns={"primary_name": "admin_level_2_name"})
)

# NOTE: zoom level 5 is hard-coded here because I believe spatial join will be faster
quadkey_grouper = "quadkey_z5"
transects[quadkey_grouper] = transects.apply(
lambda r: mercantile.quadkey(mercantile.tile(r.lon, r.lat, 5)), axis=1
)

def add_admin_bounds(df, admin_df, max_distance=20000):
    """Merge nearest administrative-boundary attributes onto transect rows.

    Builds point geometries from the ``lon``/``lat`` columns, runs a
    nearest spatial join against ``admin_df`` in EPSG:3857 within
    ``max_distance`` (metres), and left-merges the matched attributes
    back onto ``df`` by ``tr_name``. Rows with no boundary in range keep
    NaN for the joined columns (left merge).
    """
    geom = gpd.GeoSeries.from_xy(df.lon, df.lat, crs=4326)
    pts = gpd.GeoDataFrame(df[["tr_name"]], geometry=geom).to_crs(3857)
    nearest = gpd.sjoin_nearest(pts, admin_df.to_crs(3857), max_distance=max_distance)
    nearest = nearest.drop(columns=["index_right", "geometry"])
    return pd.merge(df, nearest, on="tr_name", how="left")

transects = transects.groupby(quadkey_grouper, group_keys=False).apply(
lambda gr: add_admin_bounds(gr, admin1),
)
transects = transects.groupby(quadkey_grouper, group_keys=False).apply(
lambda gr: add_admin_bounds(gr, admin2),
)

transects = transects.drop(columns=[quadkey_grouper])

transects["tr_name"] = zero_pad_tr_name(transects["tr_name"])

partitioner = QuadKeyEqualSizePartitioner(
transects,
out_dir=OUT_DIR,
max_size=MAX_PARTITION_SIZE,
min_quadkey_zoom=MIN_ZOOM_QUADKEY,
sort_by="quadkey",
geo_columns=["bbox", "quadkey"],
column_order=list(DTYPES.keys()),
dtypes=DTYPES,
)
partitioner.process()

logging.info("Done!")
elapsed_time = time.time() - start_time
logging.info(
f"Time (H:M:S): {time.strftime('%H:%M:%S', time.gmtime(elapsed_time))}"
)
transects.to_parquet("/Users/calkoen/transects-test.gpkg")

print("writing")
with fsspec.open(TMP_BASE_URI, "wb", **storage_options) as f:
transects.to_parquet(f, index=False)

# transects.to_parquet(
# TMP_BASE_URI,
# index=False,
# storage_options=storage_options,
# )

# # NOTE: in next gcts release move this out of processing and add from countries (divisions) seperately
# admin1 = (
# gpd.read_parquet(ADMIN1_FP)
# .to_crs(transects.crs)
# .drop(columns=["id"])
# .rename(columns={"primary_name": "admin_level_1_name"})
# )
# admin2 = (
# gpd.read_parquet(ADMIN2_FP)
# .to_crs(transects.crs)
# .drop(columns=["id", "isoCountryCodeAlpha2"])
# .rename(columns={"primary_name": "admin_level_2_name"})
# )

# # NOTE: zoom level 5 is hard-coded here because I believe spatial join will be faster
# quadkey_grouper = "quadkey_z5"
# transects[quadkey_grouper] = transects.apply(
# lambda r: mercantile.quadkey(mercantile.tile(r.lon, r.lat, 5)), axis=1
# )

# def add_admin_bounds(df, admin_df, max_distance=20000):
# points = gpd.GeoDataFrame(
# df[["tr_name"]], geometry=gpd.GeoSeries.from_xy(df.lon, df.lat, crs=4326)
# ).to_crs(3857)
# joined = gpd.sjoin_nearest(
# points, admin_df.to_crs(3857), max_distance=max_distance
# ).drop(columns=["index_right", "geometry"])

# df = pd.merge(df, joined, on="tr_name", how="left")
# return df

# transects = transects.groupby(quadkey_grouper, group_keys=False).apply(
# lambda gr: add_admin_bounds(gr, admin1),
# )
# transects = transects.groupby(quadkey_grouper, group_keys=False).apply(
# lambda gr: add_admin_bounds(gr, admin2),
# )

# transects = transects.drop(columns=[quadkey_grouper])

# transects.to_parquet(OUT_DIR, index=False)

# partitioner = QuadKeyEqualSizePartitioner(
# transects,
# out_dir=OUT_DIR,
# max_size=MAX_PARTITION_SIZE,
# min_quadkey_zoom=MIN_ZOOM_QUADKEY,
# sort_by="quadkey",
# geo_columns=["bbox", "quadkey"],
# column_order=list(DTYPES.keys()),
# dtypes=DTYPES,
# )
# partitioner.process()

# logging.info("Done!")
# elapsed_time = time.time() - start_time
# logging.info(
# f"Time (H:M:S): {time.strftime('%H:%M:%S', time.gmtime(elapsed_time))}"
# )

0 comments on commit 937817e

Please sign in to comment.