OPT+RF of zarr downloads: do not wait for full files listing + compute %done from total zarr size #1443

Merged · 4 commits · May 30, 2024

Changes from all commits
83 changes: 53 additions & 30 deletions dandi/download.py
@@ -481,8 +481,8 @@
     return v


-def _skip_file(msg: Any) -> dict:
-    return {"status": "skipped", "message": str(msg)}
+def _skip_file(msg: Any, **kwargs: Any) -> dict:
+    return {"status": "skipped", "message": str(msg), **kwargs}


 def _populate_dandiset_yaml(
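
For illustration, the new **kwargs passthrough lets callers attach extra fields to the skip record (the value here is made up):

    _skip_file("already exists", size=123)
    # -> {"status": "skipped", "message": "already exists", "size": 123}

The callers below use exactly this to report the size of a skipped file to the progress accounting.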
@@ -514,7 +514,7 @@
             existing is DownloadExisting.REFRESH
             and os.lstat(dandiset_yaml).st_mtime >= mtime.timestamp()
         ):
-            yield _skip_file("already exists")
+            yield _skip_file("already exists", size=os.lstat(dandiset_yaml).st_size)
             return
     ds = Dandiset(dandiset_path, allow_empty=True)
     ds.path_obj.mkdir(exist_ok=True)  # exist_ok in case of parallel race

Codecov (codecov/patch) annotation on dandi/download.py#L517: the added line was not covered by tests.
@@ -637,7 +637,7 @@
         # but mtime is different
         if same == ["mtime", "size"]:
             # TODO: add recording and handling of .nwb object_id
-            yield _skip_file("same time and size")
+            yield _skip_file("same time and size", size=size)
             return
         lgr.debug(f"{path!r} - same attributes: {same}. Redownloading")
@@ -878,33 +878,40 @@
     # Avoid heavy import by importing within function:
     from .support.digests import get_zarr_checksum

-    download_gens = {}
-    entries = list(asset.iterfiles())
+    # we will collect them all while starting the download
+    # with the first page of entries received from the server.
+    entries = []
     digests = {}
+    pc = ProgressCombiner(zarr_size=asset.size)

     def digest_callback(path: str, algoname: str, d: str) -> None:
         if algoname == "md5":
             digests[path] = d

-    for entry in entries:
-        etag = entry.digest
-        assert etag.algorithm is DigestType.md5
-        download_gens[str(entry)] = _download_file(
-            entry.get_download_file_iter(),
-            download_path / str(entry),
-            toplevel_path=toplevel_path,
-            size=entry.size,
-            mtime=entry.modified,
-            existing=existing,
-            digests={"md5": etag.value},
-            lock=lock,
-            digest_callback=partial(digest_callback, str(entry)),
-        )
+    def downloads_gen():
+        for entry in asset.iterfiles():
+            entries.append(entry)
+            etag = entry.digest
+            assert etag.algorithm is DigestType.md5
+            yield pairing(
+                str(entry),
+                _download_file(
+                    entry.get_download_file_iter(),
+                    download_path / str(entry),
+                    toplevel_path=toplevel_path,
+                    size=entry.size,
+                    mtime=entry.modified,
+                    existing=existing,
+                    digests={"md5": etag.value},
+                    lock=lock,
+                    digest_callback=partial(digest_callback, str(entry)),
+                ),
+            )
+        pc.file_qty = len(entries)

-    pc = ProgressCombiner(zarr_size=asset.size, file_qty=len(download_gens))
     final_out: dict | None = None
     with interleave(
-        [pairing(p, gen) for p, gen in download_gens.items()],
+        downloads_gen(),
Member commented:

interleave() evaluates/consumes the iterable in its entirety before returning, so you haven't accomplished anything by making downloads_gen() into a generator.

In order to add downloads to interleave() as they're yielded by asset.iterfiles(), you'll need to:

  • Replace the call to interleave() with a directly constructed Interleaver instance
  • In a separate thread, call the Interleaver's submit() method whenever a new file is yielded by asset.iterfiles(), and then call its finalize() method at the end of iteration.

See interleave's docs for more information.
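
A minimal sketch of that suggestion, not the merged code: it assumes Interleaver() accepts the same max_workers/onerror arguments as interleave(), and make_download_gen() is a hypothetical stand-in for the per-entry _download_file(...) construction in downloads_gen() above (asset, jobs, and pairing come from the surrounding function).

    from threading import Thread

    from interleave import FINISH_CURRENT, Interleaver

    ilv = Interleaver(max_workers=jobs or 4, onerror=FINISH_CURRENT)

    def feeder() -> None:
        try:
            # Submit each download as soon as its entry arrives from the server
            for entry in asset.iterfiles():
                ilv.submit(pairing(str(entry), make_download_gen(entry)))
        finally:
            # Tell the Interleaver that no further iterators will be added
            ilv.finalize()

    Thread(target=feeder, daemon=True).start()
    with ilv as it:
        for path, status in it:
            ...  # feed the ProgressCombiner as in the merged code

As the follow-up comments note, propagating errors correctly out of the feeder thread is the hard part of this design.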

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

would you be so kind to approach this or otherwise address this issue the way you like it while also ideally reducing that delay / not demanding full listing to arrive to start the download?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

or may be we could merge this (as resolves the issue) and then improve on top of it?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Getting the error handling right for the solution I suggested would be very difficult, so we should probably just merge the current version for now.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok, let's proceed! Do you want me to create a dedicated issue or you keep this on your plate?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Create a dedicated issue.

         onerror=FINISH_CURRENT,
         max_workers=jobs or 4,
     ) as it:

@@ -988,7 +995,7 @@
 @dataclass
 class ProgressCombiner:
     zarr_size: int
-    file_qty: int
+    file_qty: int | None = None  # set to the specific known value once the full sweep is complete
     files: dict[str, DownloadProgress] = field(default_factory=dict)
     #: Total size of all files that were not skipped and did not error out
     #: during download
@@ -1021,18 +1028,25 @@
         total_downloaded = sum(
             s.downloaded
             for s in self.files.values()
-            if s.state in (DLState.DOWNLOADING, DLState.CHECKSUM_ERROR, DLState.DONE)
+            if s.state
+            in (
+                DLState.DOWNLOADING,
+                DLState.CHECKSUM_ERROR,
+                DLState.SKIPPED,
+                DLState.DONE,
+            )
         )
         return {
             "done": total_downloaded,
-            "done%": total_downloaded / self.maxsize * 100,
+            "done%": total_downloaded / self.zarr_size * 100 if self.zarr_size else 0,
         }

     def set_status(self, statusdict: dict) -> None:
         state_qtys = Counter(s.state for s in self.files.values())
         total = len(self.files)
         if (
-            total == self.file_qty
+            self.file_qty is not None  # if already known
+            and total == self.file_qty
             and state_qtys[DLState.STARTING] == state_qtys[DLState.DOWNLOADING] == 0
         ):
             # All files have finished
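
As a worked example drawn from test case #6 below: with zarr_size = 169, a 127-byte file skipped, and 20 of the remaining 42 bytes downloaded, get_done() now reports done = 127 + 20 = 147 and done% = 147 / 169 * 100 ≈ 86.98; skipped bytes count toward progress, and the denominator is the zarr's total size rather than the running maxsize.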
@@ -1053,16 +1067,25 @@
     def feed(self, path: str, status: dict) -> Iterator[dict]:
         keys = list(status.keys())
         self.files.setdefault(path, DownloadProgress())
+        size = status.get("size")
+        if size is not None:
+            if not self.yielded_size:
+                # this thread will yield the overall zarr size
+                self.yielded_size = True
+                yield {"size": self.zarr_size}
         if status.get("status") == "skipped":
             self.files[path].state = DLState.SKIPPED
             out = {"message": self.message}
+            # Treat skipped as "downloaded" for the purposes of accounting
+            if size is not None:
+                self.files[path].downloaded = size
+                self.maxsize += size
             self.set_status(out)
             yield out
+            if self.zarr_size:
+                yield self.get_done()
         elif keys == ["size"]:
-            if not self.yielded_size:
-                yield {"size": self.zarr_size}
-                self.yielded_size = True
-            self.files[path].size = status["size"]
+            self.files[path].size = size
             self.maxsize += status["size"]
             if any(s.state is DLState.DOWNLOADING for s in self.files.values()):
                 yield self.get_done()
99 changes: 58 additions & 41 deletions dandi/tests/test_download.py
@@ -485,9 +485,10 @@ def test_download_zarr_subdir_has_only_subdirs(


 @pytest.mark.parametrize(
-    "file_qty,inputs,expected",
+    "zarr_size,file_qty,inputs,expected",
     [
-        (
+        (  # 0
+            42,
             1,
             [
                 ("lonely.txt", {"size": 42}),
@@ -501,7 +502,7 @@ def test_download_zarr_subdir_has_only_subdirs(
                 ("lonely.txt", {"status": "done"}),
             ],
             [
-                {"size": 69105},
+                {"size": 42},
                 {"status": "downloading"},
                 {"done": 0, "done%": 0.0},
                 {"done": 20, "done%": 20 / 42 * 100},
@@ -510,7 +511,8 @@ def test_download_zarr_subdir_has_only_subdirs(
                 {"status": "done", "message": "1 done"},
             ],
         ),
-        (
+        (  # 1
+            169,
             2,
             [
                 ("apple.txt", {"size": 42}),
@@ -534,7 +536,7 @@ def test_download_zarr_subdir_has_only_subdirs(
                 ("banana.txt", {"status": "done"}),
             ],
             [
-                {"size": 69105},
+                {"size": 169},
                 {"status": "downloading"},
                 {"done": 0, "done%": 0.0},
                 {"done": 0, "done%": 0.0},
@@ -549,7 +551,8 @@ def test_download_zarr_subdir_has_only_subdirs(
                 {"status": "done", "message": "2 done"},
             ],
         ),
-        (
+        (  # 2
+            169,
             2,
             [
                 ("apple.txt", {"size": 42}),
@@ -573,10 +576,10 @@ def test_download_zarr_subdir_has_only_subdirs(
                 ("banana.txt", {"status": "done"}),
             ],
             [
-                {"size": 69105},
+                {"size": 169},
                 {"status": "downloading"},
                 {"done": 0, "done%": 0.0},
-                {"done": 20, "done%": 20 / 42 * 100},
+                {"done": 20, "done%": 20 / 169 * 100},
                 {"done": 20, "done%": 20 / 169 * 100},
                 {"done": 40, "done%": 40 / 169 * 100},
                 {"done": 42, "done%": 42 / 169 * 100},
@@ -589,7 +592,8 @@ def test_download_zarr_subdir_has_only_subdirs(
                 {"status": "done", "message": "2 done"},
             ],
         ),
-        (
+        (  # 3
+            169,
             2,
             [
                 ("apple.txt", {"size": 42}),
@@ -613,12 +617,12 @@ def test_download_zarr_subdir_has_only_subdirs(
                 ("banana.txt", {"status": "done"}),
             ],
             [
-                {"size": 69105},
+                {"size": 169},
                 {"status": "downloading"},
                 {"done": 0, "done%": 0.0},
-                {"done": 20, "done%": 20 / 42 * 100},
-                {"done": 40, "done%": 40 / 42 * 100},
-                {"done": 42, "done%": 42 / 42 * 100},
+                {"done": 20, "done%": 20 / 169 * 100},
+                {"done": 40, "done%": 40 / 169 * 100},
+                {"done": 42, "done%": 42 / 169 * 100},
                 {"message": "1 done"},
                 {"done": 42, "done%": 42 / 169 * 100},
                 {"done": 82, "done%": 82 / 169 * 100},
@@ -628,7 +632,8 @@ def test_download_zarr_subdir_has_only_subdirs(
                 {"status": "done", "message": "2 done"},
             ],
         ),
-        (
+        (  # 4
+            169,
             2,
             [
                 ("apple.txt", {"size": 42}),
@@ -647,29 +652,34 @@ def test_download_zarr_subdir_has_only_subdirs(
                 ("apple.txt", {"status": "done"}),
             ],
             [
-                {"size": 69105},
+                {"size": 169},
                 {"status": "downloading"},
                 {"done": 0, "done%": 0.0},
                 {"done": 0, "done%": 0.0},
                 {"done": 20, "done%": 20 / 169 * 100},
                 {"done": 60, "done%": 60 / 169 * 100},
                 {"done": 80, "done%": 80 / 169 * 100},
                 {"message": "1 errored"},
-                {"done": 40, "done%": 40 / 42 * 100},
-                {"done": 42, "done%": 100.0},
+                {"done": 40, "done%": 40 / 169 * 100},
+                {"done": 42, "done%": 42 / 169 * 100},
                 {"status": "error", "message": "1 done, 1 errored"},
             ],
         ),
-        (
+        (  # 5
+            0,
             1,
             [("lonely.txt", {"status": "skipped", "message": "already exists"})],
             [{"status": "skipped", "message": "1 skipped"}],
         ),
-        (
+        (  # 6
+            169,
             2,
             [
                 ("apple.txt", {"size": 42}),
-                ("banana.txt", {"status": "skipped", "message": "already exists"}),
+                (
+                    "banana.txt",
+                    {"size": 127, "status": "skipped", "message": "already exists"},
+                ),
                 ("apple.txt", {"status": "downloading"}),
                 ("apple.txt", {"done": 0, "done%": 0.0}),
                 ("apple.txt", {"done": 20, "done%": 20 / 42 * 100}),
@@ -680,17 +690,19 @@ def test_download_zarr_subdir_has_only_subdirs(
                 ("apple.txt", {"status": "done"}),
             ],
             [
-                {"size": 69105},
+                {"size": 169},
                 {"message": "1 skipped"},
+                {"done": 127, "done%": (127 + 0) / 169 * 100},
                 {"status": "downloading"},
-                {"done": 0, "done%": 0.0},
-                {"done": 20, "done%": 20 / 42 * 100},
-                {"done": 40, "done%": 40 / 42 * 100},
-                {"done": 42, "done%": 100.0},
+                {"done": 127 + 0, "done%": (127 + 0) / 169 * 100},
+                {"done": 127 + 20, "done%": (127 + 20) / 169 * 100},
+                {"done": 127 + 40, "done%": (127 + 40) / 169 * 100},
+                {"done": 127 + 42, "done%": 100.0},
                 {"status": "done", "message": "1 done, 1 skipped"},
             ],
         ),
-        (
+        (  # 7
+            169,
             2,
             [
                 ("apple.txt", {"size": 42}),
@@ -719,7 +731,7 @@ def test_download_zarr_subdir_has_only_subdirs(
                 ("apple.txt", {"status": "done"}),
             ],
             [
-                {"size": 69105},
+                {"size": 169},
                 {"status": "downloading"},
                 {"done": 0, "done%": 0.0},
                 {"done": 0, "done%": 0.0},
@@ -734,14 +746,18 @@ def test_download_zarr_subdir_has_only_subdirs(
                 {"status": "error", "message": "1 done, 1 errored"},
             ],
         ),
-        (
+        (  # 8
+            179,
             3,
             [
                 ("apple.txt", {"size": 42}),
                 ("banana.txt", {"size": 127}),
                 ("apple.txt", {"status": "downloading"}),
                 ("banana.txt", {"status": "downloading"}),
-                ("coconut", {"status": "skipped", "message": "already exists"}),
+                (
+                    "coconut",
+                    {"size": 10, "status": "skipped", "message": "already exists"},
+                ),
                 ("apple.txt", {"done": 0, "done%": 0.0}),
                 ("banana.txt", {"done": 0, "done%": 0.0}),
                 ("apple.txt", {"done": 20, "done%": 20 / 42 * 100}),
@@ -764,28 +780,29 @@ def test_download_zarr_subdir_has_only_subdirs(
                 ("banana.txt", {"status": "done"}),
             ],
             [
-                {"size": 69105},
+                {"size": 179},
                 {"status": "downloading"},
                 {"message": "1 skipped"},
-                {"done": 0, "done%": 0.0},
-                {"done": 0, "done%": 0.0},
-                {"done": 20, "done%": 20 / 169 * 100},
-                {"done": 60, "done%": 60 / 169 * 100},
-                {"done": 80, "done%": 80 / 169 * 100},
-                {"done": 120, "done%": 120 / 169 * 100},
-                {"done": 122, "done%": 122 / 169 * 100},
+                {"done": 10, "done%": 10 / 179 * 100},
+                {"done": 10, "done%": 10 / 179 * 100},
+                {"done": 10, "done%": 10 / 179 * 100},
+                {"done": 10 + 20, "done%": (10 + 20) / 179 * 100},
+                {"done": 10 + 60, "done%": (10 + 60) / 179 * 100},
+                {"done": 10 + 80, "done%": (10 + 80) / 179 * 100},
+                {"done": 10 + 120, "done%": (10 + 120) / 179 * 100},
+                {"done": 10 + 122, "done%": (10 + 122) / 179 * 100},
                 {"message": "1 errored, 1 skipped"},
-                {"done": 162, "done%": 162 / 169 * 100},
-                {"done": 169, "done%": 100.0},
+                {"done": 10 + 162, "done%": (10 + 162) / 179 * 100},
+                {"done": 179, "done%": 100.0},
                 {"status": "error", "message": "1 done, 1 errored, 1 skipped"},
             ],
         ),
     ],
 )
 def test_progress_combiner(
-    file_qty: int, inputs: list[tuple[str, dict]], expected: list[dict]
+    zarr_size: int, file_qty: int, inputs: list[tuple[str, dict]], expected: list[dict]
 ) -> None:
-    pc = ProgressCombiner(zarr_size=69105, file_qty=file_qty)
+    pc = ProgressCombiner(zarr_size=zarr_size, file_qty=file_qty)
     outputs: list[dict] = []
     for path, status in inputs:
         outputs.extend(pc.feed(path, status))
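
To exercise just these cases locally, standard pytest selection works (module path as in the diff header above):

    python -m pytest -k test_progress_combiner dandi/tests/test_download.py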