From 09416b06809a8527d89b5283ecebba196a2fab0d Mon Sep 17 00:00:00 2001 From: Baudouin Raoult Date: Wed, 19 Jun 2024 17:47:13 +0000 Subject: [PATCH] work on zarr copies --- src/anemoi/datasets/commands/copy.py | 49 +++++++++++++++------------- 1 file changed, 26 insertions(+), 23 deletions(-) diff --git a/src/anemoi/datasets/commands/copy.py b/src/anemoi/datasets/commands/copy.py index a3fbba76..d9516d53 100644 --- a/src/anemoi/datasets/commands/copy.py +++ b/src/anemoi/datasets/commands/copy.py @@ -27,13 +27,13 @@ class S3Downloader: - def __init__(self, source, target, transfers, overwrite, resume, progress, **kwargs): + def __init__(self, source, target, transfers, overwrite, resume, verbosity, **kwargs): self.source = source self.target = target self.transfers = transfers self.overwrite = overwrite self.resume = resume - self.progress = progress + self.verbosity = verbosity def run(self): if self.overwrite and os.path.exists(self.target): @@ -43,41 +43,41 @@ def run(self): self.source + "/" if not self.source.endswith("/") else self.source, self.target, overwrite=self.overwrite, - ignore_existing=self.resume, - show_progress=1, # if self.progress else 0, + resume=self.resume, + verbosity=self.verbosity, threads=self.transfers, ) class S3Uploader: - def __init__(self, source, target, transfers, overwrite, resume, progress, **kwargs): + def __init__(self, source, target, transfers, overwrite, resume, verbosity, **kwargs): self.source = source self.target = target self.transfers = transfers self.overwrite = overwrite self.resume = resume - self.progress = progress + self.verbosity = verbosity def run(self): upload( self.source, self.target, overwrite=self.overwrite, - ignore_existing=self.resume, - show_progress=1, # if self.progress else 0, + resume=self.resume, + verbosity=self.verbosity, threads=self.transfers, ) class DefaultCopier: - def __init__(self, source, target, transfers, block_size, overwrite, resume, progress, nested, rechunk, **kwargs): + def 
__init__(self, source, target, transfers, block_size, overwrite, resume, verbosity, nested, rechunk, **kwargs): self.source = source self.target = target self.transfers = transfers self.block_size = block_size self.overwrite = overwrite self.resume = resume - self.progress = progress + self.verbosity = verbosity self.nested = nested self.rechunk = rechunk @@ -90,7 +90,7 @@ def _store(self, path, nested=False): return zarr.storage.NestedDirectoryStore(path) return path - def copy_chunk(self, n, m, source, target, _copy, progress): + def copy_chunk(self, n, m, source, target, _copy, verbosity): if _copy[n:m].all(): LOG.info(f"Skipping {n} to {m}") return None @@ -110,7 +110,7 @@ def copy_chunk(self, n, m, source, target, _copy, progress): range(n, m), desc=f"Copying {n} to {m}", leave=False, - disable=not isatty and not progress, + disable=not isatty and not verbosity, ): target[i] = source[i] @@ -135,7 +135,7 @@ def parse_rechunking(self, rechunking, source_data): # raise NotImplementedError("Rechunking with multiple transfers is not implemented") return chunks - def copy_data(self, source, target, _copy, progress): + def copy_data(self, source, target, _copy, verbosity): LOG.info("Copying data") source_data = source["data"] @@ -164,7 +164,7 @@ def copy_data(self, source, target, _copy, progress): source_data, target_data, _copy, - progress, + verbosity, ) ) n += self.block_size @@ -179,7 +179,7 @@ def copy_data(self, source, target, _copy, progress): LOG.info("Copied data") - def copy_array(self, name, source, target, _copy, progress): + def copy_array(self, name, source, target, _copy, verbosity): for k, v in source.attrs.items(): target.attrs[k] = v @@ -187,14 +187,14 @@ def copy_array(self, name, source, target, _copy, progress): return if name == "data": - self.copy_data(source, target, _copy, progress) + self.copy_data(source, target, _copy, verbosity) return LOG.info(f"Copying {name}") target[name] = source[name] LOG.info(f"Copied {name}") - def 
copy_group(self, source, target, _copy, progress): + def copy_group(self, source, target, _copy, verbosity): import zarr for k, v in source.attrs.items(): @@ -207,7 +207,7 @@ def copy_group(self, source, target, _copy, progress): source[name], group, _copy, - progress, + verbosity, ) else: self.copy_array( @@ -215,10 +215,10 @@ def copy_group(self, source, target, _copy, progress): source, target, _copy, - progress, + verbosity, ) - def copy(self, source, target, progress): + def copy(self, source, target, verbosity): import zarr if "_copy" not in target: @@ -229,7 +229,7 @@ def copy(self, source, target, progress): _copy = target["_copy"] _copy_np = _copy[:] - self.copy_group(source, target, _copy_np, progress) + self.copy_group(source, target, _copy_np, verbosity) del target["_copy"] def run(self): @@ -288,7 +288,7 @@ def open_target(): assert target is not None, target source = zarr.open(self._store(self.source), mode="r") - self.copy(source, target, self.progress) + self.copy(source, target, self.verbosity) class CopyMixin: @@ -307,7 +307,10 @@ def add_arguments(self, command_parser): ) command_parser.add_argument("--transfers", type=int, default=8, help="Number of parallel transfers.") command_parser.add_argument( - "--progress", action="store_true", help="Force show progress bar, even if not in an interactive shell." + "--verbosity", + type=int, + help="Verbosity level. 0 is silent, 1 is normal, 2 is verbose.", + default=1, ) command_parser.add_argument("--nested", action="store_true", help="Use ZARR's nested directpry backend.") command_parser.add_argument(