Skip to content

Commit

Permalink
work on zarr copies
Browse files Browse the repository at this point in the history
  • Loading branch information
b8raoult committed Jun 19, 2024
1 parent 5f728ff commit 09416b0
Showing 1 changed file with 26 additions and 23 deletions.
49 changes: 26 additions & 23 deletions src/anemoi/datasets/commands/copy.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,13 @@


class S3Downloader:
def __init__(self, source, target, transfers, overwrite, resume, progress, **kwargs):
def __init__(self, source, target, transfers, overwrite, resume, verbosity, **kwargs):
self.source = source
self.target = target
self.transfers = transfers
self.overwrite = overwrite
self.resume = resume
self.progress = progress
self.verbosity = verbosity

def run(self):
if self.overwrite and os.path.exists(self.target):
Expand All @@ -43,41 +43,41 @@ def run(self):
self.source + "/" if not self.source.endswith("/") else self.source,
self.target,
overwrite=self.overwrite,
ignore_existing=self.resume,
show_progress=1, # if self.progress else 0,
resume=self.resume,
verbosity=self.verbosity,
threads=self.transfers,
)


class S3Uploader:
    """Copier that delegates the whole transfer to ``upload``.

    The constructor only records its arguments; the transfer itself happens
    in ``run``.
    """

    def __init__(self, source, target, transfers, overwrite, resume, verbosity, **kwargs):
        """Record the settings used later by ``run``.

        Args:
            source: Location to copy from.
            target: Location to copy to.
            transfers: Number of parallel transfers, forwarded as
                ``threads``.
            overwrite: Forwarded to ``upload`` as ``overwrite``.
            resume: Forwarded to ``upload`` as ``resume``.
            verbosity: Integer verbosity level, forwarded as ``verbosity``.
            **kwargs: Accepted and ignored, so that all copier classes can
                be constructed from the same set of options.
        """
        self.source = source
        self.target = target
        self.transfers = transfers
        self.overwrite = overwrite
        self.resume = resume
        self.verbosity = verbosity

    def run(self):
        """Upload ``source`` to ``target`` using the stored settings."""
        # Only the current keyword names are passed: ``resume`` and
        # ``verbosity`` supersede the earlier ``ignore_existing`` and
        # ``show_progress`` keywords.
        upload(
            self.source,
            self.target,
            overwrite=self.overwrite,
            resume=self.resume,
            verbosity=self.verbosity,
            threads=self.transfers,
        )


class DefaultCopier:
def __init__(self, source, target, transfers, block_size, overwrite, resume, progress, nested, rechunk, **kwargs):
def __init__(self, source, target, transfers, block_size, overwrite, resume, verbosity, nested, rechunk, **kwargs):
self.source = source
self.target = target
self.transfers = transfers
self.block_size = block_size
self.overwrite = overwrite
self.resume = resume
self.progress = progress
self.verbosity = verbosity
self.nested = nested
self.rechunk = rechunk

Expand All @@ -90,7 +90,7 @@ def _store(self, path, nested=False):
return zarr.storage.NestedDirectoryStore(path)
return path

def copy_chunk(self, n, m, source, target, _copy, progress):
def copy_chunk(self, n, m, source, target, _copy, verbosity):
if _copy[n:m].all():
LOG.info(f"Skipping {n} to {m}")
return None
Expand All @@ -110,7 +110,7 @@ def copy_chunk(self, n, m, source, target, _copy, progress):
range(n, m),
desc=f"Copying {n} to {m}",
leave=False,
disable=not isatty and not progress,
disable=not isatty and not verbosity,
):
target[i] = source[i]

Expand All @@ -135,7 +135,7 @@ def parse_rechunking(self, rechunking, source_data):
# raise NotImplementedError("Rechunking with multiple transfers is not implemented")
return chunks

def copy_data(self, source, target, _copy, progress):
def copy_data(self, source, target, _copy, verbosity):
LOG.info("Copying data")
source_data = source["data"]

Expand Down Expand Up @@ -164,7 +164,7 @@ def copy_data(self, source, target, _copy, progress):
source_data,
target_data,
_copy,
progress,
verbosity,
)
)
n += self.block_size
Expand All @@ -179,22 +179,22 @@ def copy_data(self, source, target, _copy, progress):

LOG.info("Copied data")

def copy_array(self, name, source, target, _copy, verbosity):
    """Copy the entry called ``name`` from ``source`` into ``target``.

    Args:
        name: Key of the entry to copy.
        source: Container holding the entry; indexed by ``name`` and
            exposing an ``attrs`` mapping.
        target: Container receiving the entry; same interface as
            ``source``.
        _copy: Bookkeeping array forwarded to ``copy_data``.
        verbosity: Integer verbosity level forwarded to ``copy_data``.
    """
    # The attributes copied here are those of the ``source`` container
    # itself, not of ``source[name]``.
    for k, v in source.attrs.items():
        target.attrs[k] = v

    # "_copy" is the bookkeeping entry and is never copied to the target.
    if name == "_copy":
        return

    # "data" goes through the dedicated routine so the copy is done once,
    # with the current ``verbosity`` argument.
    if name == "data":
        self.copy_data(source, target, _copy, verbosity)
        return

    LOG.info(f"Copying {name}")
    target[name] = source[name]
    LOG.info(f"Copied {name}")

def copy_group(self, source, target, _copy, progress):
def copy_group(self, source, target, _copy, verbosity):
import zarr

for k, v in source.attrs.items():
Expand All @@ -207,18 +207,18 @@ def copy_group(self, source, target, _copy, progress):
source[name],
group,
_copy,
progress,
verbosity,
)
else:
self.copy_array(
name,
source,
target,
_copy,
progress,
verbosity,
)

def copy(self, source, target, progress):
def copy(self, source, target, verbosity):
import zarr

if "_copy" not in target:
Expand All @@ -229,7 +229,7 @@ def copy(self, source, target, progress):
_copy = target["_copy"]
_copy_np = _copy[:]

self.copy_group(source, target, _copy_np, progress)
self.copy_group(source, target, _copy_np, verbosity)
del target["_copy"]

def run(self):
Expand Down Expand Up @@ -288,7 +288,7 @@ def open_target():
assert target is not None, target

source = zarr.open(self._store(self.source), mode="r")
self.copy(source, target, self.progress)
self.copy(source, target, self.verbosity)


class CopyMixin:
Expand All @@ -307,7 +307,10 @@ def add_arguments(self, command_parser):
)
command_parser.add_argument("--transfers", type=int, default=8, help="Number of parallel transfers.")
# argparse derives the attribute name from the first long option string, so
# the option must be spelled "--verbosity" to match the ``verbosity``
# parameter that every copier constructor in this file expects.
command_parser.add_argument(
    "--verbosity",
    "--versosity",  # earlier misspelling, kept as an alias so existing command lines still parse
    type=int,
    help="Verbosity level. 0 is silent, 1 is normal, 2 is verbose.",
    default=1,
)
# Boolean flag; stored as ``nested`` and False unless given on the command line.
command_parser.add_argument("--nested", action="store_true", help="Use ZARR's nested directory backend.")
command_parser.add_argument(
Expand Down

0 comments on commit 09416b0

Please sign in to comment.