Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
floriscalkoen committed Aug 1, 2024
1 parent 86eda14 commit a18f7bf
Show file tree
Hide file tree
Showing 2 changed files with 43 additions and 10 deletions.
15 changes: 15 additions & 0 deletions scripts/python/make_gcts.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
from coastpy.geo.quadtiles_utils import add_geo_columns
from coastpy.geo.transect import generate_transects_from_coastline
from coastpy.io.partitioner import QuadKeyEqualSizePartitioner
from coastpy.io.utils import rm_from_storage
from coastpy.utils.config import configure_instance
from coastpy.utils.dask import (
DaskClientManager,
Expand Down Expand Up @@ -453,6 +454,13 @@ def generate_filtered_transects(

transects["transect_id"] = zero_pad_transect_id(transects["transect_id"])

logging.info(f"Removing files/bytes from {TMP_BASE_URI} if present.")
rm_from_storage(
pattern=(TMP_BASE_URI + "/*.parquet"),
storage_options=storage_options,
confirm=False,
verbose=False,
)
partitioner = QuadKeyEqualSizePartitioner(
transects,
out_dir=TMP_BASE_URI,
Expand Down Expand Up @@ -504,6 +512,13 @@ def process(transects_group, countries_uri, regions_uri, max_distance=20000):
logging.info(
f"Partitioning into equal partitions by quadkey at zoom level {MIN_ZOOM_QUADKEY}"
)
logging.info(f"Removing files/bytes from {OUT_BASE_URI} if present.")
rm_from_storage(
pattern=(OUT_BASE_URI + "/*.parquet"),
storage_options=storage_options,
confirm=False,
verbose=False,
)
partitioner = QuadKeyEqualSizePartitioner(
transects,
out_dir=OUT_BASE_URI,
Expand Down
38 changes: 28 additions & 10 deletions src/coastpy/io/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -396,20 +396,33 @@ def rm_from_storage(
if storage_options is None:
storage_options = {}

# Create a local logger
logger = logging.getLogger(__name__)
if verbose:
handler = logging.StreamHandler()
handler.setLevel(logging.INFO)
formatter = logging.Formatter("%(message)s")
handler.setFormatter(formatter)
logger.addHandler(handler)
logger.setLevel(logging.INFO)

if storage_options is None:
storage_options = {}

# Get filesystem, token, and resolved paths
fs, _, paths = fsspec.get_fs_token_paths(pattern, storage_options=storage_options)

if paths:
if verbose:
logging.info(
f"\nWarning: You are about to delete the following blobs/files matching '{pattern}':"
logger.info(
f"Warning: You are about to delete the following {len(paths)} blobs/files matching '{pattern}'."
)
for path in paths:
logging.info(path)
logger.info(path)

if confirm:
confirmation = input(
f"\nType 'yes' to confirm deletion of blobs/files matching '{pattern}': "
f"Type 'yes' to confirm deletion of {len(paths)} blobs/files matching '{pattern}': "
)
else:
confirmation = "yes"
Expand All @@ -418,18 +431,23 @@ def rm_from_storage(
for path in paths:
try:
if verbose:
logging.info(f"Deleting blob/file: {path}")
logger.info(f"Deleting blob/file: {path}")
fs.rm(path)
if verbose:
logging.info(f"Blob/file {path} deleted successfully.")
logger.info(f"Blob/file {path} deleted successfully.")
except Exception as e:
if verbose:
logging.error(f"Failed to delete blob/file: {e}")
logger.error(f"Failed to delete blob/file: {e}")
if verbose:
logging.info("All specified blobs/files have been deleted.")
logger.info("All specified blobs/files have been deleted.")
else:
if verbose:
logging.info("Blob/file deletion cancelled.")
logger.info("Blob/file deletion cancelled.")
else:
if verbose:
logging.info(f"No blobs/files found matching '{pattern}'.")
logger.info(f"No blobs/files found matching '{pattern}'.")

# Remove the handler after use
if verbose:
logger.removeHandler(handler)
handler.close()

0 comments on commit a18f7bf

Please sign in to comment.