Skip to content

Commit

Permalink
change Dict and List to dict and list
Browse files Browse the repository at this point in the history
  • Loading branch information
Phil Varner committed Apr 22, 2024
1 parent 3b1ab5b commit 553b5f2
Show file tree
Hide file tree
Showing 8 changed files with 74 additions and 76 deletions.
6 changes: 3 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,18 +15,18 @@ This library is based on a [branch of cirrus-lib](https://github.com/cirrus-geo/
## Quickstart for Creating New Tasks

```python
from typing import Any, Dict, List
from typing import Any

from stactask import Task

class MyTask(Task):
name = "my-task"
description = "this task does it all"

def validate(self, payload: Dict[str, Any]) -> bool:
def validate(self, payload: dict[str, Any]) -> bool:
return len(self.items) == 1

def process(self, **kwargs: Any) -> List[Dict[str, Any]]:
def process(self, **kwargs: Any) -> list[dict[str, Any]]:
item = self.items[0]

# download a datafile
Expand Down
26 changes: 13 additions & 13 deletions stactask/asset_io.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
import logging
import os
from os import path as op
from typing import Any, Dict, Iterable, List, Optional, Union
from typing import Any, Iterable, Optional, Union
from urllib.parse import urlparse

import fsspec
Expand Down Expand Up @@ -37,7 +37,7 @@ async def download_file(fs: AbstractFileSystem, src: str, dest: str) -> None:

async def download_item_assets(
item: Item,
assets: Optional[List[str]] = None,
assets: Optional[list[str]] = None,
save_item: bool = True,
overwrite: bool = False,
path_template: str = "${collection}/${id}",
Expand Down Expand Up @@ -91,40 +91,40 @@ async def download_item_assets(
return new_item


async def download_items_assets(items: Iterable[Item], **kwargs: Any) -> List[Item]:
async def download_items_assets(items: Iterable[Item], **kwargs: Any) -> list[Item]:
tasks = []
for item in items:
tasks.append(asyncio.create_task(download_item_assets(item, **kwargs)))
new_items: List[Item] = await asyncio.gather(*tasks)
new_items: list[Item] = await asyncio.gather(*tasks)
return new_items


def upload_item_assets_to_s3(
item: Item,
assets: Optional[List[str]] = None,
public_assets: Union[None, List[str], str] = None,
assets: Optional[list[str]] = None,
public_assets: Union[None, list[str], str] = None,
path_template: str = "${collection}/${id}",
s3_urls: bool = False,
headers: Optional[Dict[str, Any]] = None,
headers: Optional[dict[str, Any]] = None,
s3_client: Optional[s3] = None,
**kwargs: Any,
) -> Item:
"""Upload Item assets to s3 bucket
"""Upload Item assets to an S3 bucket
Args:
item (Dict): STAC Item
assets (List[str], optional): List of asset keys to upload. Defaults to None.
public_assets (List[str], optional): List of assets keys that should be
item (Item): STAC Item
assets (list[str], optional): List of asset keys to upload. Defaults to None.
public_assets (list[str], optional): List of assets keys that should be
public. Defaults to [].
path_template (str, optional): Path string template. Defaults to
'${collection}/${id}'.
s3_urls (bool, optional): Return s3 URLs instead of http URLs. Defaults
to False.
headers (Dict, optional): Dictionary of headers to set on uploaded
headers (dict, optional): Dictionary of headers to set on uploaded
assets. Defaults to {}.
s3_client (boto3utils.s3, optional): Use this s3 object instead of the default
global one. Defaults to None.
Returns:
Dict: A new STAC Item with uploaded assets pointing to newly uploaded file URLs
Item: A new STAC Item with uploaded assets pointing to newly uploaded file URLs
"""

if s3_client is None:
Expand Down
52 changes: 26 additions & 26 deletions stactask/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
from pathlib import Path
from shutil import rmtree
from tempfile import mkdtemp
from typing import Any, Callable, Dict, Iterable, List, Optional, Union
from typing import Any, Callable, Iterable, Optional, Union

import fsspec
from boto3utils import s3
Expand Down Expand Up @@ -66,7 +66,7 @@ class Task(ABC):

def __init__(
self: "Task",
payload: Dict[str, Any],
payload: dict[str, Any],
workdir: Optional[PathLike] = None,
save_workdir: Optional[bool] = None,
skip_upload: bool = False, # deprecated
Expand Down Expand Up @@ -105,17 +105,17 @@ def __init__(
)

@property
def process_definition(self) -> Dict[str, Any]:
def process_definition(self) -> dict[str, Any]:
process = self._payload.get("process", {})
if isinstance(process, dict):
return process
else:
raise ValueError(f"process is not a dict: {type(process)}")

@property
def parameters(self) -> Dict[str, Any]:
def parameters(self) -> dict[str, Any]:
task_configs = self.process_definition.get("tasks", [])
if isinstance(task_configs, List):
if isinstance(task_configs, list):
warnings.warn(
"task configs is list, use a dictionary instead",
DeprecationWarning,
Expand All @@ -125,13 +125,13 @@ def parameters(self) -> Dict[str, Any]:
if len(task_config_list) == 0:
return {}
else:
task_config: Dict[str, Any] = task_config_list[0]
task_config: dict[str, Any] = task_config_list[0]
parameters = task_config.get("parameters", {})
if isinstance(parameters, dict):
return parameters
else:
raise ValueError(f"parameters is not a dict: {type(parameters)}")
elif isinstance(task_configs, Dict):
elif isinstance(task_configs, dict):
config = task_configs.get(self.name, {})
if isinstance(config, dict):
return config
Expand All @@ -143,23 +143,23 @@ def parameters(self) -> Dict[str, Any]:
raise ValueError(f"unexpected value for 'tasks': {task_configs}")

@property
def upload_options(self) -> Dict[str, Any]:
def upload_options(self) -> dict[str, Any]:
upload_options = self.process_definition.get("upload_options", {})
if isinstance(upload_options, dict):
return upload_options
else:
raise ValueError(f"upload_options is not a dict: {type(upload_options)}")

@property
def collection_mapping(self) -> Dict[str, str]:
def collection_mapping(self) -> dict[str, str]:
collection_mapping = self.upload_options.get("collections", {})
if isinstance(collection_mapping, dict):
return collection_mapping
else:
raise ValueError(f"collections is not a dict: {type(collection_mapping)}")

@property
def items_as_dicts(self) -> List[Dict[str, Any]]:
def items_as_dicts(self) -> list[dict[str, Any]]:
features = self._payload.get("features", [])
if isinstance(features, list):
return features
Expand All @@ -172,14 +172,14 @@ def items(self) -> ItemCollection:
return ItemCollection.from_dict(items_dict, preserve_dict=True)

@classmethod
def validate(cls, payload: Dict[str, Any]) -> bool:
def validate(cls, payload: dict[str, Any]) -> bool:
"""Validates the payload and returns True if valid. If invalid, raises
``stactask.exceptions.FailedValidation`` or returns False."""
# put validation logic on input Items and process definition here
return True

@classmethod
def add_software_version(cls, items: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
def add_software_version(cls, items: list[dict[str, Any]]) -> list[dict[str, Any]]:
warnings.warn(
"add_software_version is deprecated, "
"use add_software_version_to_item instead",
Expand All @@ -191,7 +191,7 @@ def add_software_version(cls, items: List[Dict[str, Any]]) -> List[Dict[str, Any
return modified_items

@classmethod
def add_software_version_to_item(cls, item: Dict[str, Any]) -> Dict[str, Any]:
def add_software_version_to_item(cls, item: dict[str, Any]) -> dict[str, Any]:
"""Adds software version information to a single item.
Uses the processing extension.
Expand All @@ -200,7 +200,7 @@ def add_software_version_to_item(cls, item: Dict[str, Any]) -> Dict[str, Any]:
item: A single STAC item
Returns:
Dict[str, Any]: The same item with processing information applied.
dict[str, Any]: The same item with processing information applied.
"""
processing_ext = (
"https://stac-extensions.github.io/processing/v1.1.0/schema.json"
Expand Down Expand Up @@ -249,7 +249,7 @@ def download_item_assets(
Args:
item (pystac.Item): STAC Item for which assets need be downloaded.
assets (Optional[List[str]]): List of asset keys to download.
assets (Optional[list[str]]): List of asset keys to download.
Defaults to all assets.
path_template (Optional[str]): String to be interpolated to specify
where to store downloaded files.
Expand All @@ -274,15 +274,15 @@ def download_items_assets(
path_template: str = "${collection}/${id}",
keep_original_filenames: bool = False,
**kwargs: Any,
) -> List[Item]:
) -> list[Item]:
"""Download provided asset keys for the given items. Assets are
saved in workdir in a directory (as specified by path_template), and
the items are updated with the new asset hrefs.
Args:
items (List[pystac.Item]): List of STAC Items for which assets need
items (list[pystac.Item]): List of STAC Items for which assets need
be downloaded.
assets (Optional[List[str]]): List of asset keys to download.
assets (Optional[list[str]]): List of asset keys to download.
Defaults to all assets.
path_template (Optional[str]): String to be interpolated to specify
where to store downloaded files.
Expand All @@ -304,7 +304,7 @@ def download_items_assets(
def upload_item_assets_to_s3(
self,
item: Item,
assets: Optional[List[str]] = None,
assets: Optional[list[str]] = None,
s3_client: Optional[s3] = None,
) -> Item:
if self._upload:
Expand All @@ -319,7 +319,7 @@ def upload_item_assets_to_s3(
def _is_local_asset(self, asset: Asset) -> bool:
return bool(asset.href.startswith(str(self._workdir)))

def _get_local_asset_keys(self, item: Item) -> List[str]:
def _get_local_asset_keys(self, item: Item) -> list[str]:
return [
key for key, asset in item.assets.items() if self._is_local_asset(asset)
]
Expand All @@ -337,7 +337,7 @@ def upload_local_item_assets_to_s3(

# this should be in PySTAC
@staticmethod
def create_item_from_item(item: Dict[str, Any]) -> Dict[str, Any]:
def create_item_from_item(item: dict[str, Any]) -> dict[str, Any]:
new_item = deepcopy(item)
# create a derived output item
links = [
Expand All @@ -356,7 +356,7 @@ def create_item_from_item(item: Dict[str, Any]) -> Dict[str, Any]:
return new_item

@abstractmethod
def process(self, **kwargs: Any) -> List[Dict[str, Any]]:
def process(self, **kwargs: Any) -> list[dict[str, Any]]:
"""Main task logic - virtual
Returns:
Expand All @@ -366,7 +366,7 @@ def process(self, **kwargs: Any) -> List[Dict[str, Any]]:
# do some stuff
pass

def post_process_item(self, item: Dict[str, Any]) -> Dict[str, Any]:
def post_process_item(self, item: dict[str, Any]) -> dict[str, Any]:
"""Perform post-processing operations on an item.
E.g. add software version information.
Expand All @@ -380,15 +380,15 @@ def post_process_item(self, item: Dict[str, Any]) -> Dict[str, Any]:
item: An item produced by :py:meth:`Task.process`
Returns:
Dict[str, Any]: The item with any additional attributes applied.
dict[str, Any]: The item with any additional attributes applied.
"""
assert "stac_extensions" in item
assert isinstance(item["stac_extensions"], list)
item["stac_extensions"].sort()
return item

@classmethod
def handler(cls, payload: Dict[str, Any], **kwargs: Any) -> Dict[str, Any]:
def handler(cls, payload: dict[str, Any], **kwargs: Any) -> dict[str, Any]:
task = None
try:
if "href" in payload or "url" in payload:
Expand All @@ -414,7 +414,7 @@ def handler(cls, payload: Dict[str, Any], **kwargs: Any) -> Dict[str, Any]:
task.cleanup_workdir()

@classmethod
def parse_args(cls, args: List[str]) -> Dict[str, Any]:
def parse_args(cls, args: list[str]) -> dict[str, Any]:
dhf = argparse.ArgumentDefaultsHelpFormatter
parser0 = argparse.ArgumentParser(description=cls.description)
parser0.add_argument(
Expand Down
12 changes: 6 additions & 6 deletions stactask/utils.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
from typing import Any, Dict, Optional
from typing import Any, Optional

from jsonpath_ng.ext import parser


def stac_jsonpath_match(item: Dict[str, Any], expr: str) -> bool:
def stac_jsonpath_match(item: dict[str, Any], expr: str) -> bool:
"""Match jsonpath expression against STAC JSON.
Use https://jsonpath.com to experiment with JSONpath
and https://regex101.com to experiment with regex
Args:
item (Dict): A STAC Item
item (dict): A STAC Item represented as a dict
expr (str): A valid JSONPath expression
Raises:
Expand All @@ -22,15 +22,15 @@ def stac_jsonpath_match(item: Dict[str, Any], expr: str) -> bool:


def find_collection(
collection_mapping: Dict[str, str], item: Dict[str, Any]
collection_mapping: dict[str, str], item: dict[str, Any]
) -> Optional[str]:
"""Find the collection for a given STAC Item represented as a dictionary from a
dictionary of collection names to JSONPath expressions.
Args:
collection_mapping (Dict): A dictionary of collection names to JSONPath
collection_mapping (dict): A dictionary of collection names to JSONPath
expressions.
item (Dict): A STAC Item
item (dict): A STAC Item
Returns:
Optional[str]: Returns None if no JSONPath expression matches, returns a
Expand Down
4 changes: 1 addition & 3 deletions tests/conftest.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
from typing import List

import pytest


Expand All @@ -23,7 +21,7 @@ def pytest_configure(config: pytest.Config) -> None:


def pytest_collection_modifyitems(
config: pytest.Config, items: List[pytest.Item]
config: pytest.Config, items: list[pytest.Item]
) -> None:
if not config.getoption("--runslow"):
skip_slow = pytest.mark.skip(reason="need --runslow option to run")
Expand Down
10 changes: 5 additions & 5 deletions tests/tasks.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from typing import Any, Dict, List
from typing import Any

from stactask import Task
from stactask.exceptions import FailedValidation
Expand All @@ -8,7 +8,7 @@ class NothingTask(Task):
name = "nothing-task"
description = "this task does nothing"

def process(self, **kwargs: Any) -> List[Dict[str, Any]]:
def process(self, **kwargs: Any) -> list[dict[str, Any]]:
return self.items_as_dicts


Expand All @@ -17,19 +17,19 @@ class FailValidateTask(Task):
description = "this task always fails validation"

@classmethod
def validate(self, payload: Dict[str, Any]) -> bool:
def validate(self, payload: dict[str, Any]) -> bool:
if payload:
raise FailedValidation("Extra context about what went wrong")
return True

def process(self, **kwargs: Any) -> List[Dict[str, Any]]:
def process(self, **kwargs: Any) -> list[dict[str, Any]]:
return self.items_as_dicts


class DerivedItemTask(Task):
name = "derived-item-task"
description = "this task creates a derived item"

def process(self, **kwargs: Any) -> List[Dict[str, Any]]:
def process(self, **kwargs: Any) -> list[dict[str, Any]]:
assert kwargs["parameter"] == "value"
return [self.create_item_from_item(self.items_as_dicts[0])]
Loading

0 comments on commit 553b5f2

Please sign in to comment.