Skip to content

Commit

Permalink
Merge branch 'main' into pv/stac-asset
Browse files Browse the repository at this point in the history
  • Loading branch information
Phil Varner committed May 7, 2024
2 parents bd526bf + 7e66cf5 commit 134847b
Show file tree
Hide file tree
Showing 11 changed files with 77 additions and 79 deletions.
2 changes: 1 addition & 1 deletion .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,6 @@ repos:
- pytest
- types-setuptools == 65.7.0.3
- repo: https://github.com/astral-sh/ruff-pre-commit
rev: v0.3.2
rev: v0.4.1
hooks:
- id: ruff
8 changes: 4 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -30,18 +30,18 @@ This library is based on a [branch of cirrus-lib](https://github.com/cirrus-geo/
## Quickstart for Creating New Tasks

```python
from typing import Any, Dict, List
from typing import Any

from stactask import Task, DownloadConfig

class MyTask(Task):
name = "my-task"
description = "this task does it all"

def validate(self, payload: Dict[str, Any]) -> bool:
def validate(self, payload: dict[str, Any]) -> bool:
return len(self.items) == 1

def process(self, **kwargs: Any) -> List[Dict[str, Any]]:
def process(self, **kwargs: Any) -> list[dict[str, Any]]:
item = self.items[0]

# download a datafile
Expand Down Expand Up @@ -74,7 +74,7 @@ ItemCollection.
| -------------- | ------------- | ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------ |
| description | string | Optional description of the process configuration |
| upload_options | UploadOptions | Options used when uploading assets to a remote server |
| tasks | Map<str, Map> | Dictionary of task configurations. A List of [task configurations](#taskconfig-object) is supported for backwards compatibility reasons, but a dictionary should be preferred. |
| tasks | Map<str, Map> | Dictionary of task configurations. A list of [task configurations](#taskconfig-object) is supported for backwards compatibility reasons, but a dictionary should be preferred. |

#### UploadOptions Object

Expand Down
2 changes: 1 addition & 1 deletion docs/requirements.txt
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
sphinx~=7.2.6
sphinx~=7.3.7
sphinx-rtd-theme~=2.0.0
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ dev = [
"codespell~=2.2.5",
"mypy~=1.9",
"pre-commit~=3.7",
"ruff~=0.3.1",
"ruff~=0.4.1",
"types-setuptools~=69.0",
"boto3-stubs",
]
Expand Down
22 changes: 11 additions & 11 deletions stactask/asset_io.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import asyncio
import logging
from os import path as op
from typing import Any, Dict, Iterable, List, Optional, Union
from typing import Any, Iterable, Optional, Union

import stac_asset
from boto3utils import s3
Expand Down Expand Up @@ -36,7 +36,7 @@ async def download_items_assets(
path_template: str = "${collection}/${id}",
config: Optional[DownloadConfig] = None,
keep_non_downloaded: bool = True,
) -> List[Item]:
) -> list[Item]:
return await asyncio.gather(
*[
asyncio.create_task(
Expand All @@ -54,30 +54,30 @@ async def download_items_assets(

def upload_item_assets_to_s3(
item: Item,
assets: Optional[List[str]] = None,
public_assets: Union[None, List[str], str] = None,
assets: Optional[list[str]] = None,
public_assets: Union[None, list[str], str] = None,
path_template: str = "${collection}/${id}",
s3_urls: bool = False,
headers: Optional[Dict[str, Any]] = None,
headers: Optional[dict[str, Any]] = None,
s3_client: Optional[s3] = None,
**kwargs: Any, # unused, but retain to permit unused attributes from upload_options
) -> Item:
"""Upload Item assets to s3 bucket
"""Upload Item assets to an S3 bucket
Args:
item (Dict): STAC Item
assets (List[str], optional): List of asset keys to upload. Defaults to None.
public_assets (List[str], optional): List of assets keys that should be
item (Item): STAC Item
assets (list[str], optional): List of asset keys to upload. Defaults to None.
public_assets (list[str], optional): List of assets keys that should be
public. Defaults to [].
path_template (str, optional): Path string template. Defaults to
'${collection}/${id}'.
s3_urls (bool, optional): Return s3 URLs instead of http URLs. Defaults
to False.
headers (Dict, optional): Dictionary of headers to set on uploaded
headers (dict, optional): Dictionary of headers to set on uploaded
assets. Defaults to {}.
s3_client (boto3utils.s3, optional): Use this s3 object instead of the default
global one. Defaults to None.
Returns:
Dict: A new STAC Item with uploaded assets pointing to newly uploaded file URLs
Item: A new STAC Item with uploaded assets pointing to newly uploaded file URLs
"""

if s3_client is None:
Expand Down
52 changes: 26 additions & 26 deletions stactask/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
from pathlib import Path
from shutil import rmtree
from tempfile import mkdtemp
from typing import Any, Callable, Dict, Iterable, List, Optional, Union
from typing import Any, Callable, Iterable, Optional, Union

import fsspec
from boto3utils import s3
Expand Down Expand Up @@ -67,7 +67,7 @@ class Task(ABC):

def __init__(
self: "Task",
payload: Dict[str, Any],
payload: dict[str, Any],
workdir: Optional[PathLike] = None,
save_workdir: Optional[bool] = None,
skip_upload: bool = False, # deprecated
Expand Down Expand Up @@ -106,17 +106,17 @@ def __init__(
)

@property
def process_definition(self) -> Dict[str, Any]:
def process_definition(self) -> dict[str, Any]:
process = self._payload.get("process", {})
if isinstance(process, dict):
return process
else:
raise ValueError(f"process is not a dict: {type(process)}")

@property
def parameters(self) -> Dict[str, Any]:
def parameters(self) -> dict[str, Any]:
task_configs = self.process_definition.get("tasks", [])
if isinstance(task_configs, List):
if isinstance(task_configs, list):
warnings.warn(
"task configs is list, use a dictionary instead",
DeprecationWarning,
Expand All @@ -126,13 +126,13 @@ def parameters(self) -> Dict[str, Any]:
if len(task_config_list) == 0:
return {}
else:
task_config: Dict[str, Any] = task_config_list[0]
task_config: dict[str, Any] = task_config_list[0]
parameters = task_config.get("parameters", {})
if isinstance(parameters, dict):
return parameters
else:
raise ValueError(f"parameters is not a dict: {type(parameters)}")
elif isinstance(task_configs, Dict):
elif isinstance(task_configs, dict):
config = task_configs.get(self.name, {})
if isinstance(config, dict):
return config
Expand All @@ -144,23 +144,23 @@ def parameters(self) -> Dict[str, Any]:
raise ValueError(f"unexpected value for 'tasks': {task_configs}")

@property
def upload_options(self) -> Dict[str, Any]:
def upload_options(self) -> dict[str, Any]:
upload_options = self.process_definition.get("upload_options", {})
if isinstance(upload_options, dict):
return upload_options
else:
raise ValueError(f"upload_options is not a dict: {type(upload_options)}")

@property
def collection_mapping(self) -> Dict[str, str]:
def collection_mapping(self) -> dict[str, str]:
collection_mapping = self.upload_options.get("collections", {})
if isinstance(collection_mapping, dict):
return collection_mapping
else:
raise ValueError(f"collections is not a dict: {type(collection_mapping)}")

@property
def items_as_dicts(self) -> List[Dict[str, Any]]:
def items_as_dicts(self) -> list[dict[str, Any]]:
features = self._payload.get("features", [])
if isinstance(features, list):
return features
Expand All @@ -173,14 +173,14 @@ def items(self) -> ItemCollection:
return ItemCollection.from_dict(items_dict, preserve_dict=True)

@classmethod
def validate(cls, payload: Dict[str, Any]) -> bool:
def validate(cls, payload: dict[str, Any]) -> bool:
"""Validates the payload and returns True if valid. If invalid, raises
``stactask.exceptions.FailedValidation`` or returns False."""
# put validation logic on input Items and process definition here
return True

@classmethod
def add_software_version(cls, items: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
def add_software_version(cls, items: list[dict[str, Any]]) -> list[dict[str, Any]]:
warnings.warn(
"add_software_version is deprecated, "
"use add_software_version_to_item instead",
Expand All @@ -192,7 +192,7 @@ def add_software_version(cls, items: List[Dict[str, Any]]) -> List[Dict[str, Any
return modified_items

@classmethod
def add_software_version_to_item(cls, item: Dict[str, Any]) -> Dict[str, Any]:
def add_software_version_to_item(cls, item: dict[str, Any]) -> dict[str, Any]:
"""Adds software version information to a single item.
Uses the processing extension.
Expand All @@ -201,7 +201,7 @@ def add_software_version_to_item(cls, item: Dict[str, Any]) -> Dict[str, Any]:
item: A single STAC item
Returns:
Dict[str, Any]: The same item with processing information applied.
dict[str, Any]: The same item with processing information applied.
"""
processing_ext = (
"https://stac-extensions.github.io/processing/v1.1.0/schema.json"
Expand Down Expand Up @@ -250,7 +250,7 @@ def download_item_assets(
Args:
item (pystac.Item): STAC Item for which assets need be downloaded.
assets (Optional[List[str]]): List of asset keys to download.
assets (Optional[list[str]]): List of asset keys to download.
Defaults to all assets.
path_template (Optional[str]): String to be interpolated to specify
where to store downloaded files.
Expand All @@ -272,15 +272,15 @@ def download_items_assets(
path_template: str = "${collection}/${id}",
config: Optional[DownloadConfig] = None,
keep_non_downloaded: bool = True,
) -> List[Item]:
) -> list[Item]:
"""Download provided asset keys for the given items. Assets are
saved in workdir in a directory (as specified by path_template), and
the items are updated with the new asset hrefs.
Args:
items (List[pystac.Item]): List of STAC Items for which assets need
items (list[pystac.Item]): List of STAC Items for which assets need
be downloaded.
assets (Optional[List[str]]): List of asset keys to download.
assets (Optional[list[str]]): List of asset keys to download.
Defaults to all assets.
path_template (Optional[str]): String to be interpolated to specify
where to store downloaded files.
Expand All @@ -301,7 +301,7 @@ def download_items_assets(
def upload_item_assets_to_s3(
self,
item: Item,
assets: Optional[List[str]] = None,
assets: Optional[list[str]] = None,
s3_client: Optional[s3] = None,
) -> Item:
if self._upload:
Expand All @@ -316,7 +316,7 @@ def upload_item_assets_to_s3(
def _is_local_asset(self, asset: Asset) -> bool:
return bool(asset.href.startswith(str(self._workdir)))

def _get_local_asset_keys(self, item: Item) -> List[str]:
def _get_local_asset_keys(self, item: Item) -> list[str]:
return [
key for key, asset in item.assets.items() if self._is_local_asset(asset)
]
Expand All @@ -334,7 +334,7 @@ def upload_local_item_assets_to_s3(

# this should be in PySTAC
@staticmethod
def create_item_from_item(item: Dict[str, Any]) -> Dict[str, Any]:
def create_item_from_item(item: dict[str, Any]) -> dict[str, Any]:
new_item = deepcopy(item)
# create a derived output item
links = [
Expand All @@ -353,7 +353,7 @@ def create_item_from_item(item: Dict[str, Any]) -> Dict[str, Any]:
return new_item

@abstractmethod
def process(self, **kwargs: Any) -> List[Dict[str, Any]]:
def process(self, **kwargs: Any) -> list[dict[str, Any]]:
"""Main task logic - virtual
Returns:
Expand All @@ -363,7 +363,7 @@ def process(self, **kwargs: Any) -> List[Dict[str, Any]]:
# do some stuff
pass

def post_process_item(self, item: Dict[str, Any]) -> Dict[str, Any]:
def post_process_item(self, item: dict[str, Any]) -> dict[str, Any]:
"""Perform post-processing operations on an item.
E.g. add software version information.
Expand All @@ -377,15 +377,15 @@ def post_process_item(self, item: Dict[str, Any]) -> Dict[str, Any]:
item: An item produced by :py:meth:`Task.process`
Returns:
Dict[str, Any]: The item with any additional attributes applied.
dict[str, Any]: The item with any additional attributes applied.
"""
assert "stac_extensions" in item
assert isinstance(item["stac_extensions"], list)
item["stac_extensions"].sort()
return item

@classmethod
def handler(cls, payload: Dict[str, Any], **kwargs: Any) -> Dict[str, Any]:
def handler(cls, payload: dict[str, Any], **kwargs: Any) -> dict[str, Any]:
task = None
try:
if "href" in payload or "url" in payload:
Expand All @@ -411,7 +411,7 @@ def handler(cls, payload: Dict[str, Any], **kwargs: Any) -> Dict[str, Any]:
task.cleanup_workdir()

@classmethod
def parse_args(cls, args: List[str]) -> Dict[str, Any]:
def parse_args(cls, args: list[str]) -> dict[str, Any]:
dhf = argparse.ArgumentDefaultsHelpFormatter
parser0 = argparse.ArgumentParser(description=cls.description)
parser0.add_argument(
Expand Down
12 changes: 6 additions & 6 deletions stactask/utils.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
from typing import Any, Dict, Optional
from typing import Any, Optional

from jsonpath_ng.ext import parser


def stac_jsonpath_match(item: Dict[str, Any], expr: str) -> bool:
def stac_jsonpath_match(item: dict[str, Any], expr: str) -> bool:
"""Match jsonpath expression against STAC JSON.
Use https://jsonpath.com to experiment with JSONpath
and https://regex101.com to experiment with regex
Args:
item (Dict): A STAC Item
item (dict): A STAC Item represented as a dict
expr (str): A valid JSONPath expression
Raises:
Expand All @@ -22,15 +22,15 @@ def stac_jsonpath_match(item: Dict[str, Any], expr: str) -> bool:


def find_collection(
collection_mapping: Dict[str, str], item: Dict[str, Any]
collection_mapping: dict[str, str], item: dict[str, Any]
) -> Optional[str]:
"""Find the collection for a given STAC Item represented as a dictionary from a
dictionary of collection names to JSONPath expressions.
Args:
collection_mapping (Dict): A dictionary of collection names to JSONPath
collection_mapping (dict): A dictionary of collection names to JSONPath
expressions.
item (Dict): A STAC Item
item (dict): A STAC Item
Returns:
Optional[str]: Returns None if no JSONPath expression matches, returns a
Expand Down
4 changes: 1 addition & 3 deletions tests/conftest.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
from typing import List

import pytest


Expand All @@ -23,7 +21,7 @@ def pytest_configure(config: pytest.Config) -> None:


def pytest_collection_modifyitems(
config: pytest.Config, items: List[pytest.Item]
config: pytest.Config, items: list[pytest.Item]
) -> None:
if not config.getoption("--runslow"):
skip_slow = pytest.mark.skip(reason="need --runslow option to run")
Expand Down
Loading

0 comments on commit 134847b

Please sign in to comment.