Commit

wip
floriscalkoen committed Aug 1, 2024
1 parent 40ff5d7 commit 97ee273
Showing 4 changed files with 88 additions and 37 deletions.
57 changes: 21 additions & 36 deletions scripts/python/make_gcts.py
@@ -453,17 +453,28 @@ def generate_filtered_transects(

transects["transect_id"] = zero_pad_transect_id(transects["transect_id"])

with fsspec.open(TMP_BASE_URI, "wb", **storage_options) as f:
transects.to_parquet(f, index=False)
partitioner = QuadKeyEqualSizePartitioner(
transects,
out_dir=TMP_BASE_URI,
max_size="1GB",
min_quadkey_zoom=4,
sort_by="quadkey",
geo_columns=["bbox", "quadkey"],
storage_options=storage_options,
)
partitioner.process()

# with fsspec.open(TMP_BASE_URI, "wb", **storage_options) as f:
# transects.to_parquet(f, index=False)

logging.info(f"Transects written to {TMP_BASE_URI}")

transects = dask_geopandas.read_parquet(
TMP_BASE_URI, storage_options=storage_options
)
zoom = 5
quadkey_grouper = f"quadkey_{zoom}"
transects[quadkey_grouper] = transects.quadkey.str[:zoom]
# zoom = 5
# quadkey_grouper = f"quadkey_{zoom}"
# transects[quadkey_grouper] = transects.quadkey.str[:zoom]

def process(transects_group):
with fsspec.open(countries_uri, **storage_options) as f:
@@ -473,48 +484,22 @@ def process(transects_group):
with fsspec.open(regions_uri, **storage_options) as f:
regions = gpd.read_parquet(f, columns=["common_region_name", "geometry"])

result = add_attributes_from_gdfs(
r = add_attributes_from_gdfs(
transects_group, [countries, regions], max_distance=20000
)
return result
return r

logging.info("Part 2: adding attributes to transects...")
logging.info(f"Grouping the transects by quadkey zoom level {zoom}")

# def wrapper(transects, countries_uri, regions_uri, max_distance):
# with fsspec.open(countries_uri, **storage_options) as f:
# countries = gpd.read_parquet(
# f, columns=["country", "common_country_name", "continent", "geometry"]
# )
# with fsspec.open(regions_uri, **storage_options) as f:
# regions = gpd.read_parquet(f, columns=["common_region_name", "geometry"])

# r = add_attributes_from_gdfs(
# transects, [countries, regions], max_distance=max_distance
# )

# return r

# with fsspec.open(countries_uri, **storage_options) as f:
# countries = gpd.read_parquet(
# f, columns=["country", "common_country_name", "continent", "geometry"]
# )
# with fsspec.open(regions_uri, **storage_options) as f:
# regions = gpd.read_parquet(f, columns=["common_region_name", "geometry"])

# logging.info("Scattering countries on client...")
# scattered_countries = client.scatter(countries, broadcast=True)
# logging.info("Scattering regions on client...")
# scattered_regions = client.scatter(regions, broadcast=True)
# logging.info(f"Grouping the transects by quadkey zoom level {zoom}")

tasks = []
for _, group in transects.groupby(quadkey_grouper):
for group in transects.to_delayed():
t = dask.delayed(process)(group, countries_uri, regions_uri, max_distance=20000)
tasks.append(t)

logging.info("Computing the submitted tasks..")
transects = pd.concat(dask.compute(*tasks))
transects = transects.drop(columns=[quadkey_grouper])
# transects = transects.drop(columns=[quadkey_grouper])

logging.info(
f"Partitioning into equal partitions by quadkey at zoom level {MIN_ZOOM_QUADKEY}"
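The substantive change in this file is the switch from grouping transects by a truncated quadkey to fanning the attribute-joining work out over the Dask partitions themselves via to_delayed(). The snippet below is a minimal, self-contained sketch of that fan-out pattern using plain pandas and Dask with illustrative data and column names; it is not the project's exact code.

# Minimal sketch of the delayed fan-out pattern: map a regular Python function
# over the partitions of a Dask DataFrame and gather the results with a single
# compute() call. Data and column names here are illustrative.
import dask
import dask.dataframe as dd
import pandas as pd

def enrich(partition: pd.DataFrame) -> pd.DataFrame:
    # Stand-in for per-partition work such as a spatial join with countries/regions.
    out = partition.copy()
    out["n_rows_in_partition"] = len(out)
    return out

ddf = dd.from_pandas(pd.DataFrame({"transect_id": range(10)}), npartitions=3)

tasks = [dask.delayed(enrich)(part) for part in ddf.to_delayed()]
result = pd.concat(dask.compute(*tasks))
print(result.shape)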
2 changes: 2 additions & 0 deletions src/coastpy/geo/quadtiles_utils.py
@@ -74,6 +74,8 @@ def quadkey_to_geojson(quadkey: str) -> dict:
}


# NOTE: consider if it would be better to optionally run this function when the attributes
# are already present in the columns.
def add_geo_columns(
df: gpd.GeoDataFrame,
geo_columns: list[Literal["bbox", "bounding_quadkey", "quadkey"]],
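The NOTE above suggests skipping the work when the requested geo columns already exist. Below is a hedged sketch of that guard; the wrapper name and the import path coastpy.geo.quadtiles_utils are assumptions based on this file's location, not confirmed API.

import geopandas as gpd

from coastpy.geo.quadtiles_utils import add_geo_columns  # assumed import path

def add_geo_columns_if_missing(
    df: gpd.GeoDataFrame,
    geo_columns: list[str],
) -> gpd.GeoDataFrame:
    # Only compute the requested geo columns that are not already present.
    missing = [col for col in geo_columns if col not in df.columns]
    if not missing:
        return df
    return add_geo_columns(df, geo_columns=missing)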
5 changes: 4 additions & 1 deletion src/coastpy/io/partitioner.py
@@ -11,6 +11,9 @@
from coastpy.utils.size import size_to_bytes


# NOTE: What about separating the partitioner and the equal size partitioner? - that
# way we can have a more generic partitioner that does not have to compute the size of
# each row.
class EqualSizePartitioner:
def __init__(
self,
@@ -128,7 +131,7 @@ def __init__(
min_quadkey_zoom,
sort_by,
quadkey_zoom_level=12,
geo_columns=None,
geo_columns: list[Literal["bbox", "bounding_quadkey", "quadkey"]] | None = None,
column_order=None,
dtypes=None,
storage_options=None,
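The NOTE above floats splitting the size-aware logic out of a more generic base class. A rough sketch of what that split could look like, assuming only the equal-size variant needs per-row size estimates; all class and attribute names below are illustrative, not the current coastpy API.

import geopandas as gpd

class BasePartitioner:
    """Generic partitioner: groups and writes, no size bookkeeping."""

    def __init__(self, df: gpd.GeoDataFrame, out_dir: str, sort_by: str):
        self.df = df
        self.out_dir = out_dir
        self.sort_by = sort_by

    def process(self) -> None:
        # One output file per group key; no size estimation needed here.
        for key, group in self.df.groupby(self.sort_by):
            group.to_parquet(f"{self.out_dir}/part-{key}.parquet", index=False)

class SizeAwarePartitioner(BasePartitioner):
    """Equal-size variant: additionally estimates how many bytes a row costs."""

    def __init__(self, df, out_dir, sort_by, max_size_bytes: int):
        super().__init__(df, out_dir, sort_by)
        self.max_size_bytes = max_size_bytes
        # Rough average bytes per row; only this subclass pays for the estimate.
        self.avg_row_bytes = df.memory_usage(index=False, deep=True).sum() / max(len(df), 1)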
61 changes: 61 additions & 0 deletions src/coastpy/io/utils.py
@@ -1,4 +1,5 @@
import json
import logging
import pathlib
import uuid
import warnings
@@ -372,3 +373,63 @@ def read_log_entries(
df = df.sort_values(by="time", ascending=True)

return df


def rm_from_storage(
pattern: str,
storage_options: dict[str, str] | None = None,
confirm: bool = True,
verbose: bool = True,
) -> None:
"""
Deletes all blobs/files in the specified storage location that match the given pattern.
Args:
pattern (str): The pattern or path pattern (including wildcards) for the blobs/files to delete.
storage_options (Dict[str, str], optional): A dictionary containing storage connection details.
confirm (bool): Whether to prompt for confirmation before deletion.
verbose (bool): Whether to display detailed log messages.
Returns:
None
"""
if storage_options is None:
storage_options = {}

# Get filesystem, token, and resolved paths
fs, _, paths = fsspec.get_fs_token_paths(pattern, storage_options=storage_options)

if paths:
if verbose:
logging.info(
f"\nWarning: You are about to delete the following blobs/files matching '{pattern}':"
)
for path in paths:
logging.info(path)

if confirm:
confirmation = input(
f"\nType 'yes' to confirm deletion of blobs/files matching '{pattern}': "
)
else:
confirmation = "yes"

if confirmation.lower() == "yes":
for path in paths:
try:
if verbose:
logging.info(f"Deleting blob/file: {path}")
fs.rm(path)
if verbose:
logging.info(f"Blob/file {path} deleted successfully.")
except Exception as e:
if verbose:
logging.error(f"Failed to delete blob/file: {e}")
if verbose:
logging.info("All specified blobs/files have been deleted.")
else:
if verbose:
logging.info("Blob/file deletion cancelled.")
else:
if verbose:
logging.info(f"No blobs/files found matching '{pattern}'.")
