Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

download: more consistent and exhaustive logging, new DANDI_DEVEL_AGGRESSIVE_RETRY mode, respect (?) Retry-After #1509

Open
wants to merge 14 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 11 additions & 4 deletions dandi/cli/command.py
Original file line number Diff line number Diff line change
Expand Up @@ -114,13 +114,20 @@
lambda r: r.name != "pyout" and not r.name.startswith("pyout.")
)
root.addHandler(handler)
exts = (

Check warning on line 117 in dandi/cli/command.py

View check run for this annotation

Codecov / codecov/patch

dandi/cli/command.py#L117

Added line #L117 was not covered by tests
"dandischema",
"h5py",
"hdmf",
"pynwb",
"requests",
"urllib3",
)

lgr.info(
"dandi v%s, hdmf v%s, pynwb v%s, h5py v%s",
"python %s, dandi %s, "
+ ", ".join("%s %s" % (e, get_module_version(e)) for e in sorted(exts)),
sys.version.split()[0],
__version__,
get_module_version("hdmf"),
get_module_version("pynwb"),
get_module_version("h5py"),
extra={"file_only": True},
)
lgr.info("sys.argv = %r", sys.argv, extra={"file_only": True})
Expand Down
15 changes: 14 additions & 1 deletion dandi/dandiapi.py
Original file line number Diff line number Diff line change
Expand Up @@ -1592,10 +1592,23 @@
# TODO: apparently we might need retries here as well etc
# if result.status_code not in (200, 201):
result.raise_for_status()
nbytes, nchunks = 0, 0
for chunk in result.iter_content(chunk_size=chunk_size):
nchunks += 1
if chunk: # could be some "keep alive"?
nbytes += len(chunk)
yield chunk
lgr.info("Asset %s successfully downloaded", self.identifier)
else:
lgr.debug("'Empty' chunk downloaded for %s", url)

Check warning on line 1602 in dandi/dandiapi.py

View check run for this annotation

Codecov / codecov/patch

dandi/dandiapi.py#L1602

Added line #L1602 was not covered by tests
lgr.info(
"Asset %s (%d bytes in %d chunks starting from %d) successfully "
"downloaded from %s",
self.identifier,
nbytes,
nchunks,
start_at,
url,
)

return downloader

Expand Down
141 changes: 119 additions & 22 deletions dandi/download.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
from pathlib import Path
import random
from shutil import rmtree
import sys
from threading import Lock
import time
from types import TracebackType
Expand Down Expand Up @@ -685,22 +686,33 @@
if digester is not None:
break
if digester is None:
lgr.warning("Found no digests in hashlib for any of %s", str(digests))
lgr.warning(

Check warning on line 689 in dandi/download.py

View check run for this annotation

Codecov / codecov/patch

dandi/download.py#L689

Added line #L689 was not covered by tests
"%s - found no digests in hashlib for any of %s", path, str(digests)
)

# TODO: how do we discover the total size????
# TODO: do not do it in-place, but rather into some "hidden" file
resuming = False
for attempt in range(3):
attempt = 0
nattempts = 3 # number to do, could be incremented if we downloaded a little
while attempt <= nattempts:
attempt += 1
try:
if digester:
downloaded_digest = digester() # start empty
warned = False
# I wonder if we could make writing async with downloader
with DownloadDirectory(path, digests or {}) as dldir:
assert dldir.offset is not None
downloaded_in_attempt = 0
downloaded = dldir.offset
resuming = downloaded > 0
if size is not None and downloaded == size:
lgr.debug(
"%s - downloaded size matches target size of %d, exiting the loop",
path,
size,
)
# Exit early when downloaded == size, as making a Range
# request in such a case results in a 416 error from S3.
# Problems will result if `size` is None but we've already
Expand All @@ -711,14 +723,16 @@
assert downloaded_digest is not None
downloaded_digest.update(block)
downloaded += len(block)
downloaded_in_attempt += len(block)
# TODO: yield progress etc
out: dict[str, Any] = {"done": downloaded}
if size:
if downloaded > size and not warned:
warned = True
# Yield ERROR?
lgr.warning(
"Downloaded %d bytes although size was told to be just %d",
"%s - downloaded %d bytes although size was told to be just %d",
path,
downloaded,
size,
)
Expand All @@ -735,29 +749,83 @@
# Catching RequestException lets us retry on timeout & connection
# errors (among others) in addition to HTTP status errors.
except requests.RequestException as exc:
sleep_amount = random.random() * 5 * attempt
if os.environ.get("DANDI_DEVEL_AGGRESSIVE_RETRY"):

Check warning on line 753 in dandi/download.py

View check run for this annotation

Codecov / codecov/patch

dandi/download.py#L752-L753

Added lines #L752 - L753 were not covered by tests
# in such a case if we downloaded a little more --
# consider it a successful attempt
if downloaded_in_attempt > 0:
lgr.debug(

Check warning on line 757 in dandi/download.py

View check run for this annotation

Codecov / codecov/patch

dandi/download.py#L756-L757

Added lines #L756 - L757 were not covered by tests
"%s - download failed on attempt #%d: %s, "
"but did download %d bytes, so considering "
"it a success and incrementing number of allowed attempts.",
path,
attempt,
exc,
downloaded_in_attempt,
)
nattempts += 1

Check warning on line 766 in dandi/download.py

View check run for this annotation

Codecov / codecov/patch

dandi/download.py#L766

Added line #L766 was not covered by tests
# TODO: actually we should probably retry only on selected codes,
# and also respect Retry-After
if attempt >= 2 or (
exc.response is not None
and exc.response.status_code
not in (
if exc.response is not None:
if exc.response.status_code not in (

Check warning on line 769 in dandi/download.py

View check run for this annotation

Codecov / codecov/patch

dandi/download.py#L768-L769

Added lines #L768 - L769 were not covered by tests
400, # Bad Request, but happened with gider:
# https://github.com/dandi/dandi-cli/issues/87
*RETRY_STATUSES,
):
lgr.debug(

Check warning on line 774 in dandi/download.py

View check run for this annotation

Codecov / codecov/patch

dandi/download.py#L774

Added line #L774 was not covered by tests
"%s - download failed due to response %d: %s",
path,
exc.response.status_code,
exc,
)
yield {"status": "error", "message": str(exc)}
return
elif retry_after := exc.response.headers.get("Retry-After"):

Check warning on line 782 in dandi/download.py

View check run for this annotation

Codecov / codecov/patch

dandi/download.py#L780-L782

Added lines #L780 - L782 were not covered by tests
# playing safe
if not str(retry_after).isdigit():

Check warning on line 784 in dandi/download.py

View check run for this annotation

Codecov / codecov/patch

dandi/download.py#L784

Added line #L784 was not covered by tests
# our code is wrong, do not crash but issue warning so
# we might get report/fix it up
lgr.warning(

Check warning on line 787 in dandi/download.py

View check run for this annotation

Codecov / codecov/patch

dandi/download.py#L787

Added line #L787 was not covered by tests
"%s - download failed due to response %d with non-integer"
" Retry-After=%r: %s",
path,
exc.response.status_code,
retry_after,
exc,
)
yield {"status": "error", "message": str(exc)}
return
sleep_amount = int(retry_after)
lgr.debug(

Check warning on line 798 in dandi/download.py

View check run for this annotation

Codecov / codecov/patch

dandi/download.py#L795-L798

Added lines #L795 - L798 were not covered by tests
"%s - download failed due to response %d with "
"Retry-After=%d: %s, will sleep and retry",
path,
exc.response.status_code,
sleep_amount,
exc,
)
else:
lgr.debug("%s - download failed: %s", path, exc)
yield {"status": "error", "message": str(exc)}
return
elif attempt >= nattempts:
lgr.debug(

Check warning on line 811 in dandi/download.py

View check run for this annotation

Codecov / codecov/patch

dandi/download.py#L807-L811

Added lines #L807 - L811 were not covered by tests
"%s - download failed after %d attempts: %s", path, attempt, exc
)
):
lgr.debug("Download failed: %s", exc)
yield {"status": "error", "message": str(exc)}
return
# if is_access_denied(exc) or attempt >= 2:
# raise
# sleep a little and retry
lgr.debug(
"Failed to download on attempt #%d: %s, will sleep a bit and retry",
attempt,
exc,
)
time.sleep(random.random() * 5)
else:
lgr.debug(

Check warning on line 820 in dandi/download.py

View check run for this annotation

Codecov / codecov/patch

dandi/download.py#L820

Added line #L820 was not covered by tests
"%s - download failed on attempt #%d: %s, will sleep a bit and retry",
path,
attempt,
exc,
)
time.sleep(sleep_amount)

Check warning on line 826 in dandi/download.py

View check run for this annotation

Codecov / codecov/patch

dandi/download.py#L826

Added line #L826 was not covered by tests
else:
lgr.warning("downloader logic: We should not be here!")

Check warning on line 828 in dandi/download.py

View check run for this annotation

Codecov / codecov/patch

dandi/download.py#L828

Added line #L828 was not covered by tests

if downloaded_digest and not resuming:
assert downloaded_digest is not None
Expand All @@ -768,11 +836,11 @@
if digest != final_digest:
msg = f"{algo}: downloaded {final_digest} != {digest}"
yield {"checksum": "differs", "status": "error", "message": msg}
lgr.debug("%s is different: %s.", path, msg)
lgr.debug("%s - is different: %s.", path, msg)

Check warning on line 839 in dandi/download.py

View check run for this annotation

Codecov / codecov/patch

dandi/download.py#L839

Added line #L839 was not covered by tests
return
else:
yield {"checksum": "ok"}
lgr.debug("Verified that %s has correct %s %s", path, algo, digest)
lgr.debug("%s - verified that has correct %s %s", path, algo, digest)
else:
# shouldn't happen with more recent metadata etc
yield {
Expand Down Expand Up @@ -825,16 +893,22 @@
):
# Pick up where we left off, writing to the end of the file
lgr.debug(
"Download directory exists and has matching checksum; resuming download"
"%s - download directory exists and has matching checksum(s) %s; resuming download",
self.dirpath,
matching_algs,
)
self.fp = self.writefile.open("ab")
else:
# Delete the file (if it even exists) and start anew
if not chkpath.exists():
lgr.debug("Starting new download in new download directory")
lgr.debug(
"%s - no prior digests found; starting new download", self.dirpath
)
else:
lgr.debug(
"Download directory found, but digests do not match; starting new download"
"%s - download directory found, but digests do not match;"
" starting new download",
self.dirpath,
)
try:
self.writefile.unlink()
Expand All @@ -853,12 +927,35 @@
exc_tb: TracebackType | None,
) -> None:
assert self.fp is not None
if exc_type is not None or exc_val is not None or exc_tb is not None:
lgr.debug(
"%s - entered __exit__ with position %d with exception: %s, %s",
self.dirpath,
self.fp.tell(),
exc_type,
exc_val,
)
else:
lgr.debug(
"%s - entered __exit__ with position %d without any exception",
self.dirpath,
self.fp.tell(),
)
self.fp.close()
try:
if exc_type is None:
try:
self.writefile.replace(self.filepath)
except IsADirectoryError:
except (IsADirectoryError, PermissionError) as exc:
if isinstance(exc, PermissionError):
if not (

Check warning on line 951 in dandi/download.py

View check run for this annotation

Codecov / codecov/patch

dandi/download.py#L951

Added line #L951 was not covered by tests
sys.platform.startswith("win") and self.filepath.is_dir()
):
raise

Check warning on line 954 in dandi/download.py

View check run for this annotation

Codecov / codecov/patch

dandi/download.py#L954

Added line #L954 was not covered by tests
lgr.debug(
"Destination path %s is a directory; removing it and retrying",
self.filepath,
)
rmtree(self.filepath)
self.writefile.replace(self.filepath)
finally:
Expand Down
89 changes: 89 additions & 0 deletions dandi/tests/test_download.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
from __future__ import annotations

from collections.abc import Callable
from glob import glob
import json
import logging
from multiprocessing import Manager, Process
import os
import os.path
from pathlib import Path
Expand All @@ -21,6 +24,7 @@
from ..consts import DRAFT, dandiset_metadata_file
from ..dandiarchive import DandisetURL
from ..download import (
DownloadDirectory,
Downloader,
DownloadExisting,
DownloadFormat,
Expand Down Expand Up @@ -1038,3 +1042,88 @@
assert len(done) == 2
else:
assert done[-1] == f"ETA: {10 - i} seconds<"


def test_DownloadDirectory_basic(tmp_path: Path) -> None:
    """Exercise DownloadDirectory end to end: append/flush semantics,
    atomic move into place, overwrite of files and directories, temp-dir
    cleanup, and the cross-process download lock."""
    with DownloadDirectory(tmp_path, digests={}) as dldir:
        assert dldir.dirpath.exists()
        assert dldir.writefile.exists()
        assert dldir.writefile.stat().st_size == 0
        assert dldir.offset == 0

        dldir.append(b"123")
        assert dldir.fp is not None
        # appends are buffered; flush to observe the size on disk
        dldir.fp.flush()
        assert dldir.writefile.stat().st_size == 3
        # offset records where we started, so it must not move
        assert dldir.offset == 0

        dldir.append(b"456")
        ino = dldir.writefile.stat().st_ino
        assert ino != tmp_path.stat().st_ino

    # after the context exits, the full file must be in place
    assert tmp_path.stat().st_size == 6
    assert tmp_path.read_bytes() == b"123456"
    # same inode: the file was moved into place, not copied (expensive)
    assert ino == tmp_path.stat().st_ino

    # overwriting an existing file with new content must work
    with DownloadDirectory(tmp_path, digests={}) as dldir:
        dldir.append(b"789")
    assert tmp_path.read_bytes() == b"789"

    # even a directory at the destination gets "overwritten"
    tmp_path.unlink()
    tmp_path.mkdir()
    (tmp_path / "somedata.dat").write_text("content")
    with DownloadDirectory(tmp_path, digests={}) as dldir:
        assert set(glob(f"{tmp_path}*")) == {str(tmp_path), str(dldir.dirpath)}
        dldir.append(b"123")
    assert tmp_path.read_bytes() == b"123"

    # the temporary .dandidownload folder must not be left behind
    assert set(glob(f"{tmp_path}*")) == {str(tmp_path)}

    # locking: a second process must fail to acquire the download lock
    with Manager() as mgr:
        results = mgr.list()
        with DownloadDirectory(tmp_path, digests={}) as dldir:
            dldir.append(b"123")
            proc = Process(target=_download_directory_subproc, args=(tmp_path, results))
            proc.start()
            proc.join()
        assert len(results) == 1
        assert results[0] == f"Could not acquire download lock for {tmp_path}"
        assert tmp_path.read_bytes() == b"123"


# needs to be a top-level function for pickling
def _download_directory_subproc(path, results):
    """Subprocess helper for the locking test: try to re-enter a
    DownloadDirectory for *path* and record the outcome in *results*.

    Appends "re-entered fine" on success, otherwise the stringified
    exception (expected: the could-not-acquire-lock error).
    """
    # Defect fixed: the original span had Codecov annotation text
    # ("Check warning on line 1104 ...") interleaved inside the function
    # body, breaking the syntax; the clean definition is restored here.
    try:
        with DownloadDirectory(path, digests={}):
            results.append("re-entered fine")
    except Exception as exc:
        results.append(str(exc))


def test_DownloadDirectory_exc(
    tmp_path: Path, caplog: pytest.LogCaptureFixture
) -> None:
    """Leaving the DownloadDirectory context via an exception must log the
    exception, close the file handle, and keep the partial download around."""
    caplog.set_level(logging.DEBUG, logger="dandi")
    # exit the context manager with an exception in flight
    with pytest.raises(RuntimeError):
        with DownloadDirectory(tmp_path, digests={}) as dl:
            dl.append(b"456")
            raise RuntimeError("Boom")
    expected = (
        f"{dl.dirpath} - entered __exit__ with position 3 with exception: "
        "<class 'RuntimeError'>, Boom"
    )
    assert caplog.record_tuples[-1] == ("dandi", 10, expected)
    # no cleanup was performed, but the handle was closed after ourselves
    assert tmp_path.exists()
    assert tmp_path.is_dir()
    assert dl.dirpath.exists()
    assert dl.fp is None
    assert dl.writefile.read_bytes() == b"456"
Loading
Loading