diff --git a/dandi/cli/cmd_move.py b/dandi/cli/cmd_move.py
index 188dc89d9..b7bae6fb9 100644
--- a/dandi/cli/cmd_move.py
+++ b/dandi/cli/cmd_move.py
@@ -1,7 +1,5 @@
from __future__ import annotations
-from typing import Optional
-
import click
from .base import devel_debug_option, instance_option, map_to_click_exceptions
@@ -47,10 +45,10 @@
@map_to_click_exceptions
def move(
paths: tuple[str],
- dandiset: Optional[str],
+ dandiset: str | None,
dry_run: bool,
existing: str,
- jobs: Optional[int],
+ jobs: int | None,
regex: bool,
work_on: str,
dandi_instance: str,
diff --git a/dandi/cli/cmd_validate.py b/dandi/cli/cmd_validate.py
index 1e13c85b4..2bc9946e8 100644
--- a/dandi/cli/cmd_validate.py
+++ b/dandi/cli/cmd_validate.py
@@ -4,7 +4,7 @@
import logging
import os
import re
-from typing import List, Optional, cast
+from typing import cast
import warnings
import click
@@ -90,10 +90,10 @@ def validate_bids(
@map_to_click_exceptions
def validate(
paths: tuple[str, ...],
- ignore: Optional[str],
+ ignore: str | None,
grouping: str,
min_severity: str,
- schema: Optional[str] = None,
+ schema: str | None = None,
devel_debug: bool = False,
allow_any_path: bool = False,
) -> None:
@@ -139,7 +139,7 @@ def validate(
def _process_issues(
validator_result: Iterable[ValidationResult],
grouping: str,
- ignore: Optional[str] = None,
+ ignore: str | None = None,
) -> None:
issues = [i for i in validator_result if i.severity is not None]
if ignore is not None:
@@ -149,7 +149,7 @@ def _process_issues(
display_errors(
purviews,
[i.id for i in issues],
- cast(List[Severity], [i.severity for i in issues]),
+ cast("list[Severity]", [i.severity for i in issues]),
[i.message for i in issues],
)
elif grouping == "path":
@@ -161,7 +161,7 @@ def _process_issues(
display_errors(
[purview],
[i.id for i in applies_to],
- cast(List[Severity], [i.severity for i in applies_to]),
+ cast("list[Severity]", [i.severity for i in applies_to]),
[i.message for i in applies_to],
)
else:
@@ -185,10 +185,10 @@ def _get_severity_color(severities: list[Severity]) -> str:
def display_errors(
- purviews: list[Optional[str]],
+ purviews: list[str | None],
errors: list[str],
severities: list[Severity],
- messages: list[Optional[str]],
+ messages: list[str | None],
) -> None:
"""
Unified error display for validation CLI, which auto-resolves grouping
diff --git a/dandi/conftest.py b/dandi/conftest.py
index 347f31f1b..3500a7f6c 100644
--- a/dandi/conftest.py
+++ b/dandi/conftest.py
@@ -1,4 +1,4 @@
-from typing import List
+from __future__ import annotations
from _pytest.config import Config
from _pytest.config.argparsing import Parser
@@ -16,7 +16,7 @@ def pytest_addoption(parser: Parser) -> None:
)
-def pytest_collection_modifyitems(items: List[Item], config: Config) -> None:
+def pytest_collection_modifyitems(items: list[Item], config: Config) -> None:
# Based on
if config.getoption("--dandi-api"):
diff --git a/dandi/consts.py b/dandi/consts.py
index 7c0691daf..d8e47350d 100644
--- a/dandi/consts.py
+++ b/dandi/consts.py
@@ -4,7 +4,6 @@
from dataclasses import dataclass
from enum import Enum
import os
-from typing import Optional
#: A list of metadata fields which dandi extracts from .nwb files.
#: Additional fields (such as ``number_of_*``) might be added by
@@ -99,7 +98,7 @@ class EmbargoStatus(Enum):
@dataclass(frozen=True)
class DandiInstance:
name: str
- gui: Optional[str]
+ gui: str | None
api: str
@property
diff --git a/dandi/dandiapi.py b/dandi/dandiapi.py
index 171a44c06..c0f86a064 100644
--- a/dandi/dandiapi.py
+++ b/dandi/dandiapi.py
@@ -1,6 +1,7 @@
from __future__ import annotations
from abc import ABC, abstractmethod
+from collections.abc import Callable, Iterable, Iterator, Sequence
from concurrent.futures import ThreadPoolExecutor
from dataclasses import dataclass
from datetime import datetime
@@ -12,22 +13,7 @@
import re
from time import sleep, time
from types import TracebackType
-from typing import (
- Any,
- Callable,
- ClassVar,
- Dict,
- FrozenSet,
- Iterable,
- Iterator,
- List,
- Optional,
- Sequence,
- Type,
- TypeVar,
- Union,
- cast,
-)
+from typing import TYPE_CHECKING, Any, ClassVar, Dict, List, Optional
from urllib.parse import quote_plus, urlparse, urlunparse
import click
@@ -59,9 +45,11 @@
is_page2_url,
)
-lgr = get_logger()
+if TYPE_CHECKING:
+ from typing_extensions import Self
-T = TypeVar("T")
+
+lgr = get_logger()
class AssetType(Enum):
@@ -105,8 +93,8 @@ class RESTFullAPIClient:
def __init__(
self,
api_url: str,
- session: Optional[requests.Session] = None,
- headers: Optional[dict] = None,
+ session: requests.Session | None = None,
+ headers: dict | None = None,
) -> None:
"""
:param str api_url: The base HTTP(S) URL to prepend to request paths
@@ -123,18 +111,18 @@ def __init__(
self.session = session
#: Default number of items to request per page when paginating (`None`
#: means to use the server's default)
- self.page_size: Optional[int] = None
+ self.page_size: int | None = None
#: How many pages to fetch at once when parallelizing pagination
self.page_workers: int = 5
- def __enter__(self: T) -> T:
+ def __enter__(self) -> Self:
return self
def __exit__(
self,
- exc_type: Optional[Type[BaseException]],
- exc_val: Optional[BaseException],
- exc_tb: Optional[TracebackType],
+ exc_type: type[BaseException] | None,
+ exc_val: BaseException | None,
+ exc_tb: TracebackType | None,
) -> None:
self.session.close()
@@ -142,14 +130,14 @@ def request(
self,
method: str,
path: str,
- params: Optional[dict] = None,
+ params: dict | None = None,
data: Any = None,
- files: Optional[dict] = None,
+ files: dict | None = None,
json: Any = None,
- headers: Optional[dict] = None,
+ headers: dict | None = None,
json_resp: bool = True,
retry_statuses: Sequence[int] = (),
- retry_if: Optional[Callable[[requests.Response], Any]] = None,
+ retry_if: Callable[[requests.Response], Any] | None = None,
**kwargs: Any,
) -> Any:
"""
@@ -340,8 +328,8 @@ def patch(self, path: str, **kwargs: Any) -> Any:
def paginate(
self,
path: str,
- page_size: Optional[int] = None,
- params: Optional[dict] = None,
+ page_size: int | None = None,
+ params: dict | None = None,
) -> Iterator:
"""
Paginate through the resources at the given path: GET the path, yield
@@ -354,7 +342,7 @@ def paginate(
(default 5) at a time. This behavior requires the initial response to
contain a ``"count"`` key giving the number of items across all pages.
- :param Optional[int] page_size:
+ :param page_size:
If non-`None`, overrides the client's `page_size` attribute for
this sequence of pages
"""
@@ -412,9 +400,9 @@ class DandiAPIClient(RESTFullAPIClient):
def __init__(
self,
- api_url: Optional[str] = None,
- token: Optional[str] = None,
- dandi_instance: Optional[DandiInstance] = None,
+ api_url: str | None = None,
+ token: str | None = None,
+ dandi_instance: DandiInstance | None = None,
) -> None:
"""
Construct a client instance for the given API URL or Dandi instance
@@ -447,8 +435,8 @@ def __init__(
@classmethod
def for_dandi_instance(
cls,
- instance: Union[str, DandiInstance],
- token: Optional[str] = None,
+ instance: str | DandiInstance,
+ token: str | None = None,
authenticate: bool = False,
) -> "DandiAPIClient":
"""
@@ -541,7 +529,7 @@ def _instance_id(self) -> str:
return self.dandi_instance.name.upper()
def get_dandiset(
- self, dandiset_id: str, version_id: Optional[str] = None, lazy: bool = True
+ self, dandiset_id: str, version_id: str | None = None, lazy: bool = True
) -> "RemoteDandiset":
"""
Fetches the Dandiset with the given ``dandiset_id``. If ``version_id``
@@ -577,13 +565,13 @@ def get_dandisets(self) -> Iterator["RemoteDandiset"]:
for data in self.paginate("/dandisets/"):
yield RemoteDandiset.from_data(self, data)
- def create_dandiset(self, name: str, metadata: Dict[str, Any]) -> "RemoteDandiset":
+ def create_dandiset(self, name: str, metadata: dict[str, Any]) -> "RemoteDandiset":
"""Creates a Dandiset with the given name & metadata"""
return RemoteDandiset.from_data(
self, self.post("/dandisets/", json={"name": name, "metadata": metadata})
)
- def check_schema_version(self, schema_version: Optional[str] = None) -> None:
+ def check_schema_version(self, schema_version: str | None = None) -> None:
"""
Confirms that the server is using the same version of the Dandi schema
as the client. If it is not, a `SchemaVersionError` is raised.
@@ -633,17 +621,16 @@ class APIBase(BaseModel):
detail; do not rely on it.
"""
- JSON_EXCLUDE: ClassVar[FrozenSet[str]] = frozenset(["client"])
+ JSON_EXCLUDE: ClassVar[frozenset[str]] = frozenset(["client"])
- def json_dict(self) -> Dict[str, Any]:
+ def json_dict(self) -> dict[str, Any]:
"""
Convert to a JSONable `dict`, omitting the ``client`` attribute and
using the same field names as in the API
"""
- return cast(
- Dict[str, Any],
- json.loads(self.json(exclude=self.JSON_EXCLUDE, by_alias=True)),
- )
+ data = json.loads(self.json(exclude=self.JSON_EXCLUDE, by_alias=True))
+ assert isinstance(data, dict)
+ return data
class Config:
allow_population_by_field_name = True
@@ -743,16 +730,16 @@ def __init__(
self,
client: DandiAPIClient,
identifier: str,
- version: Union[str, Version, None] = None,
- data: Union[Dict[str, Any], RemoteDandisetData, None] = None,
+ version: str | Version | None = None,
+ data: dict[str, Any] | RemoteDandisetData | None = None,
) -> None:
#: The `DandiAPIClient` instance that returned this `RemoteDandiset`
#: and which the latter will use for API requests
self.client: DandiAPIClient = client
#: The Dandiset identifier
self.identifier: str = identifier
- self._version_id: Optional[str]
- self._version: Optional[Version]
+ self._version_id: str | None
+ self._version: Version | None
if version is None:
self._version_id = None
self._version = None
@@ -762,7 +749,7 @@ def __init__(
else:
self._version_id = version.identifier
self._version = version
- self._data: Optional[RemoteDandisetData]
+ self._data: RemoteDandisetData | None
if data is not None:
self._data = RemoteDandisetData.parse_obj(data)
else:
@@ -830,7 +817,7 @@ def embargo_status(self) -> EmbargoStatus:
return self._get_data().embargo_status
@property
- def most_recent_published_version(self) -> Optional[Version]:
+ def most_recent_published_version(self) -> Version | None:
"""
The most recent published (non-draft) version of the Dandiset, or
`None` if no versions have been published
@@ -876,9 +863,7 @@ def version_api_url(self) -> str:
return self.client.get_url(self.version_api_path)
@classmethod
- def from_data(
- cls, client: "DandiAPIClient", data: Dict[str, Any]
- ) -> "RemoteDandiset":
+ def from_data(cls, client: DandiAPIClient, data: dict[str, Any]) -> RemoteDandiset:
"""
Construct a `RemoteDandiset` instance from a `DandiAPIClient` and a
`dict` of raw string fields in the same format as returned by the API.
@@ -896,7 +881,7 @@ def from_data(
client=client, identifier=data["identifier"], version=version, data=data
)
- def json_dict(self) -> Dict[str, Any]:
+ def json_dict(self) -> dict[str, Any]:
"""
Convert to a JSONable `dict`, omitting the ``client`` attribute and
using the same field names as in the API
@@ -918,7 +903,7 @@ def refresh(self) -> None:
# Clear _version so it will be refetched the next time it is accessed
self._version = None
- def get_versions(self, order: Optional[str] = None) -> Iterator[Version]:
+ def get_versions(self, order: str | None = None) -> Iterator[Version]:
"""
Returns an iterator of all available `Version`\\s for the Dandiset
@@ -956,7 +941,7 @@ def get_version(self, version_id: str) -> VersionInfo:
f"No such version: {version_id!r} of Dandiset {self.identifier}"
)
- def for_version(self, version_id: Union[str, Version]) -> "RemoteDandiset":
+ def for_version(self, version_id: str | Version) -> RemoteDandiset:
"""
Returns a copy of the `RemoteDandiset` with the `version` attribute set
to the given `Version` object or the `Version` with the given version
@@ -988,13 +973,15 @@ def get_metadata(self) -> models.Dandiset:
"""
return models.Dandiset.parse_obj(self.get_raw_metadata())
- def get_raw_metadata(self) -> Dict[str, Any]:
+ def get_raw_metadata(self) -> dict[str, Any]:
"""
Fetch the metadata for this version of the Dandiset as an unprocessed
`dict`
"""
try:
- return cast(Dict[str, Any], self.client.get(self.version_api_path))
+ data = self.client.get(self.version_api_path)
+ assert isinstance(data, dict)
+ return data
except HTTP404Error:
raise NotFoundError(f"No such asset: {self}")
@@ -1004,7 +991,7 @@ def set_metadata(self, metadata: models.Dandiset) -> None:
"""
self.set_raw_metadata(metadata.json_dict())
- def set_raw_metadata(self, metadata: Dict[str, Any]) -> None:
+ def set_raw_metadata(self, metadata: dict[str, Any]) -> None:
"""
Set the metadata for this version of the Dandiset to the given value
"""
@@ -1067,7 +1054,7 @@ def publish(self, max_time: float = 120) -> "RemoteDandiset":
f"No published versions found for Dandiset {self.identifier}"
)
- def get_assets(self, order: Optional[str] = None) -> Iterator["RemoteAsset"]:
+ def get_assets(self, order: str | None = None) -> Iterator[RemoteAsset]:
"""
Returns an iterator of all assets in this version of the Dandiset.
@@ -1099,8 +1086,8 @@ def get_asset(self, asset_id: str) -> "RemoteAsset":
return RemoteAsset.from_data(self, info, metadata)
def get_assets_with_path_prefix(
- self, path: str, order: Optional[str] = None
- ) -> Iterator["RemoteAsset"]:
+ self, path: str, order: str | None = None
+ ) -> Iterator[RemoteAsset]:
"""
Returns an iterator of all assets in this version of the Dandiset whose
`~RemoteAsset.path` attributes start with ``path``
@@ -1122,8 +1109,8 @@ def get_assets_with_path_prefix(
)
def get_assets_by_glob(
- self, pattern: str, order: Optional[str] = None
- ) -> Iterator["RemoteAsset"]:
+ self, pattern: str, order: str | None = None
+ ) -> Iterator[RemoteAsset]:
"""
.. versionadded:: 0.44.0
@@ -1166,7 +1153,7 @@ def get_asset_by_path(self, path: str) -> "RemoteAsset":
def download_directory(
self,
assets_dirpath: str,
- dirpath: Union[str, Path],
+ dirpath: str | Path,
chunk_size: int = MAX_CHUNK_SIZE,
) -> None:
"""
@@ -1183,11 +1170,11 @@ def download_directory(
def upload_raw_asset(
self,
- filepath: Union[str, Path],
- asset_metadata: Dict[str, Any],
- jobs: Optional[int] = None,
- replace_asset: Optional["RemoteAsset"] = None,
- ) -> "RemoteAsset":
+ filepath: str | Path,
+ asset_metadata: dict[str, Any],
+ jobs: int | None = None,
+ replace_asset: RemoteAsset | None = None,
+ ) -> RemoteAsset:
"""
Upload the file at ``filepath`` with metadata ``asset_metadata`` to
this version of the Dandiset and return the resulting asset. Blocks
@@ -1218,10 +1205,10 @@ def upload_raw_asset(
def iter_upload_raw_asset(
self,
- filepath: Union[str, Path],
- asset_metadata: Dict[str, Any],
- jobs: Optional[int] = None,
- replace_asset: Optional["RemoteAsset"] = None,
+ filepath: str | Path,
+ asset_metadata: dict[str, Any],
+ jobs: int | None = None,
+ replace_asset: RemoteAsset | None = None,
) -> Iterator[dict]:
"""
Upload the file at ``filepath`` with metadata ``asset_metadata`` to
@@ -1304,10 +1291,10 @@ def __str__(self) -> str:
@classmethod
def from_base_data(
cls,
- client: "DandiAPIClient",
- data: Dict[str, Any],
- metadata: Optional[Dict[str, Any]] = None,
- ) -> "BaseRemoteAsset":
+ client: DandiAPIClient,
+ data: dict[str, Any],
+ metadata: dict[str, Any] | None = None,
+ ) -> BaseRemoteAsset:
"""
Construct a `BaseRemoteAsset` instance from a `DandiAPIClient`, a
`dict` of raw data in the same format as returned by the API's
@@ -1316,7 +1303,7 @@ def from_base_data(
This is a low-level method that non-developers would normally only use
when acquiring data using means outside of this library.
"""
- klass: Type[BaseRemoteAsset]
+ klass: type[BaseRemoteAsset]
if data.get("blob") is not None:
klass = BaseRemoteBlobAsset
if data.pop("zarr", None) is not None:
@@ -1360,19 +1347,19 @@ def get_metadata(self) -> models.Asset:
"""
return models.Asset.parse_obj(self.get_raw_metadata())
- def get_raw_metadata(self) -> Dict[str, Any]:
+ def get_raw_metadata(self) -> dict[str, Any]:
"""Fetch the metadata for the asset as an unprocessed `dict`"""
if self._metadata is not None:
return self._metadata
else:
try:
- return cast(Dict[str, Any], self.client.get(self.api_path))
+ data = self.client.get(self.api_path)
+ assert isinstance(data, dict)
+ return data
except HTTP404Error:
raise NotFoundError(f"No such asset: {self}")
- def get_raw_digest(
- self, digest_type: Union[str, models.DigestType, None] = None
- ) -> str:
+ def get_raw_digest(self, digest_type: str | models.DigestType | None = None) -> str:
"""
Retrieves the value of the given type of digest from the asset's
metadata. Raises `NotFoundError` if there is no entry for the given
@@ -1390,9 +1377,11 @@ def get_raw_digest(
digest_type = digest_type.value
metadata = self.get_raw_metadata()
try:
- return cast(str, metadata["digest"][digest_type])
+ digest = metadata["digest"][digest_type]
except KeyError:
raise NotFoundError(f"No {digest_type} digest found in metadata")
+ assert isinstance(digest, str)
+ return digest
def get_digest(self) -> Digest:
"""
@@ -1410,7 +1399,7 @@ def get_digest(self) -> Digest:
def get_content_url(
self,
regex: str = r".*",
- follow_redirects: Union[bool, int] = False,
+ follow_redirects: bool | int = False,
strip_query: bool = False,
) -> str:
"""
@@ -1492,9 +1481,7 @@ def downloader(start_at: int = 0) -> Iterator[bytes]:
return downloader
- def download(
- self, filepath: Union[str, Path], chunk_size: int = MAX_CHUNK_SIZE
- ) -> None:
+ def download(self, filepath: str | Path, chunk_size: int = MAX_CHUNK_SIZE) -> None:
"""
Download the asset to ``filepath``. Blocks until the download is
complete.
@@ -1569,7 +1556,7 @@ def as_readable(self) -> RemoteReadableAsset:
r = requests.head(url)
r.raise_for_status()
size = int(r.headers["Content-Length"])
- mtime: Optional[datetime]
+ mtime: datetime | None
try:
mtime = ensure_datetime(md["blobDateModified"])
except (KeyError, TypeError, ValueError):
@@ -1598,7 +1585,7 @@ def asset_type(self) -> AssetType:
"""
return AssetType.ZARR
- def iterfiles(self, prefix: Optional[str] = None) -> Iterator[RemoteZarrEntry]:
+ def iterfiles(self, prefix: str | None = None) -> Iterator[RemoteZarrEntry]:
"""
Returns a generator of all `RemoteZarrEntry`\\s within the Zarr,
optionally limited to those whose path starts with the given prefix
@@ -1677,9 +1664,9 @@ class RemoteAsset(BaseRemoteAsset):
def from_data(
cls,
dandiset: RemoteDandiset,
- data: Dict[str, Any],
- metadata: Optional[Dict[str, Any]] = None,
- ) -> "RemoteAsset":
+ data: dict[str, Any],
+ metadata: dict[str, Any] | None = None,
+ ) -> RemoteAsset:
"""
Construct a `RemoteAsset` instance from a `RemoteDandiset`, a `dict` of
raw data in the same format as returned by the API's pagination
@@ -1688,7 +1675,7 @@ def from_data(
This is a low-level method that non-developers would normally only use
when acquiring data using means outside of this library.
"""
- klass: Type[RemoteAsset]
+ klass: type[RemoteAsset]
if data.get("blob") is not None:
klass = RemoteBlobAsset
if data.pop("zarr", None) is not None:
@@ -1739,7 +1726,7 @@ def set_metadata(self, metadata: models.Asset) -> None:
return self.set_raw_metadata(metadata.json_dict())
@abstractmethod
- def set_raw_metadata(self, metadata: Dict[str, Any]) -> None:
+ def set_raw_metadata(self, metadata: dict[str, Any]) -> None:
"""
Set the metadata for the asset on the server to the given value and
update the `RemoteAsset` in place.
@@ -1770,7 +1757,7 @@ class RemoteBlobAsset(RemoteAsset, BaseRemoteBlobAsset):
A `RemoteAsset` whose actual data is a blob resource
"""
- def set_raw_metadata(self, metadata: Dict[str, Any]) -> None:
+ def set_raw_metadata(self, metadata: dict[str, Any]) -> None:
"""
Set the metadata for the asset on the server to the given value and
update the `RemoteBlobAsset` in place.
@@ -1793,7 +1780,7 @@ class RemoteZarrAsset(RemoteAsset, BaseRemoteZarrAsset):
A `RemoteAsset` whose actual data is a Zarr resource
"""
- def set_raw_metadata(self, metadata: Dict[str, Any]) -> None:
+ def set_raw_metadata(self, metadata: dict[str, Any]) -> None:
"""
Set the metadata for the asset on the server to the given value and
update the `RemoteZarrAsset` in place.
@@ -1868,7 +1855,7 @@ def suffix(self) -> str:
return ""
@property
- def suffixes(self) -> List[str]:
+ def suffixes(self) -> list[str]:
"""A list of the basename's file extensions"""
if self.name.endswith("."):
return []
diff --git a/dandi/dandiarchive.py b/dandi/dandiarchive.py
index ebbd6d96b..3bed6549e 100644
--- a/dandi/dandiarchive.py
+++ b/dandi/dandiarchive.py
@@ -28,11 +28,12 @@
from __future__ import annotations
from abc import ABC, abstractmethod
+from collections.abc import Iterable, Iterator
from contextlib import contextmanager
import posixpath
import re
from time import sleep
-from typing import Any, Dict, Iterable, Iterator, List, Optional, Tuple, cast
+from typing import Any
from urllib.parse import unquote as urlunquote
from pydantic import AnyHttpUrl, BaseModel, parse_obj_as
@@ -69,17 +70,19 @@ class ParsedDandiURL(ABC, BaseModel):
#: The Dandi Archive instance that the URL points to
instance: DandiInstance
#: The ID of the Dandiset given in the URL
- dandiset_id: Optional[str]
+ dandiset_id: str | None
#: The version of the Dandiset, if specified. If this is not set, the
#: version will be defaulted using the rules described under
#: `DandiAPIClient.get_dandiset()`.
- version_id: Optional[str] = None
+ version_id: str | None = None
@property
def api_url(self) -> AnyHttpUrl:
"""The base URL of the Dandi API service, without a trailing slash"""
# Kept for backwards compatibility
- return cast(AnyHttpUrl, parse_obj_as(AnyHttpUrl, self.instance.api.rstrip("/")))
+ r = parse_obj_as(AnyHttpUrl, self.instance.api.rstrip("/"))
+ assert isinstance(r, AnyHttpUrl)
+ return r # type: ignore[no-any-return]
def get_client(self) -> DandiAPIClient:
"""
@@ -90,7 +93,7 @@ def get_client(self) -> DandiAPIClient:
def get_dandiset(
self, client: DandiAPIClient, lazy: bool = True
- ) -> Optional[RemoteDandiset]:
+ ) -> RemoteDandiset | None:
"""
Returns information about the specified (or default) version of the
specified Dandiset. Returns `None` if the URL did not contain a
@@ -106,7 +109,7 @@ def get_dandiset(
@abstractmethod
def get_assets(
- self, client: DandiAPIClient, order: Optional[str] = None, strict: bool = False
+ self, client: DandiAPIClient, order: str | None = None, strict: bool = False
) -> Iterator[BaseRemoteAsset]:
"""
Returns an iterator of asset structures for the assets referred to by
@@ -136,9 +139,9 @@ def get_asset_ids(self, client: DandiAPIClient) -> Iterator[str]:
@contextmanager
def navigate(
- self, *, strict: bool = False, authenticate: Optional[bool] = None
+ self, *, strict: bool = False, authenticate: bool | None = None
) -> Iterator[
- Tuple[DandiAPIClient, Optional[RemoteDandiset], Iterable[BaseRemoteAsset]]
+ tuple[DandiAPIClient, RemoteDandiset | None, Iterable[BaseRemoteAsset]]
]:
"""
A context manager that returns a triple of a
@@ -218,7 +221,7 @@ class DandisetURL(ParsedDandiURL):
"""
def get_assets(
- self, client: DandiAPIClient, order: Optional[str] = None, strict: bool = False
+ self, client: DandiAPIClient, order: str | None = None, strict: bool = False
) -> Iterator[BaseRemoteAsset]:
"""Returns all assets in the Dandiset"""
with _maybe_strict(strict):
@@ -270,7 +273,7 @@ class BaseAssetIDURL(SingleAssetURL):
asset_id: str
def get_assets(
- self, client: DandiAPIClient, order: Optional[str] = None, strict: bool = False
+ self, client: DandiAPIClient, order: str | None = None, strict: bool = False
) -> Iterator[BaseRemoteAsset]:
"""
Yields the asset with the given ID. If the asset does not exist, then
@@ -286,9 +289,9 @@ def get_asset_ids(self, client: DandiAPIClient) -> Iterator[str]:
@contextmanager
def navigate(
- self, *, strict: bool = False, authenticate: Optional[bool] = None
+ self, *, strict: bool = False, authenticate: bool | None = None
) -> Iterator[
- Tuple[DandiAPIClient, Optional[RemoteDandiset], Iterable[BaseRemoteAsset]]
+ tuple[DandiAPIClient, RemoteDandiset | None, Iterable[BaseRemoteAsset]]
]:
with self.get_client() as client:
if authenticate:
@@ -318,7 +321,7 @@ class AssetIDURL(SingleAssetURL):
asset_id: str
def get_assets(
- self, client: DandiAPIClient, order: Optional[str] = None, strict: bool = False
+ self, client: DandiAPIClient, order: str | None = None, strict: bool = False
) -> Iterator[BaseRemoteAsset]:
"""
Yields the asset with the given ID. If the Dandiset or asset does not
@@ -341,7 +344,7 @@ class AssetPathPrefixURL(MultiAssetURL):
"""
def get_assets(
- self, client: DandiAPIClient, order: Optional[str] = None, strict: bool = False
+ self, client: DandiAPIClient, order: str | None = None, strict: bool = False
) -> Iterator[BaseRemoteAsset]:
"""
Returns the assets whose paths start with `path`. If `strict` is true
@@ -369,7 +372,7 @@ class AssetItemURL(SingleAssetURL):
path: str
def get_assets(
- self, client: DandiAPIClient, order: Optional[str] = None, strict: bool = False
+ self, client: DandiAPIClient, order: str | None = None, strict: bool = False
) -> Iterator[BaseRemoteAsset]:
"""
Yields the asset whose path equals `path`. If there is no such asset:
@@ -417,7 +420,7 @@ class AssetFolderURL(MultiAssetURL):
"""
def get_assets(
- self, client: DandiAPIClient, order: Optional[str] = None, strict: bool = False
+ self, client: DandiAPIClient, order: str | None = None, strict: bool = False
) -> Iterator[BaseRemoteAsset]:
"""
Returns all assets under the folder at `path`. If the folder does not
@@ -451,7 +454,7 @@ class AssetGlobURL(MultiAssetURL):
"""
def get_assets(
- self, client: DandiAPIClient, order: Optional[str] = None, strict: bool = False
+ self, client: DandiAPIClient, order: str | None = None, strict: bool = False
) -> Iterator[BaseRemoteAsset]:
"""
Returns all assets whose paths match the glob pattern `path`. If
@@ -493,10 +496,8 @@ def _maybe_strict(strict: bool) -> Iterator[None]:
@contextmanager
def navigate_url(
- url: str, *, strict: bool = False, authenticate: Optional[bool] = None
-) -> Iterator[
- Tuple[DandiAPIClient, Optional[RemoteDandiset], Iterable[BaseRemoteAsset]]
-]:
+ url: str, *, strict: bool = False, authenticate: bool | None = None
+) -> Iterator[tuple[DandiAPIClient, RemoteDandiset | None, Iterable[BaseRemoteAsset]]]:
"""
A context manager that takes a URL pointing to a DANDI Archive and
returns a triple of a `~dandi.dandiapi.DandiAPIClient` (with an open
@@ -512,7 +513,7 @@ def navigate_url(
If true, then `get_dandiset()` is called with ``lazy=False`` and
`get_assets()` is called with ``strict=True``; if false, the opposite
occurs.
- :param Optional[bool] authenticate:
+ :param authenticate:
If true, then `~dandi.dandiapi.DandiAPIClient.dandi_authenticate()`
will be called on the client before returning it. If `None` (the
default), the method will only be called if the URL requires
@@ -537,7 +538,7 @@ class _dandi_url_parser:
    dandiset_id_grp = f"(?P<dandiset_id>{DANDISET_ID_REGEX})"
# Should absorb port and "api/":
    server_grp = "(?P<server>(?P<protocol>https?)://(?P<hostname>[^/]+)/(api/)?)"
- known_urls: List[Tuple[re.Pattern[str], Dict[str, Any], str]] = [
+ known_urls: list[tuple[re.Pattern[str], dict[str, Any], str]] = [
# List of (regex, settings, display string) triples
#
# Settings:
diff --git a/dandi/dandiset.py b/dandi/dandiset.py
index 0a02a903d..63ebbfb1b 100644
--- a/dandi/dandiset.py
+++ b/dandi/dandiset.py
@@ -5,7 +5,7 @@
from dataclasses import dataclass
import os.path
from pathlib import Path, PurePath, PurePosixPath
-from typing import Optional, TypeVar
+from typing import TYPE_CHECKING
from dandischema.models import get_schema_version
@@ -14,9 +14,10 @@
from .files import DandisetMetadataFile, LocalAsset, dandi_file, find_dandi_files
from .utils import find_parent_directory_containing, under_paths, yaml_dump, yaml_load
-lgr = get_logger()
+if TYPE_CHECKING:
+ from typing_extensions import Self
-D = TypeVar("D", bound="Dandiset")
+lgr = get_logger()
class Dandiset:
@@ -28,7 +29,7 @@ def __init__(
self,
path: str | Path,
allow_empty: bool = False,
- schema_version: Optional[str] = None,
+ schema_version: str | None = None,
) -> None:
if schema_version is not None:
current_version = get_schema_version()
@@ -42,12 +43,12 @@ def __init__(
self.path_obj / dandiset_metadata_file
):
raise ValueError(f"No dandiset at {path}")
- self.metadata: Optional[dict] = None
+ self.metadata: dict | None = None
self._metadata_file_obj = self.path_obj / dandiset_metadata_file
self._load_metadata()
@classmethod
- def find(cls: type[D], path: str | Path | None) -> Optional[D]:
+ def find(cls, path: str | Path | None) -> Self | None:
"""Find a dandiset possibly pointing to a directory within it"""
dandiset_path = find_parent_directory_containing(dandiset_metadata_file, path)
if dandiset_path is not None:
@@ -112,7 +113,7 @@ def update_metadata(self, meta: dict) -> None:
self._load_metadata()
@staticmethod
- def _get_identifier(metadata: dict) -> Optional[str]:
+ def _get_identifier(metadata: dict) -> str | None:
"""Given a metadata record, determine identifier"""
# ATM since we have dichotomy in dandiset metadata schema from drafts
# and from published versions, we will just test both locations
diff --git a/dandi/delete.py b/dandi/delete.py
index 4d2486c0d..eb3684bf1 100644
--- a/dandi/delete.py
+++ b/dandi/delete.py
@@ -1,9 +1,9 @@
from __future__ import annotations
+from collections.abc import Iterable, Iterator
from dataclasses import dataclass, field
from operator import attrgetter
from pathlib import Path
-from typing import Iterable, Iterator, List, Optional, Tuple
import click
@@ -20,12 +20,12 @@ class Deleter:
Class for registering assets & Dandisets to delete and then deleting them
"""
- client: Optional[DandiAPIClient] = None
- dandiset: Optional[RemoteDandiset] = None
+ client: DandiAPIClient | None = None
+ dandiset: RemoteDandiset | None = None
#: Whether we are deleting an entire Dandiset (true) or just assets (false)
deleting_dandiset: bool = False
skip_missing: bool = False
- remote_assets: List[RemoteAsset] = field(default_factory=list)
+ remote_assets: list[RemoteAsset] = field(default_factory=list)
def __bool__(self) -> bool:
return self.deleting_dandiset or bool(self.remote_assets)
@@ -191,7 +191,7 @@ def delete(
paths: Iterable[str],
dandi_instance: str | DandiInstance = "dandi",
devel_debug: bool = False,
- jobs: Optional[int] = None,
+ jobs: int | None = None,
force: bool = False,
skip_missing: bool = False,
) -> None:
@@ -226,7 +226,7 @@ def delete(
out(r)
-def find_local_asset(filepath: str) -> Tuple[str, str]:
+def find_local_asset(filepath: str) -> tuple[str, str]:
"""
Given a path to a local file, return the ID of the Dandiset in which it is
located and the path to the file relative to the root of said Dandiset. If
diff --git a/dandi/download.py b/dandi/download.py
index f0c444f81..f29328e67 100644
--- a/dandi/download.py
+++ b/dandi/download.py
@@ -1,6 +1,7 @@
from __future__ import annotations
from collections import Counter, deque
+from collections.abc import Callable, Iterable, Iterator, Sequence
from dataclasses import InitVar, dataclass, field
from datetime import datetime
from enum import Enum
@@ -16,22 +17,7 @@
from threading import Lock
import time
from types import TracebackType
-from typing import (
- IO,
- Any,
- Callable,
- Dict,
- Iterable,
- Iterator,
- List,
- Literal,
- Optional,
- Protocol,
- Sequence,
- Tuple,
- Type,
- Union,
-)
+from typing import IO, Any, Literal, Protocol
from dandischema.models import DigestType
from fasteners import InterProcessLock
@@ -64,13 +50,13 @@
def download(
- urls: Union[str, Sequence[str]],
- output_dir: Union[str, Path],
+ urls: str | Sequence[str],
+ output_dir: str | Path,
*,
format: str = "pyout",
existing: str = "error",
jobs: int = 1,
- jobs_per_zarr: Optional[int] = None,
+ jobs_per_zarr: int | None = None,
get_metadata: bool = True,
get_assets: bool = True,
sync: bool = False,
@@ -184,13 +170,13 @@ class Downloader:
existing: str
get_metadata: bool
get_assets: bool
- jobs_per_zarr: Optional[int]
+ jobs_per_zarr: int | None
on_error: Literal["raise", "yield"]
#: which will be set .gen to assets. Purpose is to make it possible to get
#: summary statistics while already downloading. TODO: reimplement
#: properly!
- assets_it: Optional[IteratorWithAggregation] = None
- yield_generator_for_fields: Optional[tuple[str, ...]] = None
+ assets_it: IteratorWithAggregation | None = None
+ yield_generator_for_fields: tuple[str, ...] | None = None
asset_download_paths: set[str] = field(init=False, default_factory=set)
def __post_init__(self, output_dir: str | Path) -> None:
@@ -350,7 +336,7 @@ class ItemsSummary:
def __init__(self) -> None:
self.files = 0
# TODO: get rid of needing it
- self.t0: Optional[float] = None # when first record is seen
+ self.t0: float | None = None # when first record is seen
self.size = 0
self.has_unknown_sizes = False
@@ -362,7 +348,7 @@ def as_dict(self) -> dict:
}
# TODO: Determine the proper annotation for `rec`
- def __call__(self, rec: Any, prior: Optional[ItemsSummary] = None) -> ItemsSummary:
+ def __call__(self, rec: Any, prior: ItemsSummary | None = None) -> ItemsSummary:
assert prior in (None, self)
if not self.files:
self.t0 = time.time()
@@ -397,7 +383,7 @@ def agg_files(self, *ignored: Any) -> str:
ret += "+"
return ret
- def agg_size(self, sizes: Iterable[int]) -> Union[str, List[str]]:
+ def agg_size(self, sizes: Iterable[int]) -> str | list[str]:
"""Formatter for "size" column where it would show
how much is "active" (or done)
@@ -422,7 +408,7 @@ def agg_size(self, sizes: Iterable[int]) -> Union[str, List[str]]:
v.append(extra_str)
return v
- def agg_done(self, done_sizes: Iterator[int]) -> List[str]:
+ def agg_done(self, done_sizes: Iterator[int]) -> list[str]:
"""Formatter for "DONE" column"""
done = sum(done_sizes)
if self.it.finished and done == 0 and self.items_summary.size == 0:
@@ -461,7 +447,7 @@ def _skip_file(msg: Any) -> dict:
def _populate_dandiset_yaml(
- dandiset_path: Union[str, Path], dandiset: RemoteDandiset, existing: str
+ dandiset_path: str | Path, dandiset: RemoteDandiset, existing: str
) -> Iterator[dict]:
metadata = dandiset.get_raw_metadata()
if not metadata:
@@ -513,13 +499,13 @@ def hexdigest(self) -> str:
def _download_file(
downloader: Callable[[int], Iterator[bytes]],
path: Path,
- toplevel_path: Union[str, Path],
+ toplevel_path: str | Path,
lock: Lock,
- size: Optional[int] = None,
- mtime: Optional[datetime] = None,
+ size: int | None = None,
+ mtime: datetime | None = None,
existing: str = "error",
- digests: Optional[Dict[str, str]] = None,
- digest_callback: Optional[Callable[[str, str], Any]] = None,
+ digests: dict[str, str] | None = None,
+ digest_callback: Callable[[str, str], Any] | None = None,
) -> Iterator[dict]:
"""
Common logic for downloading a single file.
@@ -636,10 +622,10 @@ def _download_file(
yield {"status": "downloading"}
- algo: Optional[str] = None
- digester: Optional[Callable[[], Hasher]] = None
- digest: Optional[str] = None
- downloaded_digest: Optional[Hasher] = None
+ algo: str | None = None
+ digester: Callable[[], Hasher] | None = None
+ digest: str | None = None
+ downloaded_digest: Hasher | None = None
if digests:
# choose first available for now.
# TODO: reuse that sorting based on speed
@@ -683,7 +669,7 @@ def _download_file(
downloaded_digest.update(block)
downloaded += len(block)
# TODO: yield progress etc
- out: Dict[str, Any] = {"done": downloaded}
+ out: dict[str, Any] = {"done": downloaded}
if size:
if downloaded > size and not warned:
warned = True
@@ -753,7 +739,7 @@ def _download_file(
class DownloadDirectory:
- def __init__(self, filepath: Union[str, Path], digests: Dict[str, str]) -> None:
+ def __init__(self, filepath: str | Path, digests: dict[str, str]) -> None:
#: The path to which to save the file after downloading
self.filepath = Path(filepath)
#: Expected hashes of the downloaded data, as a mapping from algorithm
@@ -766,11 +752,11 @@ def __init__(self, filepath: Union[str, Path], digests: Dict[str, str]) -> None:
#: received
self.writefile = self.dirpath / "file"
#: A `fasteners.InterProcessLock` on `dirpath`
- self.lock: Optional[InterProcessLock] = None
+ self.lock: InterProcessLock | None = None
#: An open filehandle to `writefile`
- self.fp: Optional[IO[bytes]] = None
+ self.fp: IO[bytes] | None = None
#: How much of the data has been downloaded so far
- self.offset: Optional[int] = None
+ self.offset: int | None = None
def __enter__(self) -> DownloadDirectory:
self.dirpath.mkdir(parents=True, exist_ok=True)
@@ -812,9 +798,9 @@ def __enter__(self) -> DownloadDirectory:
def __exit__(
self,
- exc_type: Optional[Type[BaseException]],
- exc_val: Optional[BaseException],
- exc_tb: Optional[TracebackType],
+ exc_type: type[BaseException] | None,
+ exc_val: BaseException | None,
+ exc_tb: TracebackType | None,
) -> None:
assert self.fp is not None
self.fp.close()
@@ -845,10 +831,10 @@ def append(self, blob: bytes) -> None:
def _download_zarr(
asset: BaseRemoteZarrAsset,
download_path: Path,
- toplevel_path: Union[str, Path],
+ toplevel_path: str | Path,
existing: str,
lock: Lock,
- jobs: Optional[int] = None,
+ jobs: int | None = None,
) -> Iterator[dict]:
download_gens = {}
entries = list(asset.iterfiles())
@@ -874,7 +860,7 @@ def digest_callback(path: str, algoname: str, d: str) -> None:
)
pc = ProgressCombiner(zarr_size=asset.size, file_qty=len(download_gens))
- final_out: Optional[dict] = None
+ final_out: dict | None = None
with interleave(
[pairing(p, gen) for p, gen in download_gens.items()],
onerror=FINISH_CURRENT,
@@ -942,7 +928,7 @@ def digest_callback(path: str, algoname: str, d: str) -> None:
yield {"status": "done"}
-def pairing(p: str, gen: Iterator[dict]) -> Iterator[Tuple[str, dict]]:
+def pairing(p: str, gen: Iterator[dict]) -> Iterator[tuple[str, dict]]:
for d in gen:
yield (p, d)
@@ -954,14 +940,14 @@ def pairing(p: str, gen: Iterator[dict]) -> Iterator[Tuple[str, dict]]:
class DownloadProgress:
state: DLState = DLState.STARTING
downloaded: int = 0
- size: Optional[int] = None
+ size: int | None = None
@dataclass
class ProgressCombiner:
zarr_size: int
file_qty: int
- files: Dict[str, DownloadProgress] = field(default_factory=dict)
+ files: dict[str, DownloadProgress] = field(default_factory=dict)
#: Total size of all files that were not skipped and did not error out
#: during download
maxsize: int = 0
diff --git a/dandi/exceptions.py b/dandi/exceptions.py
index 1a6c6111f..c317e5b9a 100644
--- a/dandi/exceptions.py
+++ b/dandi/exceptions.py
@@ -1,4 +1,4 @@
-from typing import List
+from __future__ import annotations
import requests
from semantic_version import Version
@@ -41,7 +41,7 @@ class CliVersionError(RuntimeError):
"""Base class for `CliVersionTooOldError` and `BadCliVersionError`"""
def __init__(
- self, our_version: Version, minversion: Version, bad_versions: List[Version]
+ self, our_version: Version, minversion: Version, bad_versions: list[Version]
) -> None:
self.our_version = our_version
self.minversion = minversion
diff --git a/dandi/files/__init__.py b/dandi/files/__init__.py
index 4122d7afb..1d20fbc54 100644
--- a/dandi/files/__init__.py
+++ b/dandi/files/__init__.py
@@ -15,7 +15,6 @@
from collections.abc import Iterator
import os.path
from pathlib import Path
-from typing import Optional
from dandi import get_logger
from dandi.consts import BIDS_DATASET_DESCRIPTION, dandiset_metadata_file
@@ -68,7 +67,7 @@
def find_dandi_files(
*paths: str | Path,
- dandiset_path: Optional[str | Path] = None,
+ dandiset_path: str | Path | None = None,
allow_all: bool = False,
include_metadata: bool = False,
) -> Iterator[DandiFile]:
@@ -97,7 +96,7 @@ def find_dandi_files(
# A pair of each file or directory being considered plus the most recent
# BIDS dataset_description.json file at the path (if a directory) or in a
# parent path
- path_queue: deque[tuple[Path, Optional[BIDSDatasetDescriptionAsset]]] = deque()
+ path_queue: deque[tuple[Path, BIDSDatasetDescriptionAsset | None]] = deque()
for p in map(Path, paths):
if dandiset_path is not None:
try:
@@ -156,8 +155,8 @@ def find_dandi_files(
def dandi_file(
filepath: str | Path,
- dandiset_path: Optional[str | Path] = None,
- bids_dataset_description: Optional[BIDSDatasetDescriptionAsset] = None,
+ dandiset_path: str | Path | None = None,
+ bids_dataset_description: BIDSDatasetDescriptionAsset | None = None,
) -> DandiFile:
"""
Return a `DandiFile` instance of the appropriate type for the file at
@@ -197,8 +196,8 @@ def dandi_file(
def find_bids_dataset_description(
- dirpath: str | Path, dandiset_path: Optional[str | Path] = None
-) -> Optional[BIDSDatasetDescriptionAsset]:
+ dirpath: str | Path, dandiset_path: str | Path | None = None
+) -> BIDSDatasetDescriptionAsset | None:
"""
.. versionadded:: 0.46.0
@@ -206,7 +205,7 @@ def find_bids_dataset_description(
``dirpath`` and each of its parents, stopping when a :file:`dandiset.yaml`
file is found or ``dandiset_path`` is reached.
"""
- topmost: Optional[BIDSDatasetDescriptionAsset] = None
+ topmost: BIDSDatasetDescriptionAsset | None = None
dirpath = Path(dirpath)
for d in (dirpath, *dirpath.parents):
bids_marker = d / BIDS_DATASET_DESCRIPTION
diff --git a/dandi/files/_private.py b/dandi/files/_private.py
index 3a34df87f..4d8848059 100644
--- a/dandi/files/_private.py
+++ b/dandi/files/_private.py
@@ -4,7 +4,7 @@
from dataclasses import dataclass
from enum import Enum
from pathlib import Path
-from typing import ClassVar, Optional
+from typing import ClassVar
import weakref
from dandi.consts import (
@@ -66,7 +66,7 @@ class DandiFileFactory:
}
def __call__(
- self, filepath: Path, path: str, dandiset_path: Optional[Path]
+ self, filepath: Path, path: str, dandiset_path: Path | None
) -> DandiFile:
return self.CLASSES[DandiFileType.classify(filepath)](
filepath=filepath, path=path, dandiset_path=dandiset_path
@@ -87,7 +87,7 @@ class BIDSFileFactory(DandiFileFactory):
}
def __call__(
- self, filepath: Path, path: str, dandiset_path: Optional[Path]
+ self, filepath: Path, path: str, dandiset_path: Path | None
) -> DandiFile:
ftype = DandiFileType.classify(filepath)
if ftype is DandiFileType.BIDS_DATASET_DESCRIPTION:
diff --git a/dandi/files/bases.py b/dandi/files/bases.py
index baac4931f..93dfa3233 100644
--- a/dandi/files/bases.py
+++ b/dandi/files/bases.py
@@ -10,7 +10,7 @@
from pathlib import Path
import re
from threading import Lock
-from typing import Any, BinaryIO, Generic, Optional, cast
+from typing import IO, Any, Generic
from xml.etree.ElementTree import fromstring
import dandischema
@@ -57,7 +57,7 @@ class DandiFile(ABC):
filepath: Path
#: The path to the root of the Dandiset, if there is one
- dandiset_path: Optional[Path]
+ dandiset_path: Path | None
@property
def size(self) -> int:
@@ -73,7 +73,7 @@ def modified(self) -> datetime:
@abstractmethod
def get_metadata(
self,
- digest: Optional[Digest] = None,
+ digest: Digest | None = None,
ignore_errors: bool = True,
) -> CommonModel:
"""Return the Dandi metadata for the file"""
@@ -82,7 +82,7 @@ def get_metadata(
@abstractmethod
def get_validation_errors(
self,
- schema_version: Optional[str] = None,
+ schema_version: str | None = None,
devel_debug: bool = False,
) -> list[ValidationResult]:
"""
@@ -96,7 +96,7 @@ class DandisetMetadataFile(DandiFile):
def get_metadata(
self,
- digest: Optional[Digest] = None,
+ digest: Digest | None = None,
ignore_errors: bool = True,
) -> DandisetMeta:
"""Return the Dandiset metadata inside the file"""
@@ -107,7 +107,7 @@ def get_metadata(
# TODO: @validate_cache.memoize_path
def get_validation_errors(
self,
- schema_version: Optional[str] = None,
+ schema_version: str | None = None,
devel_debug: bool = False,
) -> list[ValidationResult]:
with open(self.filepath) as f:
@@ -167,7 +167,7 @@ def get_digest(self) -> Digest:
@abstractmethod
def get_metadata(
self,
- digest: Optional[Digest] = None,
+ digest: Digest | None = None,
ignore_errors: bool = True,
) -> BareAsset:
"""Return the Dandi metadata for the asset"""
@@ -176,7 +176,7 @@ def get_metadata(
# TODO: @validate_cache.memoize_path
def get_validation_errors(
self,
- schema_version: Optional[str] = None,
+ schema_version: str | None = None,
devel_debug: bool = False,
) -> list[ValidationResult]:
current_version = get_schema_version()
@@ -226,8 +226,8 @@ def upload(
self,
dandiset: RemoteDandiset,
metadata: dict[str, Any],
- jobs: Optional[int] = None,
- replacing: Optional[RemoteAsset] = None,
+ jobs: int | None = None,
+ replacing: RemoteAsset | None = None,
) -> RemoteAsset:
"""
Upload the file as an asset with the given metadata to the given
@@ -260,8 +260,8 @@ def iter_upload(
self,
dandiset: RemoteDandiset,
metadata: dict[str, Any],
- jobs: Optional[int] = None,
- replacing: Optional[RemoteAsset] = None,
+ jobs: int | None = None,
+ replacing: RemoteAsset | None = None,
) -> Iterator[dict]:
"""
Upload the asset with the given metadata to the given Dandiset,
@@ -294,7 +294,7 @@ class LocalFileAsset(LocalAsset):
def get_metadata(
self,
- digest: Optional[Digest] = None,
+ digest: Digest | None = None,
ignore_errors: bool = True,
) -> BareAsset:
metadata = get_default_metadata(self.filepath, digest=digest)
@@ -310,8 +310,8 @@ def iter_upload(
self,
dandiset: RemoteDandiset,
metadata: dict[str, Any],
- jobs: Optional[int] = None,
- replacing: Optional[RemoteAsset] = None,
+ jobs: int | None = None,
+ replacing: RemoteAsset | None = None,
) -> Iterator[dict]:
"""
Upload the file as an asset with the given metadata to the given
@@ -466,7 +466,7 @@ class NWBAsset(LocalFileAsset):
def get_metadata(
self,
- digest: Optional[Digest] = None,
+ digest: Digest | None = None,
ignore_errors: bool = True,
) -> BareAsset:
try:
@@ -488,7 +488,7 @@ def get_metadata(
# TODO: @validate_cache.memoize_path
def get_validation_errors(
self,
- schema_version: Optional[str] = None,
+ schema_version: str | None = None,
devel_debug: bool = False,
) -> list[ValidationResult]:
"""
@@ -612,7 +612,7 @@ def size(self) -> int:
def _upload_blob_part(
storage_session: RESTFullAPIClient,
- fp: BinaryIO,
+ fp: IO[bytes],
lock: Lock,
etagger: DandiETag,
asset_path: str,
@@ -746,14 +746,18 @@ def _pydantic_errors_to_validation_results(
) -> list[ValidationResult]:
"""Convert list of dict from pydantic into our custom object."""
out = []
- for e in (
- errors.errors() if isinstance(errors, ValidationError) else cast(list, errors)
- ):
+ errorlist: list
+ if isinstance(errors, ValidationError):
+ errorlist = errors.errors()
+ else:
+ errorlist = errors
+ for e in errorlist:
if isinstance(e, Exception):
message = getattr(e, "message", str(e))
id = "exception"
scope = Scope.FILE
else:
+ assert isinstance(e, dict)
id = ".".join(
filter(
bool,
diff --git a/dandi/files/bids.py b/dandi/files/bids.py
index a875ba1cc..6f19345df 100644
--- a/dandi/files/bids.py
+++ b/dandi/files/bids.py
@@ -6,7 +6,6 @@
import os.path
from pathlib import Path
from threading import Lock
-from typing import List, Optional
import weakref
from dandischema.models import BareAsset
@@ -36,21 +35,21 @@ class BIDSDatasetDescriptionAsset(LocalFileAsset):
#: A list of validation error messages pertaining to the dataset as a
#: whole, populated by `_validate()`
- _dataset_errors: Optional[list[ValidationResult]] = None
+ _dataset_errors: list[ValidationResult] | None = None
#: A list of validation error messages for individual assets in the
#: dataset, keyed by `bids_path` properties; populated by `_validate()`
- _asset_errors: Optional[dict[str, list[ValidationResult]]] = None
+ _asset_errors: dict[str, list[ValidationResult]] | None = None
#: Asset metadata for individual assets in the dataset, keyed by
#: `bids_path` properties; populated by `_validate()`
- _asset_metadata: Optional[dict[str, BareAsset]] = None
+ _asset_metadata: dict[str, BareAsset] | None = None
#: Version of BIDS used for the validation;
#: populated by `_validate()`
#: In future this might be removed and the information included in the
#: BareAsset via dandischema.
- _bids_version: Optional[str] = None
+ _bids_version: str | None = None
#: Threading lock needed in case multiple assets are validated in parallel
#: during upload
@@ -130,7 +129,7 @@ def get_asset_metadata(self, asset: BIDSAsset) -> BareAsset:
def get_validation_errors(
self,
- schema_version: Optional[str] = None,
+ schema_version: str | None = None,
devel_debug: bool = False,
) -> list[ValidationResult]:
self._validate()
@@ -185,14 +184,14 @@ def bids_path(self) -> str:
def get_validation_errors(
self,
- schema_version: Optional[str] = None,
+ schema_version: str | None = None,
devel_debug: bool = False,
) -> list[ValidationResult]:
return self.bids_dataset_description.get_asset_errors(self)
def get_metadata(
self,
- digest: Optional[Digest] = None,
+ digest: Digest | None = None,
ignore_errors: bool = True,
) -> BareAsset:
metadata = self.bids_dataset_description.get_asset_metadata(self)
@@ -216,7 +215,7 @@ class NWBBIDSAsset(BIDSAsset, NWBAsset):
def get_validation_errors(
self,
- schema_version: Optional[str] = None,
+ schema_version: str | None = None,
devel_debug: bool = False,
) -> list[ValidationResult]:
return NWBAsset.get_validation_errors(
@@ -225,7 +224,7 @@ def get_validation_errors(
def get_metadata(
self,
- digest: Optional[Digest] = None,
+ digest: Digest | None = None,
ignore_errors: bool = True,
) -> BareAsset:
bids_metadata = BIDSAsset.get_metadata(self, digest, ignore_errors)
@@ -244,7 +243,7 @@ class ZarrBIDSAsset(ZarrAsset, BIDSAsset):
def get_validation_errors(
self,
- schema_version: Optional[str] = None,
+ schema_version: str | None = None,
devel_debug: bool = False,
) -> list[ValidationResult]:
return ZarrAsset.get_validation_errors(
@@ -253,7 +252,7 @@ def get_validation_errors(
def get_metadata(
self,
- digest: Optional[Digest] = None,
+ digest: Digest | None = None,
ignore_errors: bool = True,
) -> BareAsset:
metadata = self.bids_dataset_description.get_asset_metadata(self)
@@ -275,9 +274,9 @@ class GenericBIDSAsset(BIDSAsset, GenericAsset):
def get_validation_errors(
self,
- schema_version: Optional[str] = None,
+ schema_version: str | None = None,
devel_debug: bool = False,
- ) -> List[ValidationResult]:
+ ) -> list[ValidationResult]:
return GenericAsset.get_validation_errors(
self, schema_version, devel_debug
) + BIDSAsset.get_validation_errors(self)
diff --git a/dandi/files/zarr.py b/dandi/files/zarr.py
index 34199793d..b675a489c 100644
--- a/dandi/files/zarr.py
+++ b/dandi/files/zarr.py
@@ -10,7 +10,7 @@
import os.path
from pathlib import Path
from time import sleep
-from typing import Any, Optional
+from typing import Any
from dandischema.digests.zarr import get_checksum
from dandischema.models import BareAsset, DigestType
@@ -179,7 +179,7 @@ def get_digest(self) -> Digest:
def get_metadata(
self,
- digest: Optional[Digest] = None,
+ digest: Digest | None = None,
ignore_errors: bool = True,
) -> BareAsset:
metadata = get_default_metadata(self.filepath, digest=digest)
@@ -189,7 +189,7 @@ def get_metadata(
def get_validation_errors(
self,
- schema_version: Optional[str] = None,
+ schema_version: str | None = None,
devel_debug: bool = False,
) -> list[ValidationResult]:
errors: list[ValidationResult] = []
@@ -257,8 +257,8 @@ def iter_upload(
self,
dandiset: RemoteDandiset,
metadata: dict[str, Any],
- jobs: Optional[int] = None,
- replacing: Optional[RemoteAsset] = None,
+ jobs: int | None = None,
+ replacing: RemoteAsset | None = None,
) -> Iterator[dict]:
"""
Upload the Zarr directory as an asset with the given metadata to the
@@ -575,7 +575,7 @@ class EntryUploadTracker:
digested_entries: list[UploadItem] = field(default_factory=list)
fresh_entries: list[LocalZarrEntry] = field(default_factory=list)
- def register(self, e: LocalZarrEntry, digest: Optional[str] = None) -> None:
+ def register(self, e: LocalZarrEntry, digest: str | None = None) -> None:
if digest is not None:
self.digested_entries.append(UploadItem.from_entry(e, digest))
else:
diff --git a/dandi/keyring.py b/dandi/keyring.py
index cc0579644..dcf3d61fb 100644
--- a/dandi/keyring.py
+++ b/dandi/keyring.py
@@ -3,7 +3,7 @@
from collections.abc import Callable
import os.path as op
from pathlib import Path
-from typing import Optional, TypeVar
+from typing import TypeVar
import click
from keyring.backend import KeyringBackend, get_all_keyring
@@ -21,7 +21,7 @@
def keyring_lookup(
service_name: str, username: str
-) -> tuple[KeyringBackend, Optional[str]]:
+) -> tuple[KeyringBackend, str | None]:
"""
Returns an appropriate keyring backend and the password it holds (if any)
for the given service and username.
@@ -44,9 +44,7 @@ def save(kb: KeyringBackend) -> tuple[()]:
save(kb)
-def keyring_op(
- func: Callable[[KeyringBackend], T]
-) -> tuple[KeyringBackend, Optional[T]]:
+def keyring_op(func: Callable[[KeyringBackend], T]) -> tuple[KeyringBackend, T | None]:
"""
Determine a keyring backend to use for storing & retrieving credentials,
perform an operation on the backend, and return the backend and the results
diff --git a/dandi/metadata.py b/dandi/metadata.py
index 08502d078..159892534 100644
--- a/dandi/metadata.py
+++ b/dandi/metadata.py
@@ -1,24 +1,13 @@
from __future__ import annotations
+from collections.abc import Callable, Iterable
from datetime import datetime, timedelta
from functools import lru_cache
import os
import os.path
from pathlib import Path
import re
-from typing import (
- Any,
- Callable,
- Dict,
- Iterable,
- List,
- Optional,
- Tuple,
- Type,
- TypedDict,
- TypeVar,
- Union,
-)
+from typing import Any, TypedDict, TypeVar
from uuid import uuid4
from xml.dom.minidom import parseString
@@ -51,9 +40,8 @@
# Disable this for clean hacking
@metadata_cache.memoize_path
def get_metadata(
- path: str | Path | Readable,
- digest: Optional[Digest] = None,
-) -> Optional[dict]:
+ path: str | Path | Readable, digest: Digest | None = None
+) -> dict | None:
"""
Get "flatdata" from a .nwb file
@@ -158,7 +146,7 @@ def get_metadata(
return meta
-def _parse_iso8601(age: str) -> List[str]:
+def _parse_iso8601(age: str) -> list[str]:
"""checking if age is proper iso8601, additional formatting"""
# allowing for comma instead of ., e.g. P1,5D
age = age.replace(",", ".")
@@ -188,7 +176,7 @@ def _parse_iso8601(age: str) -> List[str]:
raise ValueError(f"ISO 8601 expected, but {age!r} was received")
-def _parse_age_re(age: str, unit: str, tp: str = "date") -> Tuple[str, Optional[str]]:
+def _parse_age_re(age: str, unit: str, tp: str = "date") -> tuple[str, str | None]:
"""finding parts that have in various forms"""
if unit == "Y":
@@ -229,7 +217,7 @@ def _parse_age_re(age: str, unit: str, tp: str = "date") -> Tuple[str, Optional[
return (age[: m.start()] + age[m.end() :]).strip(), f"{qty}{unit}"
-def _parse_hours_format(age: str) -> Tuple[str, List[str]]:
+def _parse_hours_format(age: str) -> tuple[str, list[str]]:
"""parsing format 0:30:10"""
m = re.match(r"\s*(\d\d?):(\d\d):(\d\d)", age)
if m:
@@ -239,7 +227,7 @@ def _parse_hours_format(age: str) -> Tuple[str, List[str]]:
return age, []
-def _check_decimal_parts(age_parts: List[str]) -> None:
+def _check_decimal_parts(age_parts: list[str]) -> None:
"""checking if decimal parts are only in the lowest order component"""
# if the last part is the T component I have to separate the parts
decim_part = ["." in el for el in age_parts]
@@ -247,7 +235,7 @@ def _check_decimal_parts(age_parts: List[str]) -> None:
raise ValueError("Decimal fraction allowed in the lowest order part only.")
-def _check_range_limits(limits: List[List[str]]) -> None:
+def _check_range_limits(limits: list[list[str]]) -> None:
"""checking if the upper limit is bigger than the lower limit"""
ok = True
units_t = dict(zip(["S", "M", "H"], range(3)))
@@ -289,7 +277,7 @@ def _check_range_limits(limits: List[List[str]]) -> None:
)
-def parse_age(age: Optional[str]) -> Tuple[str, str]:
+def parse_age(age: str | None) -> tuple[str, str]:
"""
Parsing age field and converting into an ISO 8601 duration
@@ -299,7 +287,7 @@ def parse_age(age: Optional[str]) -> Tuple[str, str]:
Returns
-------
- Tuple[str, str]
+ tuple[str, str]
"""
if not age:
@@ -342,7 +330,7 @@ def parse_age(age: Optional[str]) -> Tuple[str, str]:
if not age:
raise ValueError("Age doesn't have any information")
- date_f: List[str] = []
+ date_f: list[str] = []
for unit in ["Y", "M", "W", "D"]:
if not age:
break
@@ -353,7 +341,7 @@ def parse_age(age: Optional[str]) -> Tuple[str, str]:
date_f = ["P", part_f]
if ref == "Birth":
- time_f: List[str] = []
+ time_f: list[str] = []
for un in ["H", "M", "S"]:
if not age:
break
@@ -380,7 +368,7 @@ def parse_age(age: Optional[str]) -> Tuple[str, str]:
return "".join(age_f), ref
-def extract_age(metadata: dict) -> Optional[models.PropertyValue]:
+def extract_age(metadata: dict) -> models.PropertyValue | None:
try:
dob = ensure_datetime(metadata["date_of_birth"])
start = ensure_datetime(metadata["session_start_time"])
@@ -416,7 +404,7 @@ def timedelta2duration(delta: timedelta) -> str:
if delta.days:
s += f"{delta.days}D"
if delta.seconds or delta.microseconds:
- sec: Union[int, float] = delta.seconds
+ sec: int | float = delta.seconds
if delta.microseconds:
# Don't add when microseconds is 0, so that sec will be an int then
sec += delta.microseconds / 1e6
@@ -426,7 +414,7 @@ def timedelta2duration(delta: timedelta) -> str:
return s
-def extract_sex(metadata: dict) -> Optional[models.SexType]:
+def extract_sex(metadata: dict) -> models.SexType | None:
value = metadata.get("sex", None)
if value is not None and value != "":
value = value.lower()
@@ -452,7 +440,7 @@ def extract_sex(metadata: dict) -> Optional[models.SexType]:
return None
-def extract_strain(metadata: dict) -> Optional[models.StrainType]:
+def extract_strain(metadata: dict) -> models.StrainType | None:
value = metadata.get("strain", None)
if value:
# Don't assign cell lines to strain
@@ -463,7 +451,7 @@ def extract_strain(metadata: dict) -> Optional[models.StrainType]:
return None
-def extract_cellLine(metadata: dict) -> Optional[str]:
+def extract_cellLine(metadata: dict) -> str | None:
value: str = metadata.get("strain", "")
if value and value.lower().startswith("cellline:"):
return value.split(":", 1)[1].strip()
@@ -549,8 +537,8 @@ def extract_cellLine(metadata: dict) -> Optional[str]:
wait=tenacity.wait_exponential(exp_base=1.25, multiplier=1.25),
)
def parse_purlobourl(
- url: str, lookup: Optional[Tuple[str, ...]] = None
-) -> Optional[Dict[str, str]]:
+ url: str, lookup: tuple[str, ...] | None = None
+) -> dict[str, str] | None:
"""Parse an Ontobee URL to return properties of a Class node
:param url: Ontobee URL
@@ -578,7 +566,7 @@ def parse_purlobourl(
return values
-def extract_species(metadata: dict) -> Optional[models.SpeciesType]:
+def extract_species(metadata: dict) -> models.SpeciesType | None:
value_orig = metadata.get("species", None)
value_id = None
if value_orig is not None and value_orig != "":
@@ -593,7 +581,7 @@ def extract_species(metadata: dict) -> Optional[models.SpeciesType]:
value_id = value_orig
lookup = ("rdfs:label", "oboInOwl:hasExactSynonym")
try:
- result: Optional[Dict[str, str]] = parse_purlobourl(
+ result: dict[str, str] | None = parse_purlobourl(
value_orig, lookup=lookup
)
except ConnectionError:
@@ -628,14 +616,14 @@ def extract_species(metadata: dict) -> Optional[models.SpeciesType]:
return None
-def extract_assay_type(metadata: dict) -> Optional[List[models.AssayType]]:
+def extract_assay_type(metadata: dict) -> list[models.AssayType] | None:
if "assayType" in metadata:
return [models.AssayType(identifier="assayType", name=metadata["assayType"])]
else:
return None
-def extract_anatomy(metadata: dict) -> Optional[List[models.Anatomy]]:
+def extract_anatomy(metadata: dict) -> list[models.Anatomy] | None:
if "anatomy" in metadata:
return [models.Anatomy(identifier="anatomy", name=metadata["anatomy"])]
else:
@@ -645,7 +633,7 @@ def extract_anatomy(metadata: dict) -> Optional[List[models.Anatomy]]:
M = TypeVar("M", bound=models.DandiBaseModel)
-def extract_model(modelcls: Type[M], metadata: dict, **kwargs: Any) -> M:
+def extract_model(modelcls: type[M], metadata: dict, **kwargs: Any) -> M:
m = modelcls.unvalidated()
for field in m.__fields__.keys():
value = kwargs.get(field, extract_field(field, metadata))
@@ -656,9 +644,9 @@ def extract_model(modelcls: Type[M], metadata: dict, **kwargs: Any) -> M:
def extract_model_list(
- modelcls: Type[M], id_field: str, id_source: str, **kwargs: Any
-) -> Callable[[dict], List[M]]:
- def func(metadata: dict) -> List[M]:
+ modelcls: type[M], id_field: str, id_source: str, **kwargs: Any
+) -> Callable[[dict], list[M]]:
+ def func(metadata: dict) -> list[M]:
m = extract_model(
modelcls, metadata, **{id_field: metadata.get(id_source)}, **kwargs
)
@@ -670,8 +658,8 @@ def func(metadata: dict) -> List[M]:
return func
-def extract_wasDerivedFrom(metadata: dict) -> Optional[List[models.BioSample]]:
- derived_from: Optional[List[models.BioSample]] = None
+def extract_wasDerivedFrom(metadata: dict) -> list[models.BioSample] | None:
+ derived_from: list[models.BioSample] | None = None
for field, sample_name in [
("tissue_sample_id", "tissuesample"),
("slice_id", "slice"),
@@ -693,7 +681,7 @@ def extract_wasDerivedFrom(metadata: dict) -> Optional[List[models.BioSample]]:
)
-def extract_session(metadata: dict) -> Optional[List[models.Session]]:
+def extract_session(metadata: dict) -> list[models.Session] | None:
probe_ids = metadata.get("probe_ids", [])
if isinstance(probe_ids, str):
probe_ids = [probe_ids]
@@ -717,14 +705,14 @@ def extract_session(metadata: dict) -> Optional[List[models.Session]]:
]
-def extract_digest(metadata: dict) -> Optional[Dict[models.DigestType, str]]:
+def extract_digest(metadata: dict) -> dict[models.DigestType, str] | None:
if "digest" in metadata:
return {models.DigestType[metadata["digest_type"]]: metadata["digest"]}
else:
return None
-FIELD_EXTRACTORS: Dict[str, Callable[[dict], Any]] = {
+FIELD_EXTRACTORS: dict[str, Callable[[dict], Any]] = {
"wasDerivedFrom": extract_wasDerivedFrom,
"wasAttributedTo": extract_wasAttributedTo,
"wasGeneratedBy": extract_session,
@@ -749,11 +737,11 @@ def extract_field(field: str, metadata: dict) -> Any:
class Neurodatum(TypedDict):
module: str
neurodata_type: str
- technique: Optional[str]
- approach: Optional[str]
+ technique: str | None
+ approach: str | None
-neurodata_typemap: Dict[str, Neurodatum] = {
+neurodata_typemap: dict[str, Neurodatum] = {
"ElectricalSeries": {
"module": "ecephys",
"neurodata_type": "ElectricalSeries",
@@ -965,8 +953,8 @@ def process_ndtypes(metadata: models.BareAsset, nd_types: Iterable[str]) -> None
def nwb2asset(
nwb_path: str | Path | Readable,
- digest: Optional[Digest] = None,
- schema_version: Optional[str] = None,
+ digest: Digest | None = None,
+ schema_version: str | None = None,
) -> models.BareAsset:
if schema_version is not None:
current_version = models.get_schema_version()
@@ -990,7 +978,7 @@ def nwb2asset(
def get_default_metadata(
- path: str | Path | Readable, digest: Optional[Digest] = None
+ path: str | Path | Readable, digest: Digest | None = None
) -> models.BareAsset:
metadata = models.BareAsset.unvalidated()
start_time = end_time = datetime.now().astimezone()
@@ -1003,7 +991,7 @@ def add_common_metadata(
path: str | Path | Readable,
start_time: datetime,
end_time: datetime,
- digest: Optional[Digest] = None,
+ digest: Digest | None = None,
) -> None:
"""
Update a `dict` of raw "schemadata" with the fields that are common to both
diff --git a/dandi/misctypes.py b/dandi/misctypes.py
index 49047691a..df1eaf2d2 100644
--- a/dandi/misctypes.py
+++ b/dandi/misctypes.py
@@ -13,7 +13,7 @@
from fnmatch import fnmatchcase
import os.path
from pathlib import Path
-from typing import IO, Optional, TypeVar, cast
+from typing import IO, TypeVar, cast
from dandischema.models import DigestType
@@ -270,7 +270,7 @@ def get_size(self) -> int:
...
@abstractmethod
- def get_mtime(self) -> Optional[datetime]:
+ def get_mtime(self) -> datetime | None:
"""
Returns the time at which the resource's contents were last modified,
if it can be determined
@@ -336,7 +336,7 @@ class RemoteReadableAsset(Readable):
size: int
#: :meta private:
- mtime: Optional[datetime]
+ mtime: datetime | None
#: :meta private:
name: str
@@ -349,7 +349,7 @@ def open(self) -> IO[bytes]:
def get_size(self) -> int:
return self.size
- def get_mtime(self) -> Optional[datetime]:
+ def get_mtime(self) -> datetime | None:
return self.mtime
def get_filename(self) -> str:
diff --git a/dandi/move.py b/dandi/move.py
index 4ef135c3f..7463beb5f 100644
--- a/dandi/move.py
+++ b/dandi/move.py
@@ -9,7 +9,7 @@
from pathlib import Path, PurePosixPath
import posixpath
import re
-from typing import NewType, Optional
+from typing import NewType
from . import get_logger
from .consts import DandiInstance
@@ -536,7 +536,7 @@ class RemoteMover(LocalizedMover):
#: The `~LocalMover.dandiset_path` of the corresponding `LocalMover` when
#: inside a `LocalRemoteMover`
- local_dandiset_path: Optional[Path] = None
+ local_dandiset_path: Path | None = None
#: A collection of all assets in the Dandiset, keyed by their paths
assets: dict[AssetPath, RemoteAsset] = field(init=False)
@@ -765,7 +765,7 @@ def move(
dandiset: Path | str | None = None,
work_on: str = "auto",
devel_debug: bool = False,
- jobs: Optional[int] = None,
+ jobs: int | None = None,
dry_run: bool = False,
) -> None:
if not srcs:
@@ -774,7 +774,7 @@ def move(
dandiset = Path()
with ExitStack() as stack:
mover: Mover
- client: Optional[DandiAPIClient] = None
+ client: DandiAPIClient | None = None
if work_on == "auto":
work_on = "remote" if isinstance(dandiset, str) else "both"
if work_on == "both":
diff --git a/dandi/organize.py b/dandi/organize.py
index 2edab94d9..06c163d0f 100644
--- a/dandi/organize.py
+++ b/dandi/organize.py
@@ -1,5 +1,5 @@
"""
-ATM primarily a sandbox for some functionality for dandi organize
+ATM primarily a sandbox for some functionality for dandi organize
"""
from __future__ import annotations
@@ -12,7 +12,6 @@
import os.path as op
from pathlib import Path, PurePosixPath
import re
-from typing import List, Optional
import uuid
import numpy as np
@@ -66,7 +65,7 @@ def filter_invalid_metadata_rows(metadata_rows):
def create_unique_filenames_from_metadata(
- metadata: list[dict], required_fields: Optional[Sequence[str]] = None
+ metadata: list[dict], required_fields: Sequence[str] | None = None
) -> list[dict]:
"""Create unique filenames given metadata
@@ -187,7 +186,7 @@ def create_unique_filenames_from_metadata(
return metadata
-def _create_external_file_names(metadata: List[dict]) -> List[dict]:
+def _create_external_file_names(metadata: list[dict]) -> list[dict]:
"""Updates the metadata dict with renamed external files.
Renames the external_file attribute in an ImageSeries according to the rule:
@@ -228,7 +227,7 @@ def _create_external_file_names(metadata: List[dict]) -> List[dict]:
def organize_external_files(
- metadata: List[dict], dandiset_path: str, files_mode: str
+ metadata: list[dict], dandiset_path: str, files_mode: str
) -> None:
"""Organizes the external_files into the new Dandiset folder structure.
@@ -814,7 +813,9 @@ def _get_metadata(path):
meta["path"] = path
return meta
- if not devel_debug and jobs != 1: # Do not use joblib at all if number_of_jobs=1
+ if (
+ not devel_debug and jobs != 1
+ ): # Do not use joblib at all if number_of_jobs=1
# Note: It is Python (pynwb) intensive, not IO, so ATM there is little
# to no benefit from Parallel without using multiproc! But that would
# complicate progress bar indication... TODO
diff --git a/dandi/pynwb_utils.py b/dandi/pynwb_utils.py
index b7cfaa6d4..c3a5945bf 100644
--- a/dandi/pynwb_utils.py
+++ b/dandi/pynwb_utils.py
@@ -6,7 +6,7 @@
import os.path as op
from pathlib import Path
import re
-from typing import IO, Any, Dict, List, Optional, Tuple, TypeVar, Union, cast
+from typing import IO, Any, TypeVar, cast
import warnings
import dandischema
@@ -53,7 +53,7 @@
def _sanitize_nwb_version(
v: Any,
filename: str | Path | None = None,
- log: Optional[Callable[[str], Any]] = None,
+ log: Callable[[str], Any] | None = None,
) -> str:
"""Helper to sanitize the value of nwb_version where possible
@@ -91,7 +91,7 @@ def _sanitize_nwb_version(
def get_nwb_version(
filepath: str | Path | Readable, sanitize: bool = False
-) -> Optional[str]:
+) -> str | None:
"""Return a version of the NWB standard used by a file
Parameters
@@ -130,7 +130,7 @@ def _sanitize(v: Any) -> str:
return None
-def get_neurodata_types_to_modalities_map() -> Dict[str, str]:
+def get_neurodata_types_to_modalities_map() -> dict[str, str]:
"""Return a dict to map neurodata types known to pynwb to "modalities"
It is an ugly hack, largely to check feasibility.
@@ -139,7 +139,7 @@ def get_neurodata_types_to_modalities_map() -> Dict[str, str]:
"""
import inspect
- ndtypes: Dict[str, str] = {}
+ ndtypes: dict[str, str] = {}
# TODO: if there are extensions, they might have types subclassed from the base
# types. There might be a map within pynwb (pynwb.get_type_map?) to return
@@ -192,7 +192,7 @@ def get_neurodata_types(filepath: str | Path | Readable) -> list[str]:
return out
-def _scan_neurodata_types(grp: h5py.File) -> List[Tuple[Any, Any]]:
+def _scan_neurodata_types(grp: h5py.File) -> list[tuple[Any, Any]]:
out = []
if "neurodata_type" in grp.attrs:
out.append((grp.attrs["neurodata_type"], grp.attrs.get("description", None)))
@@ -258,7 +258,7 @@ def _get_pynwb_metadata(path: str | Path | Readable) -> dict[str, Any]:
return out
-def _get_image_series(nwb: pynwb.NWBFile) -> List[dict]:
+def _get_image_series(nwb: pynwb.NWBFile) -> list[dict]:
"""Retrieves all ImageSeries related metadata from an open nwb file.
Specifically it pulls out the ImageSeries uuid, name and all the
@@ -270,7 +270,7 @@ def _get_image_series(nwb: pynwb.NWBFile) -> List[dict]:
Returns
-------
- out: List[dict]
+ out: list[dict]
list of dicts : [{id: , name: ,
external_files=[ImageSeries.external_file]}]
if no ImageSeries found in the given modules to check, then it returns an empty list.
@@ -294,7 +294,7 @@ def _get_image_series(nwb: pynwb.NWBFile) -> List[dict]:
return out
-def rename_nwb_external_files(metadata: List[dict], dandiset_path: str) -> None:
+def rename_nwb_external_files(metadata: list[dict], dandiset_path: str) -> None:
"""Renames the external_file attribute in an ImageSeries datatype in an open nwb file.
It pulls information about the ImageSeries objects from metadata:
@@ -302,7 +302,7 @@ def rename_nwb_external_files(metadata: List[dict], dandiset_path: str) -> None:
Parameters
----------
- metadata: List[dict]
+ metadata: list[dict]
list of dictionaries containing the metadata gathered from the nwbfile
dandiset_path: str
base path of dandiset
@@ -340,9 +340,7 @@ def rename_nwb_external_files(metadata: List[dict], dandiset_path: str) -> None:
@validate_cache.memoize_path
-def validate(
- path: Union[str, Path], devel_debug: bool = False
-) -> List[ValidationResult]:
+def validate(path: str | Path, devel_debug: bool = False) -> list[ValidationResult]:
"""Run validation on a file and return errors
In case of an exception being thrown, an error message added to the
@@ -353,7 +351,7 @@ def validate(
path: str or Path
"""
path = str(path) # Might come in as pathlib's PATH
- errors: List[ValidationResult] = []
+ errors: list[ValidationResult] = []
try:
if Version(pynwb.__version__) >= Version(
"2.2.0"
@@ -494,7 +492,7 @@ def make_nwb_file(
return filename
-def copy_nwb_file(src: Union[str, Path], dest: Union[str, Path]) -> str:
+def copy_nwb_file(src: str | Path, dest: str | Path) -> str:
""" "Copy" .nwb file by opening and saving into a new path.
New file (`dest`) then should have new `object_id` attribute, and thus be
diff --git a/dandi/support/digests.py b/dandi/support/digests.py
index e9a5b63b1..b1ce72f4d 100644
--- a/dandi/support/digests.py
+++ b/dandi/support/digests.py
@@ -15,7 +15,6 @@
import logging
import os.path
from pathlib import Path
-from typing import Dict, List, Optional, Tuple, Union, cast
from dandischema.digests.dandietag import DandiETag
from fscacher import PersistentCache
@@ -40,7 +39,7 @@ class Digester:
DEFAULT_DIGESTS = ["md5", "sha1", "sha256", "sha512"]
def __init__(
- self, digests: Optional[List[str]] = None, blocksize: int = 1 << 16
+ self, digests: list[str] | None = None, blocksize: int = 1 << 16
) -> None:
"""
Parameters
@@ -57,10 +56,10 @@ def __init__(
self.blocksize = blocksize
@property
- def digests(self) -> List[str]:
+ def digests(self) -> list[str]:
return self._digests
- def __call__(self, fpath: Union[str, Path]) -> Dict[str, str]:
+ def __call__(self, fpath: str | Path) -> dict[str, str]:
"""
fpath : str
File path for which a checksum shall be computed.
@@ -86,9 +85,11 @@ def __call__(self, fpath: Union[str, Path]) -> Dict[str, str]:
@checksums.memoize_path
-def get_digest(filepath: Union[str, Path], digest: str = "sha256") -> str:
+def get_digest(filepath: str | Path, digest: str = "sha256") -> str:
if digest == "dandi-etag":
- return cast(str, get_dandietag(filepath).as_str())
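+        # A minimal note on the pattern used here: the runtime assert narrows the
+        # value to str for mypy and, unlike typing.cast, actually checks it.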
+ s = get_dandietag(filepath).as_str()
+ assert isinstance(s, str)
+ return s
elif digest == "zarr-checksum":
return get_zarr_checksum(Path(filepath))
else:
@@ -96,11 +97,11 @@ def get_digest(filepath: Union[str, Path], digest: str = "sha256") -> str:
@checksums.memoize_path
-def get_dandietag(filepath: Union[str, Path]) -> DandiETag:
+def get_dandietag(filepath: str | Path) -> DandiETag:
return DandiETag.from_file(filepath)
-def get_zarr_checksum(path: Path, known: Optional[Dict[str, str]] = None) -> str:
+def get_zarr_checksum(path: Path, known: dict[str, str] | None = None) -> str:
"""
Compute the Zarr checksum for a file or directory tree.
@@ -109,11 +110,13 @@ def get_zarr_checksum(path: Path, known: Optional[Dict[str, str]] = None) -> str
slash-separated paths relative to the root of the Zarr to hex digests.
"""
if path.is_file():
- return cast(str, get_digest(path, "md5"))
+ s = get_digest(path, "md5")
+ assert isinstance(s, str)
+ return s
if known is None:
known = {}
- def digest_file(f: Path) -> Tuple[Path, str, int]:
+ def digest_file(f: Path) -> tuple[Path, str, int]:
assert known is not None
relpath = f.relative_to(path).as_posix()
try:
@@ -128,7 +131,7 @@ def digest_file(f: Path) -> Tuple[Path, str, int]:
return str(zcc.process())
-def md5file_nocache(filepath: Union[str, Path]) -> str:
+def md5file_nocache(filepath: str | Path) -> str:
"""
Compute the MD5 digest of a file without caching with fscacher, which has
been shown to slow things down for the large numbers of files typically
diff --git a/dandi/support/threaded_walk.py b/dandi/support/threaded_walk.py
index bf4fd3327..be285812b 100644
--- a/dandi/support/threaded_walk.py
+++ b/dandi/support/threaded_walk.py
@@ -15,20 +15,23 @@
# limitations under the License.
# ==============================================================================
+from __future__ import annotations
+
+from collections.abc import Callable, Iterable
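+# Callable and Iterable are imported from collections.abc; the equivalent
+# typing aliases are deprecated since Python 3.9 (PEP 585).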
import logging
import os.path
from pathlib import Path
import threading
-from typing import Any, Callable, Iterable, Optional, Union
+from typing import Any
log = logging.getLogger(__name__)
def threaded_walk(
- dirpath: Union[str, Path],
- func: Optional[Callable[[Path], Any]] = None,
+ dirpath: str | Path,
+ func: Callable[[Path], Any] | None = None,
threads: int = 60,
- exclude: Optional[Callable[[Path], Any]] = None,
+ exclude: Callable[[Path], Any] | None = None,
) -> Iterable[Any]:
if not os.path.isdir(dirpath):
return
diff --git a/dandi/tests/fixtures.py b/dandi/tests/fixtures.py
index a00bbba0e..4c3b355e8 100644
--- a/dandi/tests/fixtures.py
+++ b/dandi/tests/fixtures.py
@@ -1,5 +1,6 @@
from __future__ import annotations
+from collections.abc import Callable, Iterator
from dataclasses import dataclass, field
from datetime import datetime, timezone
import logging
@@ -9,7 +10,7 @@
import shutil
from subprocess import DEVNULL, check_output, run
from time import sleep
-from typing import Any, Callable, Dict, Iterator, List, Literal, Optional, Union
+from typing import Any, Literal
from uuid import uuid4
from _pytest.fixtures import FixtureRequest
@@ -68,9 +69,9 @@ def capture_all_logs(caplog: pytest.LogCaptureFixture) -> None:
# TODO: move into some common fixtures. We might produce a number of files
# and also carry some small ones directly in git for regression testing
@pytest.fixture(scope="session")
-def simple1_nwb_metadata() -> Dict[str, Any]:
+def simple1_nwb_metadata() -> dict[str, Any]:
# very simple assignment with the same values as the key with 1 as suffix
- metadata: Dict[str, Any] = {f: f"{f}1" for f in metadata_nwb_file_fields}
+ metadata: dict[str, Any] = {f: f"{f}1" for f in metadata_nwb_file_fields}
metadata["identifier"] = uuid4().hex
# subject_fields
@@ -90,7 +91,7 @@ def simple1_nwb_metadata() -> Dict[str, Any]:
@pytest.fixture(scope="session")
def simple1_nwb(
- simple1_nwb_metadata: Dict[str, Any], tmp_path_factory: pytest.TempPathFactory
+ simple1_nwb_metadata: dict[str, Any], tmp_path_factory: pytest.TempPathFactory
) -> Path:
return make_nwb_file(
tmp_path_factory.mktemp("simple1") / "simple1.nwb",
@@ -100,7 +101,7 @@ def simple1_nwb(
@pytest.fixture(scope="session")
def simple2_nwb(
- simple1_nwb_metadata: Dict[str, Any], tmp_path_factory: pytest.TempPathFactory
+ simple1_nwb_metadata: dict[str, Any], tmp_path_factory: pytest.TempPathFactory
) -> Path:
"""With a subject"""
return make_nwb_file(
@@ -117,7 +118,7 @@ def simple2_nwb(
@pytest.fixture(scope="session")
def simple3_nwb(
- simple1_nwb_metadata: Dict[str, Any], tmp_path_factory: pytest.TempPathFactory
+ simple1_nwb_metadata: dict[str, Any], tmp_path_factory: pytest.TempPathFactory
) -> Path:
"""With a subject, but no subject_id."""
return make_nwb_file(
@@ -215,7 +216,7 @@ def organized_nwb_dir(
@pytest.fixture(scope="session")
def organized_nwb_dir2(
- simple1_nwb_metadata: Dict[str, Any],
+ simple1_nwb_metadata: dict[str, Any],
simple2_nwb: Path,
tmp_path_factory: pytest.TempPathFactory,
) -> Path:
@@ -281,18 +282,12 @@ def organized_nwb_dir4(
return tmp_path
-Scope = Union[
- Literal["session"],
- Literal["package"],
- Literal["module"],
- Literal["class"],
- Literal["function"],
-]
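+# Literal accepts multiple values directly; this is equivalent to a Union of
+# single-value Literals (PEP 586).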
+Scope = Literal["session", "package", "module", "class", "function"]
def get_gitrepo_fixture(
url: str,
- committish: Optional[str] = None,
+ committish: str | None = None,
scope: Scope = "session",
make_subdirs_dandisets: bool = False,
) -> Callable[[pytest.TempPathFactory], Path]:
@@ -316,8 +311,8 @@ def fixture(tmp_path_factory: pytest.TempPathFactory) -> Path:
def get_filtered_gitrepo_fixture(
url: str,
- whitelist: List[str],
- make_subdirs_dandisets: Optional[bool] = False,
+ whitelist: list[str],
+ make_subdirs_dandisets: bool | None = False,
) -> Callable[[pytest.TempPathFactory], Iterator[Path]]:
@pytest.fixture(scope="session")
def fixture(
@@ -376,7 +371,7 @@ def _make_subdirs_dandisets(path: Path) -> None:
@pytest.fixture(scope="session")
-def docker_compose_setup() -> Iterator[Dict[str, str]]:
+def docker_compose_setup() -> Iterator[dict[str, str]]:
skipif.no_network()
skipif.no_docker_engine()
@@ -491,7 +486,7 @@ def api_url(self) -> str:
@pytest.fixture(scope="session")
-def local_dandi_api(docker_compose_setup: Dict[str, str]) -> Iterator[DandiAPI]:
+def local_dandi_api(docker_compose_setup: dict[str, str]) -> Iterator[DandiAPI]:
instance = known_instances["dandi-api-local-docker-tests"]
api_key = docker_compose_setup["django_api_key"]
with DandiAPIClient.for_dandi_instance(instance, token=api_key) as client:
@@ -504,15 +499,13 @@ class SampleDandiset:
dspath: Path
dandiset: RemoteDandiset
dandiset_id: str
- upload_kwargs: Dict[str, Any] = field(default_factory=dict)
+ upload_kwargs: dict[str, Any] = field(default_factory=dict)
@property
def client(self) -> DandiAPIClient:
return self.api.client
- def upload(
- self, paths: Optional[List[Union[str, Path]]] = None, **kwargs: Any
- ) -> None:
+ def upload(self, paths: list[str | Path] | None = None, **kwargs: Any) -> None:
with pytest.MonkeyPatch().context() as m:
m.setenv("DANDI_API_KEY", self.api.api_key)
upload(
diff --git a/dandi/tests/test_dandiapi.py b/dandi/tests/test_dandiapi.py
index ac8b04c06..0aa9e8170 100644
--- a/dandi/tests/test_dandiapi.py
+++ b/dandi/tests/test_dandiapi.py
@@ -1,3 +1,5 @@
+from __future__ import annotations
+
import builtins
from datetime import datetime, timezone
import logging
@@ -5,7 +7,6 @@
import random
import re
from shutil import rmtree
-from typing import List, Union
import anys
import click
@@ -535,7 +536,7 @@ def test_set_dandiset_metadata(text_dandiset: SampleDandiset) -> None:
],
)
def test_get_raw_digest(
- digest_type: Union[str, DigestType, None],
+ digest_type: str | DigestType | None,
digest_regex: str,
text_dandiset: SampleDandiset,
) -> None:
@@ -687,7 +688,7 @@ def test_get_many_pages_of_assets(
) -> None:
new_dandiset.client.page_size = 4
get_spy = mocker.spy(new_dandiset.client, "get")
- paths: List[str] = []
+ paths: list[str] = []
for i in range(26):
p = new_dandiset.dspath / f"{i:04}.txt"
paths.append(p.name)
diff --git a/dandi/tests/test_delete.py b/dandi/tests/test_delete.py
index e297e862e..310b1205e 100644
--- a/dandi/tests/test_delete.py
+++ b/dandi/tests/test_delete.py
@@ -1,5 +1,6 @@
+from __future__ import annotations
+
from pathlib import Path
-from typing import List
import pytest
from pytest_mock import MockerFixture
@@ -62,8 +63,8 @@ def test_delete_paths(
monkeypatch: pytest.MonkeyPatch,
text_dandiset: SampleDandiset,
tmp_path: Path,
- paths: List[str],
- remainder: List[Path],
+ paths: list[str],
+ remainder: list[Path],
) -> None:
monkeypatch.chdir(text_dandiset.dspath)
monkeypatch.setenv("DANDI_API_KEY", text_dandiset.api.api_key)
@@ -139,7 +140,7 @@ def test_delete_dandiset(
mocker: MockerFixture,
monkeypatch: pytest.MonkeyPatch,
text_dandiset: SampleDandiset,
- paths: List[str],
+ paths: list[str],
) -> None:
monkeypatch.chdir(text_dandiset.dspath)
monkeypatch.setenv("DANDI_API_KEY", text_dandiset.api.api_key)
diff --git a/dandi/tests/test_download.py b/dandi/tests/test_download.py
index b4b8651d1..9196ef9a9 100644
--- a/dandi/tests/test_download.py
+++ b/dandi/tests/test_download.py
@@ -1,10 +1,12 @@
+from __future__ import annotations
+
+from collections.abc import Callable
import json
import os
import os.path
from pathlib import Path
import re
from shutil import rmtree
-from typing import Callable, List, Tuple
import time
import numpy as np
@@ -18,7 +20,7 @@
from .test_helpers import assert_dirtrees_eq
from ..consts import DRAFT, dandiset_metadata_file
from ..dandiarchive import DandisetURL
-from ..download import Downloader, ProgressCombiner, download, PYOUTHelper
+from ..download import Downloader, ProgressCombiner, PYOUTHelper, download
from ..exceptions import NotFoundError
from ..utils import list_paths
@@ -754,10 +756,10 @@ def test_download_zarr_subdir_has_only_subdirs(
],
)
def test_progress_combiner(
- file_qty: int, inputs: List[Tuple[str, dict]], expected: List[dict]
+ file_qty: int, inputs: list[tuple[str, dict]], expected: list[dict]
) -> None:
pc = ProgressCombiner(zarr_size=69105, file_qty=file_qty)
- outputs: List[dict] = []
+ outputs: list[dict] = []
for path, status in inputs:
outputs.extend(pc.feed(path, status))
assert outputs == expected
diff --git a/dandi/tests/test_files.py b/dandi/tests/test_files.py
index 3e44cf6b1..e2f445701 100644
--- a/dandi/tests/test_files.py
+++ b/dandi/tests/test_files.py
@@ -1,8 +1,9 @@
+from __future__ import annotations
+
from operator import attrgetter
import os
from pathlib import Path
import subprocess
-from typing import cast
from unittest.mock import ANY
from dandischema.models import get_schema_version
@@ -245,7 +246,8 @@ def test_find_dandi_files_with_bids(tmp_path: Path) -> None:
),
]
- bidsdd = cast(BIDSDatasetDescriptionAsset, files[1])
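+    # Indexing plus an isinstance assert narrows the type for mypy and fails the
+    # test outright if this entry is not the BIDS dataset description asset.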
+ bidsdd = files[1]
+ assert isinstance(bidsdd, BIDSDatasetDescriptionAsset)
assert sorted(bidsdd.dataset_files, key=attrgetter("filepath")) == [
GenericBIDSAsset(
filepath=tmp_path / "bids1" / "file.txt",
@@ -269,7 +271,8 @@ def test_find_dandi_files_with_bids(tmp_path: Path) -> None:
for asset in bidsdd.dataset_files:
assert asset.bids_dataset_description is bidsdd
- bidsdd = cast(BIDSDatasetDescriptionAsset, files[5])
+ bidsdd = files[5]
+ assert isinstance(bidsdd, BIDSDatasetDescriptionAsset)
assert sorted(bidsdd.dataset_files, key=attrgetter("filepath")) == [
GenericBIDSAsset(
filepath=tmp_path / "bids2" / "movie.mp4",
diff --git a/dandi/tests/test_keyring.py b/dandi/tests/test_keyring.py
index 6db9d01f5..1ff202898 100644
--- a/dandi/tests/test_keyring.py
+++ b/dandi/tests/test_keyring.py
@@ -1,5 +1,7 @@
+from __future__ import annotations
+
+from collections.abc import Callable
from pathlib import Path
-from typing import Callable, Optional
from keyring.backend import get_all_keyring
from keyring.backends import fail, null
@@ -66,7 +68,7 @@ def setup_keyringrc_fail() -> None:
@pytest.mark.usefixtures("tmp_home")
def test_keyring_lookup_envvar_no_password(
monkeypatch: pytest.MonkeyPatch,
- rcconfig: Optional[Callable[[], None]],
+ rcconfig: Callable[[], None] | None,
) -> None:
monkeypatch.setenv("PYTHON_KEYRING_BACKEND", "keyring.backends.null.Keyring")
if rcconfig is not None:
@@ -82,7 +84,7 @@ def test_keyring_lookup_envvar_no_password(
@pytest.mark.usefixtures("tmp_home")
def test_keyring_lookup_envvar_password(
monkeypatch: pytest.MonkeyPatch,
- rcconfig: Optional[Callable[[], None]],
+ rcconfig: Callable[[], None] | None,
) -> None:
monkeypatch.setenv("PYTHON_KEYRING_BACKEND", "keyrings.alt.file.PlaintextKeyring")
keyfile.PlaintextKeyring().set_password(
@@ -102,7 +104,7 @@ def test_keyring_lookup_envvar_password(
@pytest.mark.usefixtures("tmp_home")
def test_keyring_lookup_envvar_fail(
monkeypatch: pytest.MonkeyPatch,
- rcconfig: Optional[Callable[[], None]],
+ rcconfig: Callable[[], None] | None,
) -> None:
monkeypatch.setenv("PYTHON_KEYRING_BACKEND", "keyring.backends.fail.Keyring")
if rcconfig is not None:
diff --git a/dandi/tests/test_metadata.py b/dandi/tests/test_metadata.py
index 1d578e8e9..ca52467c1 100644
--- a/dandi/tests/test_metadata.py
+++ b/dandi/tests/test_metadata.py
@@ -1,8 +1,10 @@
+from __future__ import annotations
+
from datetime import datetime, timedelta
import json
from pathlib import Path
import shutil
-from typing import Any, Dict, Optional, Tuple, Union
+from typing import Any
from anys import ANY_AWARE_DATETIME, AnyFullmatch, AnyIn
from dandischema.consts import DANDI_SCHEMA_VERSION
@@ -52,7 +54,7 @@
METADATA_DIR = Path(__file__).with_name("data") / "metadata"
-def test_get_metadata(simple1_nwb: Path, simple1_nwb_metadata: Dict[str, Any]) -> None:
+def test_get_metadata(simple1_nwb: Path, simple1_nwb_metadata: dict[str, Any]) -> None:
target_metadata = simple1_nwb_metadata.copy()
# we will also get some counts
target_metadata["number_of_electrodes"] = 0
@@ -150,7 +152,7 @@ def test_bids_nwb_metadata_integration(bids_examples: Path, tmp_path: Path) -> N
("/", "/"),
],
)
-def test_parse_age(age: str, duration: Union[str, Tuple[str, str]]) -> None:
+def test_parse_age(age: str, duration: str | tuple[str, str]) -> None:
if isinstance(duration, tuple):
duration, ref = duration
else: # birth will be a default ref
@@ -213,7 +215,7 @@ def test_extract_cellLine(s, t):
),
],
)
-def test_parse_error(age: Optional[str], errmsg: str) -> None:
+def test_parse_error(age: str | None, errmsg: str) -> None:
with pytest.raises(ValueError) as excinfo:
parse_age(age)
assert str(excinfo.value) == errmsg
@@ -395,7 +397,7 @@ def test_timedelta2duration(td: timedelta, duration: str) -> None:
),
],
)
-def test_prepare_metadata(filename: str, metadata: Dict[str, Any]) -> None:
+def test_prepare_metadata(filename: str, metadata: dict[str, Any]) -> None:
data = prepare_metadata(metadata).json_dict()
with (METADATA_DIR / filename).open() as fp:
data_as_dict = json.load(fp)
diff --git a/dandi/tests/test_move.py b/dandi/tests/test_move.py
index 70ead9312..1dae78341 100644
--- a/dandi/tests/test_move.py
+++ b/dandi/tests/test_move.py
@@ -1,6 +1,7 @@
+from __future__ import annotations
+
import logging
from pathlib import Path
-from typing import Dict, List, Optional, cast
import pytest
@@ -33,9 +34,9 @@ def moving_dandiset(new_dandiset: SampleDandiset) -> SampleDandiset:
def check_assets(
sample_dandiset: SampleDandiset,
- starting_assets: List[RemoteAsset],
+ starting_assets: list[RemoteAsset],
work_on: str,
- remapping: Dict[str, Optional[str]],
+ remapping: dict[str, str | None],
) -> None:
for asset in starting_assets:
if asset.path in remapping and remapping[asset.path] is None:
@@ -43,19 +44,21 @@ def check_assets(
continue
if work_on in ("local", "both") and asset.path in remapping:
assert not (sample_dandiset.dspath / asset.path).exists()
- assert (
- sample_dandiset.dspath / cast(str, remapping[asset.path])
- ).read_text() == f"{asset.path}\n"
+ remapped = remapping[asset.path]
+ assert isinstance(remapped, str)
+ assert (sample_dandiset.dspath / remapped).read_text() == f"{asset.path}\n"
else:
assert (
sample_dandiset.dspath / asset.path
).read_text() == f"{asset.path}\n"
if work_on in ("remote", "both") and asset.path in remapping:
+ remapped = remapping[asset.path]
+ assert isinstance(remapped, str)
with pytest.raises(NotFoundError):
sample_dandiset.dandiset.get_asset_by_path(asset.path)
assert (
sample_dandiset.dandiset.get_asset_by_path( # type: ignore[attr-defined]
- cast(str, remapping[asset.path])
+ remapped
).blob
== asset.blob # type: ignore[attr-defined]
)
@@ -158,10 +161,10 @@ def check_assets(
def test_move(
monkeypatch: pytest.MonkeyPatch,
moving_dandiset: SampleDandiset,
- srcs: List[str],
+ srcs: list[str],
dest: str,
regex: bool,
- remapping: Dict[str, Optional[str]],
+ remapping: dict[str, str | None],
work_on: str,
) -> None:
starting_assets = list(moving_dandiset.dandiset.get_assets())
@@ -205,7 +208,7 @@ def test_move_error(
monkeypatch: pytest.MonkeyPatch,
moving_dandiset: SampleDandiset,
work_on: str,
- kwargs: Dict[str, str],
+ kwargs: dict[str, str],
) -> None:
starting_assets = list(moving_dandiset.dandiset.get_assets())
monkeypatch.chdir(moving_dandiset.dspath)
diff --git a/dandi/tests/test_pynwb_utils.py b/dandi/tests/test_pynwb_utils.py
index 9f19998bd..0de33d555 100644
--- a/dandi/tests/test_pynwb_utils.py
+++ b/dandi/tests/test_pynwb_utils.py
@@ -1,7 +1,10 @@
+from __future__ import annotations
+
+from collections.abc import Callable
from datetime import datetime, timezone
from pathlib import Path
import re
-from typing import Any, Callable, NoReturn
+from typing import Any, NoReturn
import numpy as np
from pynwb import NWBHDF5IO, NWBFile, TimeSeries
diff --git a/dandi/tests/test_upload.py b/dandi/tests/test_upload.py
index 2d08200cb..6ccdf8990 100644
--- a/dandi/tests/test_upload.py
+++ b/dandi/tests/test_upload.py
@@ -1,7 +1,9 @@
+from __future__ import annotations
+
import os
from pathlib import Path
from shutil import copyfile, rmtree
-from typing import Any, Dict
+from typing import Any
import numpy as np
import pynwb
@@ -261,7 +263,7 @@ def test_upload_sync_zarr(mocker, zarr_dandiset):
def test_upload_invalid_metadata(
- new_dandiset: SampleDandiset, simple1_nwb_metadata: Dict[str, Any]
+ new_dandiset: SampleDandiset, simple1_nwb_metadata: dict[str, Any]
) -> None:
make_nwb_file(
new_dandiset.dspath / "broken.nwb",
diff --git a/dandi/tests/test_utils.py b/dandi/tests/test_utils.py
index 33278a192..cfd3f25a3 100644
--- a/dandi/tests/test_utils.py
+++ b/dandi/tests/test_utils.py
@@ -1,10 +1,10 @@
from __future__ import annotations
+from collections.abc import Iterable
import inspect
import os.path as op
from pathlib import Path
import time
-from typing import Iterable, List
import pytest
import requests
@@ -73,7 +73,7 @@ def test_find_files_dotfiles(tmp_path: Path) -> None:
(tmp_path / ".git" / "config").touch()
(tmpsubdir / ".git").touch() # a ".git" link file
- def relpaths(paths: Iterable[str]) -> List[str]:
+ def relpaths(paths: Iterable[str]) -> list[str]:
return sorted(op.relpath(p, tmp_path) for p in paths)
regular = ["regular", op.join("subdir", "regular")]
diff --git a/dandi/upload.py b/dandi/upload.py
index 60ee14109..bf2e69cd0 100644
--- a/dandi/upload.py
+++ b/dandi/upload.py
@@ -1,13 +1,14 @@
from __future__ import annotations
from collections import defaultdict
+from collections.abc import Iterator
from contextlib import ExitStack
from functools import reduce
import os.path
from pathlib import Path
import re
import time
-from typing import Any, Dict, Iterator, List, Optional, Set, Tuple, TypedDict, Union
+from typing import Any, TypedDict
from unittest.mock import patch
import click
@@ -36,19 +37,19 @@
class Uploaded(TypedDict):
size: int
- errors: List[str]
+ errors: list[str]
def upload(
- paths: Optional[List[Union[str, Path]]] = None,
+ paths: list[str | Path] | None = None,
existing: str = "refresh",
validation: str = "require",
dandi_instance: str | DandiInstance = "dandi",
allow_any_path: bool = False,
upload_dandiset_metadata: bool = False,
devel_debug: bool = False,
- jobs: Optional[int] = None,
- jobs_per_file: Optional[int] = None,
+ jobs: int | None = None,
+ jobs_per_file: int | None = None,
sync: bool = False,
) -> None:
from .dandiapi import DandiAPIClient
@@ -125,13 +126,13 @@ def new_super_len(o):
# we could limit the number of them until
# https://github.com/pyout/pyout/issues/87
# properly addressed
- process_paths: Set[str] = set()
+ process_paths: set[str] = set()
- uploaded_paths: Dict[str, Uploaded] = defaultdict(
+ uploaded_paths: dict[str, Uploaded] = defaultdict(
lambda: {"size": 0, "errors": []}
)
- upload_err: Optional[Exception] = None
+ upload_err: Exception | None = None
validate_ok = True
# TODO: we might want to always yield a full record so no field is not
@@ -213,7 +214,7 @@ def process_path(dfile: DandiFile) -> Iterator[dict]:
#
# Compute checksums
#
- file_etag: Optional[Digest]
+ file_etag: Digest | None
if isinstance(dfile, ZarrAsset):
file_etag = None
else:
@@ -324,7 +325,7 @@ def upload_agg(*ignored: Any) -> str:
process_paths.add(str(dfile.filepath))
- rec: Dict[Any, Any]
+ rec: dict[Any, Any]
if isinstance(dfile, DandisetMetadataFile):
rec = {"path": dandiset_metadata_file}
else:
@@ -365,7 +366,7 @@ def upload_agg(*ignored: Any) -> str:
raise upload_err
if sync:
- relpaths: List[str] = []
+ relpaths: list[str] = []
for p in paths:
rp = os.path.relpath(p, dandiset.path)
relpaths.append("" if rp == "." else rp)
@@ -387,8 +388,8 @@ def check_replace_asset(
local_asset: LocalAsset,
remote_asset: RemoteAsset,
existing: str,
- local_etag: Optional[Digest],
-) -> Tuple[bool, Dict[str, str]]:
+ local_etag: Digest | None,
+) -> tuple[bool, dict[str, str]]:
# Returns a (replace asset, message to yield) tuple
if isinstance(local_asset, ZarrAsset):
return (True, {"message": "exists - reuploading"})
@@ -434,9 +435,9 @@ def check_replace_asset(
return (True, {"message": f"{exists_msg} - reuploading"})
-def skip_file(msg: Any) -> Dict[str, str]:
+def skip_file(msg: Any) -> dict[str, str]:
return {"status": "skipped", "message": str(msg)}
-def error_file(msg: Any) -> Dict[str, str]:
+def error_file(msg: Any) -> dict[str, str]:
return {"status": "ERROR", "message": str(msg)}
diff --git a/dandi/utils.py b/dandi/utils.py
index 2ac9e5bad..a0ba5765c 100644
--- a/dandi/utils.py
+++ b/dandi/utils.py
@@ -1,12 +1,14 @@
from __future__ import annotations
from bisect import bisect
+from collections.abc import Iterable, Iterator
import datetime
from functools import lru_cache
from importlib.metadata import version as importlib_version
import inspect
import io
import itertools
+import json
from mimetypes import guess_type
import os
import os.path as op
@@ -17,19 +19,7 @@
import subprocess
import sys
import types
-from typing import (
- Any,
- Iterable,
- Iterator,
- List,
- Optional,
- Set,
- TextIO,
- Tuple,
- Type,
- TypeVar,
- Union,
-)
+from typing import IO, Any, List, Optional, TypeVar, Union
from urllib.parse import parse_qs, urlparse, urlunparse
import dateutil.parser
@@ -42,6 +32,9 @@
from .consts import DandiInstance, known_instances, known_instances_rev
from .exceptions import BadCliVersionError, CliVersionTooOldError
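+# AnyPath is evaluated at runtime (unlike annotations under
+# "from __future__ import annotations"), so it keeps typing.Union rather than
+# the Python 3.10-only "str | Path" form.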
+AnyPath = Union[str, Path]
+
+
lgr = get_logger()
_sys_excepthook = sys.excepthook # Just in case we ever need original one
@@ -82,13 +75,13 @@ def setup_exceptionhook(ipython: bool = False) -> None:
"""
def _pdb_excepthook(
- type: Type[BaseException],
+ exc_type: type[BaseException],
value: BaseException,
- tb: Optional[types.TracebackType],
+ tb: types.TracebackType | None,
) -> None:
import traceback
- traceback.print_exception(type, value, tb)
+ traceback.print_exception(exc_type, value, tb)
print()
if is_interactive():
import pdb
@@ -122,7 +115,7 @@ def get_utcnow_datetime(microseconds: bool = True) -> datetime.datetime:
def is_same_time(
- *times: Union[datetime.datetime, int, float, str],
+ *times: datetime.datetime | int | float | str,
tolerance: float = 1e-6,
strip_tzinfo: bool = False,
) -> bool:
@@ -155,7 +148,7 @@ def is_same_time(
def ensure_strtime(
- t: Union[str, int, float, datetime.datetime], isoformat: bool = True
+ t: str | int | float | datetime.datetime, isoformat: bool = True
) -> str:
"""Ensures that time is a string in iso format
@@ -189,9 +182,9 @@ def fromisoformat(t: str) -> datetime.datetime:
def ensure_datetime(
- t: Union[datetime.datetime, int, float, str],
+ t: datetime.datetime | int | float | str,
strip_tzinfo: bool = False,
- tz: Optional[datetime.tzinfo] = None,
+ tz: datetime.tzinfo | None = None,
) -> datetime.datetime:
"""Ensures that time is a datetime
@@ -239,15 +232,13 @@ def flattened(it: Iterable) -> list:
#
-def load_jsonl(filename: Union[str, Path]) -> list:
+def load_jsonl(filename: AnyPath) -> list:
"""Load json lines formatted file"""
- import json
-
with open(filename, "r") as f:
return list(map(json.loads, f))
-_encoded_dirsep = r"\\" if on_windows else r"/"
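+# re.escape(os.sep) yields the platform-appropriate regex fragment: an escaped
+# backslash on Windows and "/" elsewhere.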
+_encoded_dirsep = re.escape(os.sep)
_VCS_REGEX = r"%s\.(?:git|gitattributes|svn|bzr|hg)(?:%s|$)" % (
_encoded_dirsep,
_encoded_dirsep,
@@ -255,19 +246,16 @@ def load_jsonl(filename: Union[str, Path]) -> list:
_DATALAD_REGEX = r"%s\.(?:datalad)(?:%s|$)" % (_encoded_dirsep, _encoded_dirsep)
-AnyPath = Union[str, Path]
-
-
def find_files(
regex: str,
- paths: Union[List[AnyPath], Tuple[AnyPath, ...], Set[AnyPath], AnyPath] = os.curdir,
- exclude: Optional[str] = None,
+ paths: list[AnyPath] | tuple[AnyPath, ...] | set[AnyPath] | AnyPath = os.curdir,
+ exclude: str | None = None,
exclude_dotfiles: bool = True,
exclude_dotdirs: bool = True,
exclude_vcs: bool = True,
exclude_datalad: bool = False,
dirs: bool = False,
- dirs_avoid: Optional[str] = None,
+ dirs_avoid: str | None = None,
) -> Iterator[str]:
"""Generator to find files matching regex
@@ -358,8 +346,8 @@ def good_file(path: str) -> bool:
def list_paths(
- dirpath: Union[str, Path], dirs: bool = False, exclude_vcs: bool = True
-) -> List[Path]:
+ dirpath: AnyPath, dirs: bool = False, exclude_vcs: bool = True
+) -> list[Path]:
return sorted(
map(
Path,
@@ -375,10 +363,10 @@ def list_paths(
)
-_cp_supports_reflink: Optional[bool] = False if on_windows else None
+_cp_supports_reflink: bool | None = False if on_windows else None
-def copy_file(src: Union[str, Path], dst: Union[str, Path]) -> None:
+def copy_file(src: AnyPath, dst: AnyPath) -> None:
"""Copy file from src to dst"""
global _cp_supports_reflink
if _cp_supports_reflink is None:
@@ -399,14 +387,14 @@ def copy_file(src: Union[str, Path], dst: Union[str, Path]) -> None:
shutil.copy2(src, dst)
-def move_file(src: Union[str, Path], dst: Union[str, Path]) -> Any:
+def move_file(src: AnyPath, dst: AnyPath) -> Any:
"""Move file from src to dst"""
return shutil.move(str(src), str(dst))
def find_parent_directory_containing(
- filename: Union[str, Path], path: Union[str, Path, None] = None
-) -> Optional[Path]:
+ filename: AnyPath, path: AnyPath | None = None
+) -> Path | None:
"""Find a directory, on the path to 'path' containing filename
if no 'path' - path from cwd. If 'path' is not absolute, absolute path
@@ -445,7 +433,7 @@ def yaml_dump(rec: Any) -> str:
return out.getvalue()
-def yaml_load(f: Union[str, TextIO], typ: Optional[str] = None) -> Any:
+def yaml_load(f: str | IO[str], typ: str | None = None) -> Any:
"""
Load YAML source from a file or string.
@@ -475,7 +463,7 @@ def with_pathsep(path: str) -> str:
return path + op.sep if not path.endswith(op.sep) else path
-def _get_normalized_paths(path: str, prefix: str) -> Tuple[str, str]:
+def _get_normalized_paths(path: str, prefix: str) -> tuple[str, str]:
if op.isabs(path) != op.isabs(prefix):
raise ValueError(
"Both paths must either be absolute or relative. "
@@ -524,7 +512,7 @@ def shortened_repr(value: Any, length: int = 30) -> str:
def __auto_repr__(obj: Any) -> str:
- attr_names: Tuple[str, ...] = ()
+ attr_names: tuple[str, ...] = ()
if hasattr(obj, "__dict__"):
attr_names += tuple(obj.__dict__.keys())
if hasattr(obj, "__slots__"):
@@ -624,7 +612,7 @@ def get_instance(dandi_instance_id: str | DandiInstance) -> DandiInstance:
@lru_cache
def _get_instance(
- url: str, is_api: bool, instance: Optional[DandiInstance], dandi_id: Optional[str]
+ url: str, is_api: bool, instance: DandiInstance | None, dandi_id: str | None
) -> DandiInstance:
try:
if is_api:
@@ -684,7 +672,7 @@ def is_url(s: str) -> bool:
# Slashes are not required after "dandi:" so as to support "DANDI:"
-def get_module_version(module: Union[str, types.ModuleType]) -> Optional[str]:
+def get_module_version(module: str | types.ModuleType) -> str | None:
"""Return version of the module
Return module's `__version__` if present, or use importlib
@@ -694,7 +682,7 @@ def get_module_version(module: Union[str, types.ModuleType]) -> Optional[str]:
-------
object
"""
- modobj: Optional[types.ModuleType]
+ modobj: types.ModuleType | None
if isinstance(module, str):
modobj = sys.modules.get(module)
mod_name = module
@@ -716,7 +704,7 @@ def get_module_version(module: Union[str, types.ModuleType]) -> Optional[str]:
return version
-def pluralize(n: int, word: str, plural: Optional[str] = None) -> str:
+def pluralize(n: int, word: str, plural: str | None = None) -> str:
if n == 1:
return f"{n} {word}"
else:
@@ -805,7 +793,7 @@ def check_dandi_version() -> None:
T = TypeVar("T")
-def chunked(iterable: Iterable[T], size: int) -> Iterator[List[T]]:
+def chunked(iterable: Iterable[T], size: int) -> Iterator[list[T]]:
# cf. chunked() from more-itertools
i = iter(iterable)
while True:
diff --git a/dandi/validate.py b/dandi/validate.py
index 1c09ea4fe..3746434f8 100644
--- a/dandi/validate.py
+++ b/dandi/validate.py
@@ -1,8 +1,8 @@
from __future__ import annotations
+from collections.abc import Iterator
import os
from pathlib import Path
-from typing import Iterator, Optional, Union
from . import __version__
from .consts import dandiset_metadata_file
@@ -17,8 +17,8 @@
def validate_bids(
- *paths: Union[str, Path],
- schema_version: Optional[str] = None,
+ *paths: str | Path,
+ schema_version: str | None = None,
) -> list[ValidationResult]:
"""Validate BIDS paths.
@@ -127,7 +127,7 @@ def validate_bids(
def validate(
*paths: str | Path,
- schema_version: Optional[str] = None,
+ schema_version: str | None = None,
devel_debug: bool = False,
allow_any_path: bool = False,
) -> Iterator[ValidationResult]:
diff --git a/dandi/validate_types.py b/dandi/validate_types.py
index cb9c8edfb..3e1044bd1 100644
--- a/dandi/validate_types.py
+++ b/dandi/validate_types.py
@@ -1,14 +1,15 @@
+from __future__ import annotations
+
from dataclasses import dataclass
from enum import Enum
from pathlib import Path
-from typing import Dict, List, Optional
@dataclass
class ValidationOrigin:
name: str
version: str
- bids_version: Optional[str] = None
+ bids_version: str | None = None
class Severity(Enum):
@@ -30,29 +31,29 @@ class ValidationResult:
id: str
origin: ValidationOrigin
scope: Scope
- severity: Optional[Severity] = None
+ severity: Severity | None = None
# asset_paths, if not populated, assumes [.path], but could be smth like
# {"path": "task-broken_bold.json",
# "asset_paths": ["sub-01/func/sub-01_task-broken_bold.json",
# "sub-02/func/sub-02_task-broken_bold.json"]}
- asset_paths: Optional[List[str]] = None
+ asset_paths: list[str] | None = None
# e.g. path within hdf5 file hierarchy
# As a dict we will map asset_paths into location within them
- within_asset_paths: Optional[Dict[str, str]] = None
- dandiset_path: Optional[Path] = None
- dataset_path: Optional[Path] = None
+ within_asset_paths: dict[str, str] | None = None
+ dandiset_path: Path | None = None
+ dataset_path: Path | None = None
# TODO: locations analogous to nwbinspector.InspectorMessage.location
# but due to multiple possible asset_paths, we might want to have it
# as a dict to point to location in some or each affected assets
- message: Optional[str] = None
- metadata: Optional[dict] = None
+ message: str | None = None
+ metadata: dict | None = None
# ??? should it become a list e.g. for errors which rely on
# multiple files, like mismatch between .nii.gz header and .json sidecar
- path: Optional[Path] = None
- path_regex: Optional[str] = None
+ path: Path | None = None
+ path_regex: str | None = None
@property
- def purview(self) -> Optional[str]:
+ def purview(self) -> str | None:
if self.path is not None:
return str(self.path)
elif self.path_regex is not None: