diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml index f44749a..e09722f 100644 --- a/.github/workflows/ci.yaml +++ b/.github/workflows/ci.yaml @@ -32,6 +32,8 @@ jobs: with: python-version: ${{ matrix.python-version }} cache: "pip" + - name: Update pip + run: pip install -U pip - name: Install with development dependencies run: pip install .[cli,dev] - name: Check with pre-commit @@ -48,6 +50,8 @@ jobs: with: python-version: "3.11" cache: "pip" + - name: Update pip + run: pip install -U pip - name: Install with development dependencies run: pip install .[cli,dev] - name: Install minimum versions of dependencies @@ -66,6 +70,8 @@ jobs: with: python-version: "3.11" cache: "pip" + - name: Update pip + run: pip install -U pip - name: Install with development dependencies run: pip install .[cli,dev] - name: Test with coverage diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index ebfb203..bb42a2a 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -12,6 +12,8 @@ repos: - pystac - pytest - types-aiofiles + - types-python-dateutil + - types-tqdm - repo: https://github.com/charliermarsh/ruff-pre-commit rev: "v0.0.278" hooks: diff --git a/CHANGELOG.md b/CHANGELOG.md index eb41173..6361dd6 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -14,6 +14,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - `Client.from_config` and `Client.close` ([#46](https://github.com/stac-utils/stac-asset/pull/46)) - Retry configuration for S3 ([#47](https://github.com/stac-utils/stac-asset/pull/47)) - `Collection` download ([#50](https://github.com/stac-utils/stac-asset/pull/50)) +- Progress reporting ([#55](https://github.com/stac-utils/stac-asset/pull/55)) ### Changed diff --git a/pyproject.toml b/pyproject.toml index d8fe907..96dee52 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -21,11 +21,12 @@ dependencies = [ "aiobotocore>=2.5.0", "aiohttp>=3.8.4", "pystac>=1.7.3", + "python-dateutil>=2.7.0", "yarl>=1.9.2", ] [project.optional-dependencies] -cli = ["click~=8.1.5", "click-logging~=1.0.1"] +cli = ["click~=8.1.5", "click-logging~=1.0.1", "tqdm~=4.65.1"] dev = [ "black~=23.3", "mypy~=1.3", @@ -35,6 +36,8 @@ dev = [ "pytest-cov~=4.1", "ruff==0.0.282", "types-aiofiles~=23.1", + "types-python-dateutil~=2.8.19", + "types-tqdm~=4.65.0", ] docs = ["pydata-sphinx-theme~=0.13", "sphinx~=7.0"] diff --git a/src/stac_asset/_cli.py b/src/stac_asset/_cli.py index 438f6cb..4306afa 100644 --- a/src/stac_asset/_cli.py +++ b/src/stac_asset/_cli.py @@ -3,18 +3,36 @@ import logging import os import sys -from typing import List, Optional, Union +from asyncio import Queue +from dataclasses import dataclass +from typing import TYPE_CHECKING, Any, Dict, List, Optional, Union import click import click_logging +import tqdm from pystac import Item, ItemCollection from . import Config, functions from .config import DEFAULT_S3_MAX_ATTEMPTS, DEFAULT_S3_RETRY_MODE +from .messages import ( + ErrorAssetDownload, + FinishAssetDownload, + OpenUrl, + StartAssetDownload, + WriteChunk, +) logger = logging.getLogger(__name__) click_logging.basic_config(logger) +# Needed until we drop Python 3.8 +if TYPE_CHECKING: + AnyQueue = Queue[Any] + Tqdm = tqdm.tqdm[Any] +else: + AnyQueue = Queue + Tqdm = tqdm.tqdm + @click.group() def cli() -> None: @@ -111,6 +129,36 @@ def download( $ stac-asset download -i asset-key-to-include item.json """ + asyncio.run( + download_async( + href, + directory, + alternate_assets, + include, + exclude, + file_name, + quiet, + s3_requester_pays, + s3_retry_mode, + s3_max_attempts, + warn, + ) + ) + + +async def download_async( + href: Optional[str], + directory: Optional[str], + alternate_assets: List[str], + include: List[str], + exclude: List[str], + file_name: Optional[str], + quiet: bool, + s3_requester_pays: bool, + s3_retry_mode: str, + s3_max_attempts: int, + warn: bool, +) -> None: config = Config( alternate_assets=alternate_assets, include=include, @@ -125,39 +173,58 @@ def download( if href is None or href == "-": input_dict = json.load(sys.stdin) else: - input_dict = json.loads(asyncio.run(read_file(href, config))) + input_dict = json.loads(await read_file(href, config)) if directory is None: - directory = os.getcwd() + directory_str = os.getcwd() + else: + directory_str = str(directory) + + if quiet: + queue = None + else: + queue = Queue() type_ = input_dict.get("type") if type_ is None: - print("ERROR: missing 'type' field on input dictionary", file=sys.stderr) + if not quiet: + print("ERROR: missing 'type' field on input dictionary", file=sys.stderr) sys.exit(1) elif type_ == "Feature": item = Item.from_dict(input_dict) if href: item.set_self_href(href) item.make_asset_hrefs_absolute() - output: Union[Item, ItemCollection] = asyncio.run( - functions.download_item( + + async def download() -> Union[Item, ItemCollection]: + return await functions.download_item( item, - directory, + directory_str, config=config, + queue=queue, ) - ) + elif type_ == "FeatureCollection": item_collection = ItemCollection.from_dict(input_dict) - output = asyncio.run( - functions.download_item_collection( + + async def download() -> Union[Item, ItemCollection]: + return await functions.download_item_collection( item_collection, - directory, + directory_str, config=config, + queue=queue, ) - ) + else: - print(f"ERROR: unsupported 'type' field: {type_}", file=sys.stderr) + if not quiet: + print(f"ERROR: unsupported 'type' field: {type_}", file=sys.stderr) sys.exit(2) + task = asyncio.create_task(report_progress(queue)) + output = await download() + if queue: + await queue.put(None) + await task + if not quiet: json.dump(output.to_dict(transform_hrefs=False), sys.stdout) @@ -170,3 +237,61 @@ async def read_file(href: str, config: Config) -> bytes: async for chunk in client.open_href(href): data += chunk return data + + +async def report_progress(queue: Optional[AnyQueue]) -> None: + if queue is None: + return + downloads: Dict[str, Download] = dict() + while True: + message = await queue.get() + if isinstance(message, StartAssetDownload): + progress_bar = tqdm.tqdm( + position=len(downloads), + unit="B", + unit_scale=True, + unit_divisor=1024, + leave=False, + ) + if message.item_id: + description = f"{message.item_id} [{message.key}]" + else: + description = message.key + progress_bar.set_description_str(description) + downloads[message.href] = Download( + key=message.key, + item_id=message.item_id, + href=message.href, + path=str(message.path), + progress_bar=progress_bar, + ) + elif isinstance(message, OpenUrl): + download = downloads.get(str(message.url)) + if download: + if message.size: + download.progress_bar.reset(total=message.size) + elif isinstance(message, FinishAssetDownload): + download = downloads.get(message.href) + if download: + download.progress_bar.close() + elif isinstance(message, ErrorAssetDownload): + download = downloads.get(message.href) + if download: + download.progress_bar.close() + elif isinstance(message, WriteChunk): + download = downloads.get(message.href) + if download: + download.progress_bar.update(message.size) + elif message is None: + for download in downloads.values(): + download.progress_bar.close() + return + + +@dataclass +class Download: + key: str + item_id: Optional[str] + href: str + path: str + progress_bar: Tqdm diff --git a/src/stac_asset/client.py b/src/stac_asset/client.py index 38200a6..566602d 100644 --- a/src/stac_asset/client.py +++ b/src/stac_asset/client.py @@ -1,15 +1,22 @@ from __future__ import annotations from abc import ABC, abstractmethod +from asyncio import Queue, QueueFull from pathlib import Path from types import TracebackType -from typing import AsyncIterator, Optional, Type, TypeVar +from typing import Any, AsyncIterator, Optional, Type, TypeVar import aiofiles from pystac import Asset from yarl import URL from .config import Config +from .messages import ( + ErrorAssetDownload, + FinishAssetDownload, + StartAssetDownload, + WriteChunk, +) from .types import PathLikeObject T = TypeVar("T", bound="Client") @@ -34,7 +41,10 @@ def __init__(self) -> None: @abstractmethod async def open_url( - self, url: URL, content_type: Optional[str] = None + self, + url: URL, + content_type: Optional[str] = None, + queue: Optional[Queue[Any]] = None, ) -> AsyncIterator[bytes]: """Opens a url and yields an iterator over its bytes. @@ -44,6 +54,7 @@ async def open_url( url: The input url content_type: The expected content type, to be checked by the client implementations + queue: An optional queue to use for progress reporting Yields: AsyncIterator[bytes]: An iterator over chunks of the read file @@ -53,18 +64,24 @@ async def open_url( yield async def open_href( - self, href: str, content_type: Optional[str] = None + self, + href: str, + content_type: Optional[str] = None, + queue: Optional[Queue[Any]] = None, ) -> AsyncIterator[bytes]: """Opens a href and yields an iterator over its bytes. Args: href: The input href content_type: The expected content type + queue: An optional queue to use for progress reporting Yields: AsyncIterator[bytes]: An iterator over chunks of the read file """ - async for chunk in self.open_url(URL(href), content_type=content_type): + async for chunk in self.open_url( + URL(href), content_type=content_type, queue=queue + ): yield chunk async def download_href( @@ -73,6 +90,7 @@ async def download_href( path: PathLikeObject, clean: bool = True, content_type: Optional[str] = None, + queue: Optional[Queue[Any]] = None, ) -> None: """Downloads a file to the local filesystem. @@ -81,11 +99,21 @@ async def download_href( path: The output file path clean: If an error occurs, delete the output file if it exists content_type: The expected content type + queue: An optional queue to use for progress reporting """ try: async with aiofiles.open(path, mode="wb") as f: - async for chunk in self.open_href(href, content_type=content_type): + async for chunk in self.open_href( + href, content_type=content_type, queue=queue + ): await f.write(chunk) + if queue: + try: + queue.put_nowait( + WriteChunk(href=href, path=Path(path), size=len(chunk)) + ) + except QueueFull: + pass except Exception as err: path_as_path = Path(path) if clean and path_as_path.exists(): @@ -96,7 +124,12 @@ async def download_href( raise err async def download_asset( - self, key: str, asset: Asset, path: Path, clean: bool = True + self, + key: str, + asset: Asset, + path: Path, + clean: bool = True, + queue: Optional[Queue[Any]] = None, ) -> Asset: """Downloads an asset. @@ -105,6 +138,7 @@ async def download_asset( asset: The asset clean: If an error occurs, delete the output file if it exists path: The path to which the asset will be downloaded + queue: An optional queue to use for progress reporting Returns: Asset: The asset with an updated href @@ -117,7 +151,25 @@ async def download_asset( raise ValueError( f"asset '{key}' does not have an absolute href: {asset.href}" ) - await self.download_href(href, path, clean=clean, content_type=asset.media_type) + if queue: + if asset.owner: + item_id = asset.owner.id + else: + item_id = None + await queue.put( + StartAssetDownload(key=key, href=href, path=path, item_id=item_id) + ) + try: + await self.download_href( + href, path, clean=clean, content_type=asset.media_type, queue=queue + ) + except Exception as err: + if queue: + await queue.put(ErrorAssetDownload(key=key, href=href, path=path)) + raise err + + if queue: + await queue.put(FinishAssetDownload(key=key, href=href, path=path)) asset.href = str(path) return asset diff --git a/src/stac_asset/config.py b/src/stac_asset/config.py index fda7ba4..037a5f2 100644 --- a/src/stac_asset/config.py +++ b/src/stac_asset/config.py @@ -49,6 +49,9 @@ class Config: warn: bool = False """When downloading, warn instead of erroring.""" + clean: bool = True + """If true, clean up the downloaded file if it errors.""" + earthdata_token: Optional[str] = None """A token for logging in to Earthdata.""" diff --git a/src/stac_asset/filesystem_client.py b/src/stac_asset/filesystem_client.py index d4f0d5a..d666ac8 100644 --- a/src/stac_asset/filesystem_client.py +++ b/src/stac_asset/filesystem_client.py @@ -1,12 +1,15 @@ from __future__ import annotations +import os.path +from asyncio import Queue from types import TracebackType -from typing import AsyncIterator, Optional, Type +from typing import Any, AsyncIterator, Optional, Type import aiofiles from yarl import URL from .client import Client +from .messages import OpenUrl class FilesystemClient(Client): @@ -16,7 +19,10 @@ class FilesystemClient(Client): """ async def open_url( - self, url: URL, content_type: Optional[str] = None + self, + url: URL, + content_type: Optional[str] = None, + queue: Optional[Queue[Any]] = None, ) -> AsyncIterator[bytes]: """Iterates over data from a local url. @@ -24,6 +30,7 @@ async def open_url( url: The url to read bytes from content_type: The expected content type. Ignored by this client, because filesystems don't have content types. + queue: An optional queue to use for progress reporting Yields: AsyncIterator[bytes]: An iterator over the file's bytes. @@ -37,6 +44,8 @@ async def open_url( "cannot read a file with the filesystem client if it has a url scheme: " + str(url) ) + if queue: + await queue.put(OpenUrl(size=os.path.getsize(url.path), url=url)) async with aiofiles.open(url.path, "rb") as f: async for chunk in f: yield chunk diff --git a/src/stac_asset/functions.py b/src/stac_asset/functions.py index 48e9ba6..18b6965 100644 --- a/src/stac_asset/functions.py +++ b/src/stac_asset/functions.py @@ -1,8 +1,9 @@ import asyncio import os.path import warnings +from asyncio import Queue from pathlib import Path -from typing import Dict, Optional, Set, Tuple, Type, TypeVar +from typing import TYPE_CHECKING, Any, Dict, Optional, Set, Tuple, Type, TypeVar import pystac.utils from pystac import Asset, Collection, Item, ItemCollection, STACError @@ -22,11 +23,18 @@ from .strategy import FileNameStrategy from .types import PathLikeObject +# Needed until we drop Python 3.8 +if TYPE_CHECKING: + AnyQueue = Queue[Any] +else: + AnyQueue = Queue + async def download_item( item: Item, directory: PathLikeObject, config: Optional[Config] = None, + queue: Optional[AnyQueue] = None, ) -> Item: """Downloads an item to the local filesystem. @@ -34,6 +42,7 @@ async def download_item( item: The :py:class:`pystac.Item`. directory: The output directory that will hold the items and assets. config: The download configuration + queue: An optional queue to use for progress reporting Returns: Item: The `~pystac.Item`, with the updated asset hrefs and self href. @@ -41,13 +50,14 @@ async def download_item( Raises: ValueError: Raised if the item doesn't have any assets. """ - return await _download(item, directory, config or Config()) + return await _download(item, directory, config=config or Config(), queue=queue) async def download_item_collection( item_collection: ItemCollection, directory: PathLikeObject, config: Optional[Config] = None, + queue: Optional[AnyQueue] = None, ) -> ItemCollection: """Downloads an item collection to the local filesystem. @@ -55,6 +65,7 @@ async def download_item_collection( item_collection: The item collection to download directory: The destination directory config: The download configuration + queue: An optional queue to use for progress reporting Returns: ItemCollection: The item collection, with updated asset hrefs @@ -87,6 +98,7 @@ async def download_item_collection( item=item, directory=item_directory, config=item_config, + queue=queue, ) ) ) @@ -113,7 +125,10 @@ async def download_item_collection( async def download_collection( - collection: Collection, directory: PathLikeObject, config: Optional[Config] = None + collection: Collection, + directory: PathLikeObject, + config: Optional[Config] = None, + queue: Optional[AnyQueue] = None, ) -> Collection: """Downloads a collection to the local filesystem. @@ -124,6 +139,7 @@ async def download_collection( collection: A pystac collection directory: The destination directory config: The download configuration + queue: An optional queue to use for progress reporting Returns: Collection: The colleciton, with updated asset hrefs @@ -131,7 +147,7 @@ async def download_collection( Raises: CantIncludeAndExclude: Raised if both include and exclude are not None. """ - return await _download(collection, directory, config or Config()) + return await _download(collection, directory, config or Config(), queue=queue) def guess_client_class(asset: Asset, config: Config) -> Type[Client]: @@ -190,7 +206,12 @@ def guess_client_class_from_href(href: str) -> Type[Client]: _T = TypeVar("_T", Collection, Item) -async def _download(stac_object: _T, directory: PathLikeObject, config: Config) -> _T: +async def _download( + stac_object: _T, + directory: PathLikeObject, + config: Config, + queue: Optional[AnyQueue], +) -> _T: config.validate() if not stac_object.assets: @@ -246,8 +267,12 @@ async def _download(stac_object: _T, directory: PathLikeObject, config: Config) else: client = await client_class.from_config(config) clients[client_class] = client + cloned_asset = asset.clone() + cloned_asset.set_owner(stac_object) tasks[key] = asyncio.create_task( - client.download_asset(key, asset.clone(), path) + client.download_asset( + key, cloned_asset, path, clean=config.clean, queue=queue + ) ) if stac_object.get_self_href(): stac_object.assets[key].href = pystac.utils.make_relative_href( diff --git a/src/stac_asset/http_client.py b/src/stac_asset/http_client.py index 1e0dec6..5f72111 100644 --- a/src/stac_asset/http_client.py +++ b/src/stac_asset/http_client.py @@ -1,7 +1,8 @@ from __future__ import annotations +from asyncio import Queue from types import TracebackType -from typing import AsyncIterator, Optional, Type, TypeVar +from typing import Any, AsyncIterator, Optional, Type, TypeVar from aiohttp import ClientSession from yarl import URL @@ -9,6 +10,7 @@ from . import validate from .client import Client from .config import Config +from .messages import OpenUrl T = TypeVar("T", bound="HttpClient") @@ -36,13 +38,17 @@ def __init__(self, session: ClientSession, check_content_type: bool = True) -> N self.check_content_type = check_content_type async def open_url( - self, url: URL, content_type: Optional[str] = None + self, + url: URL, + content_type: Optional[str] = None, + queue: Optional[Queue[Any]] = None, ) -> AsyncIterator[bytes]: """Opens a url with this client's session and iterates over its bytes. Args: url: The url to open content_type: The expected content type + queue: An optional queue to use for progress reporting Yields: AsyncIterator[bytes]: An iterator over the file's bytes @@ -56,6 +62,8 @@ async def open_url( validate.content_type( actual=response.content_type, expected=content_type ) + if queue: + await queue.put(OpenUrl(url=url, size=response.content_length)) async for chunk, _ in response.content.iter_chunks(): yield chunk diff --git a/src/stac_asset/messages.py b/src/stac_asset/messages.py new file mode 100644 index 0000000..bba44a0 --- /dev/null +++ b/src/stac_asset/messages.py @@ -0,0 +1,75 @@ +from dataclasses import dataclass +from pathlib import Path +from typing import Optional + +from yarl import URL + + +@dataclass +class StartAssetDownload: + """Sent when an asset starts downloading.""" + + key: str + """The asset key.""" + + item_id: Optional[str] + """The item id.""" + + href: str + """The asset href.""" + + path: Path + """The local path that the asset is being downloaded to.""" + + +@dataclass +class ErrorAssetDownload: + """Sent when an asset starts downloading.""" + + key: str + """The asset key.""" + + href: str + """The asset href.""" + + path: Path + """The local path that the asset is being downloaded to.""" + + +@dataclass +class FinishAssetDownload: + """Sent when an asset finishes downloading.""" + + key: str + """The asset key.""" + + href: str + """The asset href.""" + + path: Path + """The local path that the asset is being downloaded to.""" + + +@dataclass +class WriteChunk: + """Sent when a chunk is written to disk.""" + + href: str + """The asset href.""" + + path: Path + """The local path that the asset is being downloaded to.""" + + size: int + """The number of bytes written.""" + + +@dataclass +class OpenUrl: + """Sent when a url is first opened.""" + + url: URL + """The URL""" + + size: Optional[int] + """The file size.""" diff --git a/src/stac_asset/planetary_computer_client.py b/src/stac_asset/planetary_computer_client.py index 44527d1..c1b4de8 100644 --- a/src/stac_asset/planetary_computer_client.py +++ b/src/stac_asset/planetary_computer_client.py @@ -1,11 +1,12 @@ from __future__ import annotations import datetime -from asyncio import Lock +from asyncio import Lock, Queue from datetime import timezone from types import TracebackType from typing import Any, AsyncIterator, Dict, Optional, Type +import dateutil.parser from aiohttp import ClientSession from yarl import URL @@ -21,7 +22,7 @@ class _Token: @classmethod def from_dict(cls, data: Dict[str, Any]) -> _Token: try: - expiry = datetime.datetime.fromisoformat(data["msft:expiry"]) + expiry = dateutil.parser.isoparse(data["msft:expiry"]) except KeyError: raise ValueError(f"missing 'msft:expiry' key in dict: {data}") @@ -66,7 +67,10 @@ def __init__( self.sas_token_endpoint = URL(sas_token_endpoint) async def open_url( - self, url: URL, content_type: Optional[str] = None + self, + url: URL, + content_type: Optional[str] = None, + queue: Optional[Queue[Any]] = None, ) -> AsyncIterator[bytes]: """Opens a url and iterates over its bytes. @@ -84,6 +88,7 @@ async def open_url( Args: url: The url to open content_type: The expected content type + queue: An optional queue to use for progress reporting Yields: AsyncIterator[bytes]: An iterator over the file's bytes @@ -95,7 +100,9 @@ async def open_url( and not set(url.query) & {"st", "se", "sp"} ): url = await self._sign(url) - async for chunk in super().open_url(url, content_type=content_type): + async for chunk in super().open_url( + url, content_type=content_type, queue=queue + ): yield chunk async def _sign(self, url: URL) -> URL: diff --git a/src/stac_asset/s3_client.py b/src/stac_asset/s3_client.py index 831e1f4..82d88ce 100644 --- a/src/stac_asset/s3_client.py +++ b/src/stac_asset/s3_client.py @@ -1,7 +1,8 @@ from __future__ import annotations +from asyncio import Queue from types import TracebackType -from typing import AsyncIterator, Optional, Type +from typing import Any, AsyncIterator, Optional, Type import aiobotocore.session import botocore.config @@ -17,6 +18,7 @@ DEFAULT_S3_RETRY_MODE, Config, ) +from .messages import OpenUrl class S3Client(Client): @@ -69,13 +71,17 @@ def __init__( self.max_attempts = max_attempts async def open_url( - self, url: URL, content_type: Optional[str] = None + self, + url: URL, + content_type: Optional[str] = None, + queue: Optional[Queue[Any]] = None, ) -> AsyncIterator[bytes]: """Opens an s3 url and iterates over its bytes. Args: url: The url to open content_type: The expected content type + queue: An optional queue to use for progress reporting Yields: AsyncIterator[bytes]: An iterator over the file's bytes @@ -107,6 +113,8 @@ async def open_url( response = await client.get_object(**params) if content_type: validate.content_type(response["ContentType"], content_type) + if queue: + await queue.put(OpenUrl(url=url, size=response["ContentLength"])) async for chunk in response["Body"]: yield chunk diff --git a/tests/test_cli.py b/tests/test_cli.py index 7be7a29..034e04e 100644 --- a/tests/test_cli.py +++ b/tests/test_cli.py @@ -24,7 +24,7 @@ def test_download_item_stdin_stdout(tmp_path: Path, item: Item) -> None: item_as_str = json.dumps( item.to_dict(include_self_link=True, transform_hrefs=False) ) - runner = CliRunner() + runner = CliRunner(mix_stderr=False) result = runner.invoke(stac_asset._cli.cli, ["download"], input=item_as_str) assert result.exit_code == 0, result.stdout Item.from_dict(json.loads(result.stdout)) @@ -41,7 +41,7 @@ def test_download_item_collection_stdin_stdout( item_collection_as_str = json.dumps( item_collection.to_dict(transform_hrefs=False) ) - runner = CliRunner() + runner = CliRunner(mix_stderr=False) result = runner.invoke( stac_asset._cli.cli, ["download"], input=item_collection_as_str ) diff --git a/tests/test_functions.py b/tests/test_functions.py index 24923ce..d3f7a25 100644 --- a/tests/test_functions.py +++ b/tests/test_functions.py @@ -1,5 +1,7 @@ import os.path +from asyncio import Queue from pathlib import Path +from typing import Any import pytest import stac_asset @@ -108,3 +110,9 @@ async def test_multiple_clients(tmp_path: Path, item: Item) -> None: item = await stac_asset.download_item( item, tmp_path, Config(asset_file_name_strategy=FileNameStrategy.KEY) ) + + +async def test_queue(tmp_path: Path, item: Item) -> None: + queue: Queue[Any] = Queue() + item = await stac_asset.download_item(item, tmp_path, queue=queue) + assert not queue.empty()