diff --git a/.github/workflows/continuous-integration.yml b/.github/workflows/continuous-integration.yml index a23f734..040fc3f 100644 --- a/.github/workflows/continuous-integration.yml +++ b/.github/workflows/continuous-integration.yml @@ -7,37 +7,33 @@ on: pull_request: jobs: - standard: - name: Standard + test: + name: Test runs-on: ubuntu-latest strategy: matrix: python-version: ["3.8", "3.9", "3.10", "3.11"] - defaults: - run: - shell: bash -l {0} steps: - uses: actions/checkout@v4 - - name: Set up pip cache - uses: actions/cache@v3.3.2 + - uses: actions/setup-python@v4 with: - path: ~/.cache/pip - key: ${{ runner.os }}-pip-${{ hashFiles('**/setup.py', '**/requirements-dev.txt') }} - restore-keys: ${{ runner.os }}-pip- + python-version: ${{ matrix.python-version }} + cache: pip + - name: Install + run: pip install .[dev] - name: Test - run: ./scripts/cibuild - codecov: - name: Codecov - needs: - - standard + run: pytest + example: + name: Example runs-on: ubuntu-latest steps: - uses: actions/checkout@v4 - - name: Execute linters and test suites - run: ./scripts/cibuild - - name: Upload coverage to Codecov - uses: codecov/codecov-action@v3 + - uses: actions/setup-python@v4 with: - token: ${{ secrets.CODECOV_TOKEN }} - file: ./coverage.xml - fail_ci_if_error: false \ No newline at end of file + python-version: "3.11" + cache: pip + - name: Install + run: pip install .[example] + - name: Run example + run: scripts/run-example + diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml index 02a3d9c..fd2295b 100644 --- a/.github/workflows/release.yml +++ b/.github/workflows/release.yml @@ -11,17 +11,14 @@ jobs: runs-on: ubuntu-latest steps: - uses: actions/checkout@v4 - - name: Set up Python 3.10 uses: actions/setup-python@v4 with: - python-version: "3.10" - + python-version: "3.11" - name: Install release dependencies run: | python -m pip install --upgrade pip pip install build twine - - name: Build and publish package env: TWINE_USERNAME: ${{ secrets.PYPI_STACUTILS_USERNAME }} diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index ff6ae86..eda4701 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -2,29 +2,24 @@ # Please run `pre-commit run --all-files` when adding or changing entries. repos: - - repo: https://github.com/psf/black - rev: 23.7.0 - hooks: - - id: black - repo: https://github.com/codespell-project/codespell rev: v2.2.5 hooks: - id: codespell args: [--ignore-words=.codespellignore] types_or: [jupyter, markdown, python, shell] - - repo: https://github.com/PyCQA/flake8 - rev: 6.1.0 - hooks: - - id: flake8 - - repo: https://github.com/pycqa/isort - rev: 5.12.0 + - repo: https://github.com/psf/black + rev: 23.9.1 hooks: - - id: isort - args: ["--profile", "black"] + - id: black - repo: https://github.com/pre-commit/mirrors-mypy - rev: v1.5.0 + rev: v1.5.1 hooks: - id: mypy additional_dependencies: - - pytest - - types-setuptools == 65.7.0.3 + - click + - pydantic + - repo: https://github.com/astral-sh/ruff-pre-commit + rev: v0.0.289 + hooks: + - id: ruff diff --git a/MANIFEST.in b/MANIFEST.in index 967fa8a..200478b 100644 --- a/MANIFEST.in +++ b/MANIFEST.in @@ -1 +1 @@ -include stactask/py.typed +include src/stac_task/py.typed diff --git a/README.md b/README.md index 874f780..dfd0ac2 100644 --- a/README.md +++ b/README.md @@ -1,169 +1 @@ -# STAC Task (stactask) - -This Python library consists of the Task class, which is used to create custom tasks based -on a "STAC In, STAC Out" approach. 
The Task class acts as wrapper around custom code and provides -several convenience methods for modifying STAC Items, creating derived Items, and providing a CLI. - -This library is based on a [branch of cirrus-lib](https://github.com/cirrus-geo/cirrus-lib/tree/features/task-class) except aims to be more generic. - -## Quickstart for Creating New Tasks - -```python -from typing import Any, Dict, List - -from stactask import Task - -class MyTask(Task): - name = "my-task" - description = "this task does it all" - - def validate(self, payload: Dict[str, Any]) -> bool: - return len(self.items) == 1 - - def process(self, **kwargs: Any) -> List[Dict[str, Any]]: - item = self.items[0] - - # download a datafile - item = self.download_item_assets(item, assets=['data']) - - # operate on the local file to create a new asset - item = self.upload_item_assets_to_s3(item) - - # this task returns a single item - return [item.to_dict(include_self_link=True, transform_hrefs=False)] -``` - -## Task Input - -| Field Name | Type | Description | -| ------------- | ---- | ----------- | -| type | string | Must be FeatureCollection | -| features | [Item] | A list of STAC `Item` | -| process | ProcessDefinition | A Process Definition | - -### ProcessDefinition Object - -A STAC task can be provided additional configuration via the 'process' field in the input -ItemCollection. - -| Field Name | Type | Description | -| ------------- | ---- | ----------- | -| description | string | Optional description of the process configuration | -| upload_options | UploadOptions | Options used when uploading assets to a remote server | -| tasks | Map | Dictionary of task configurations. A List of [task configurations](#taskconfig-object) is supported for backwards compatibility reasons, but a dictionary should be preferred. | - -#### UploadOptions Object - -| Field Name | Type | Description | -| ------------- | ---- | ----------- | -| path_template | string | **REQUIRED** A string template for specifying the location of uploaded assets | -| public_assets | [str] | A list of asset keys that should be marked as public when uploaded | -| headers | Map | A set of key, value headers to send when uploading data to s3 | -| collections | Map | A mapping of output collection name to a JSONPath pattern (for matching Items) | -| s3_urls | bool | Controls if the final published URLs should be an s3 (s3://*bucket*/*key*) or https URL | - -##### path_template - -The path_template string is a way to control the output location of uploaded assets from a STAC Item using metadata from the Item itself. -The template can contain fixed strings along with variables used for substitution. -See [the PySTAC documentation for `LayoutTemplate`](https://pystac.readthedocs.io/en/stable/api/layout.html#pystac.layout.LayoutTemplate) for a list of supported template variables and their meaning. - -##### collections - -The collections dictionary provides a collection ID and JSONPath pattern for matching against STAC Items. -At the end of processing, before the final STAC Items are returned, the Task class can be used to assign -all of the Items to specific collection IDs. For each Item the JSONPath pattern for all collections will be -compared. The first match will cause the Item's Collection ID to be set to the provided value. - -For example: - -```json -"collections": { - "landsat-c2l2": "$[?(@.id =~ 'LC08.*')]" -} -``` - -In this example, the task will set any STAC Items that have an ID beginning with "LC08" to the `landsat-c2l2` collection. 
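For reference, the matching described above can be reproduced with `jsonpath_ng`, the library behind the `stac_jsonpath_match` helper removed later in this diff; a minimal sketch, assuming a plain Item dict (the sample ID is hypothetical):

```python
from typing import Any, Dict

# jsonpath_ng's "ext" parser supports the filter (`?()`) and regex (`=~`) syntax
from jsonpath_ng.ext import parser


def matches(item: Dict[str, Any], expr: str) -> bool:
    # Wrap the item in a list so a `$[?(...)]` filter expression can select it,
    # mirroring the removed stac_jsonpath_match() helper
    return len([x.value for x in parser.parse(expr).find([item])]) == 1


item = {"id": "LC08_L2SP_081119_20200101_02_T2", "collection": None}  # hypothetical
if matches(item, "$[?(@.id =~ 'LC08.*')]"):
    item["collection"] = "landsat-c2l2"
```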
- -See [Jayway JsonPath Evaluator](https://jsonpath.herokuapp.com/) to experiment with JSONpath and [regex101](https://regex101.com/) to experiment with regex. - -#### tasks - -The tasks field is a dictionary with an optional key for each task. If present, it contains -a dictionary that is converted to a set of keywords and passed to the Task's `process` function. -The documentation for each task will provide the list of available parameters. - -```json -{ - "tasks": { - "task-a": { - "param1": "value1" - }, - "task-c": { - "param2": "value2" - } - } -} -``` - -In the example above a task named `task-a` would have the `param1=value1` passed as a keyword, while `task-c` -would have `param2=value2` passed. If there were a `task-b` to be run it would not be passed any keywords. - -#### TaskConfig Object - -**DEPRECATED**: `tasks` should be a dictionary of parameters, with task names as keys. See [tasks](#tasks) for more information. - -A Task Configuration contains information for running a specific task. - -| Field Name | Type | Description | -| ------------- | ---- | ----------- | -| name | str | **REQUIRED** Name of the task | -| parameters | Map | Dictionary of keyword parameters that will be passed to the Tasks `process` function | - -## Full Process Definition Example - -Process definitions are sometimes called "Payloads": - -```json -{ - "description": "My process configuration", - "collections": { - "landsat-c2l2": "$[?(@.id =~ 'LC08.*')]" - }, - "upload_options": { - "path_template": "s3://my-bucket/${collection}/${year}/${month}/${day}/${id}" - }, - "tasks": { - "task-name": { - "param": "value" - } - } -} -``` - -## Development - -Clone, install in editable mode with development requirements, and install the **pre-commit** hooks: - -```shell -git clone https://github.com/stac-utils/stac-task -cd stac-task -pip install -e '.[dev]' -pre-commit install -``` - -To run the complete test and linting suite: - -```shell -./scripts/test -``` - -To just run the tests: - -```shell -pytest -``` - -## Contributing - -Use Github [issues](https://github.com/stac-utils/stac-task/issues) and [pull requests](https://github.com/stac-utils/stac-task/pulls). 
+# stac-task diff --git a/codecov.yml b/codecov.yml deleted file mode 100644 index c61977e..0000000 --- a/codecov.yml +++ /dev/null @@ -1,8 +0,0 @@ -comment: off - -coverage: - status: - project: - default: - target: auto - threshold: 5 diff --git a/examples/dataset_geo.tif b/examples/dataset_geo.tif new file mode 100644 index 0000000..0443154 Binary files /dev/null and b/examples/dataset_geo.tif differ diff --git a/examples/payload.json b/examples/payload.json new file mode 100644 index 0000000..402b13e --- /dev/null +++ b/examples/payload.json @@ -0,0 +1,13 @@ +{ + "type": "FeatureCollection", + "features": [ + { + "href": "./dataset_geo.tif" + } + ], + "process": { + "tasks": { + "rio-stac": {} + } + } +} \ No newline at end of file diff --git a/examples/rio_stac_task.py b/examples/rio_stac_task.py new file mode 100644 index 0000000..d3be558 --- /dev/null +++ b/examples/rio_stac_task.py @@ -0,0 +1,12 @@ +import rio_stac.stac +import stac_task +from pystac import Item +from stac_task import HrefTask + + +class RioStacTask(HrefTask): + def process_href(self, href: str) -> Item: + return rio_stac.stac.create_stac_item(href) + + +stac_task.register_task("rio-stac", RioStacTask) diff --git a/pyproject.toml b/pyproject.toml index 3bfab51..d4c3da2 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,14 +1,17 @@ [project] -name = "stactask" -version = "0.1.1" -authors = [{ name = "Matthew Hanson", email = "matt.a.hanson@gmail.com" }] -description = "Class interface for running custom algorithms and workflows on STAC Items" +name = "stac-task" +version = "0.2.0" +authors = [ + { name = "Matthew Hanson", email = "matt.a.hanson@gmail.com" }, + { name = "Pete Gadomski", email = "pete.gadomski@gmail.com" }, +] +description = "Class interface for running custom algorithms and workflows on STAC items" readme = "README.md" requires-python = ">=3.8" keywords = ["pystac", "imagery", "raster", "catalog", "STAC"] license = { text = "Apache-2.0" } classifiers = [ - "Development Status :: 2 - Pre-Alpha", + "Development Status :: 3 - Alpha", "Intended Audience :: Developers", "License :: OSI Approved :: Apache Software License", "Natural Language :: English", @@ -18,37 +21,29 @@ classifiers = [ "Programming Language :: Python :: 3.10", "Programming Language :: Python :: 3.11", ] -dependencies = [ - "pystac>=1.6", - "python-dateutil>=2.7.0", - "boto3-utils>=0.3.2", - "fsspec>=2022.8.2", - "jsonpath_ng>=1.5.3", - "requests>=2.28.1", - "s3fs>=2022.8.2", -] +dependencies = ["pystac>=1.8", "pydantic>=2", "stac-asset>=0.2.1"] [project.optional-dependencies] +cli = ["click~=8.1"] dev = [ - "black~=23.9.1", - "codespell~=2.2.5", - "flake8~=6.1.0", - "isort~=5.12.0", - "mypy~=1.5.0", - "pre-commit~=3.4.0", - "pytest~=7.4.0", - "pytest-cov~=4.1.0", - "types-setuptools~=68.2.0", + "black~=23.9", + "codespell~=2.2", + "ruff==0.0.289", + "mypy~=1.5", + "pre-commit~=3.4", + "pytest~=7.4", + "pytest-cov~=4.1", + "types-setuptools~=68.2", ] +example = ["rio-stac~=0.8.0"] + +[project.scripts] +stac-task = "stac_task._cli:cli" [project.urls] -Issues = "https://github.com/stac-utils/stactask/issues" +Issues = "https://github.com/stac-utils/stac-task/issues" Github = "https://github.com/stac-utils/stac-task" Changelog = "https://github.com/stac-utils/stac-task/blob/main/CHANGELOG.md" [tool.mypy] strict = true - -[[tool.mypy.overrides]] -module = ["boto3utils", "jsonpath_ng.ext", "fsspec"] -ignore_missing_imports = true diff --git a/scripts/check b/scripts/check deleted file mode 100755 index f128227..0000000 --- 
a/scripts/check +++ /dev/null @@ -1,22 +0,0 @@ -#!/bin/bash - -set -e - -if [[ -n "${CI}" ]]; then - set -x -fi - -function usage() { - echo -n \ - "Usage: $(basename "$0") -Execute project checks. -" -} - -if [ "${BASH_SOURCE[0]}" = "${0}" ]; then - if [ "${1:-}" = "--help" ]; then - usage - else - pre-commit run --all-files - fi -fi diff --git a/scripts/cibuild b/scripts/cibuild deleted file mode 100755 index c637853..0000000 --- a/scripts/cibuild +++ /dev/null @@ -1,23 +0,0 @@ -#!/bin/bash - -set -e - -if [[ -n "${CI}" ]]; then - set -x -fi - -function usage() { - echo -n \ - "Usage: $(basename "$0") -Execute project linters and test suites in CI. -" -} - -if [ "${BASH_SOURCE[0]}" = "${0}" ]; then - if [ "${1:-}" = "--help" ]; then - usage - else - ./scripts/update - ./scripts/test - fi -fi diff --git a/scripts/run-example b/scripts/run-example new file mode 100755 index 0000000..8331446 --- /dev/null +++ b/scripts/run-example @@ -0,0 +1,7 @@ +#!/usr/bin/env sh +# +# Run this from the root directory of the repo. + +set -e + +stac-task -f examples/rio_stac_task.py run examples/payload.json diff --git a/scripts/test b/scripts/test deleted file mode 100755 index 8b59b2c..0000000 --- a/scripts/test +++ /dev/null @@ -1,32 +0,0 @@ -#!/bin/bash - -set -e - -if [[ -n "${CI}" ]]; then - set -x -fi - -function usage() { - echo -n \ - "Usage: $(basename "$0") -Execute project linters and test suites. -" -} - -if [ "${BASH_SOURCE[0]}" = "${0}" ]; then - if [ "${1:-}" = "--help" ]; then - usage - else - ./scripts/check - - codespell -I .codespellignore -f \ - scripts/* \ - *.py ./**/*.py \ - *.md \ - docs/*.rst docs/**/*.rst \ - docs/*.ipynb docs/**/*.ipynb - - pytest --cov=stactask tests - coverage xml - fi -fi \ No newline at end of file diff --git a/scripts/update b/scripts/update deleted file mode 100755 index 95bf523..0000000 --- a/scripts/update +++ /dev/null @@ -1,23 +0,0 @@ -#!/bin/bash - -set -e - -if [[ -n "${CI}" ]]; then - set -x -fi - -function usage() { - echo -n \ - "Usage: $(basename "$0") -Install requirements for all subpackages and development. -" -} - -if [ "${BASH_SOURCE[0]}" = "${0}" ]; then - if [ "${1:-}" = "--help" ]; then - usage - else - python -m pip install --upgrade pip - pip install '.[dev]' - fi -fi diff --git a/src/stac_task/__init__.py b/src/stac_task/__init__.py new file mode 100644 index 0000000..1509607 --- /dev/null +++ b/src/stac_task/__init__.py @@ -0,0 +1,57 @@ +import copy +import importlib.util +import sys +from pathlib import Path +from typing import Any, Dict, Type + +from .errors import ExecutionError, StacTaskError +from .payload import Payload +from .process import Process +from .task import HrefTask, Input, ItemTask, Output, PassthroughTask, Task +from .types import PathLikeObject +from .upload_options import UploadOptions + +_TASKS: Dict[str, Type[Task[Any, Any]]] = {"passthrough": PassthroughTask} + + +def get_tasks() -> Dict[str, Type[Task[Input, Input]]]: + """Returns all tasks.""" + return copy.deepcopy(_TASKS) + + +def register_task(name: str, task_class: Type[Task[Input, Output]]) -> None: + if name in _TASKS: + raise ValueError(f"task is already registered: {name}") + else: + _TASKS[name] = task_class + + +def load_file(path: PathLikeObject) -> None: + # https://docs.python.org/3/library/importlib.html#importing-a-source-file-directly + module_name = ( + "stac_task.plugins." 
+ Path(path).stem
+    )  # FIXME this feels a little fragile
+    if module_name in sys.modules:
+        raise ValueError(f"module name is already imported: {module_name}")
+    spec = importlib.util.spec_from_file_location(module_name, str(path))
+    if spec is None:
+        raise ValueError(
+            f"could not build spec from file location: {module_name}, {path}"
+        )
+    module = importlib.util.module_from_spec(spec)
+    sys.modules[module_name] = module
+    if not spec.loader:
+        raise ValueError(f"spec does not have a loader for module {module_name}")
+    spec.loader.exec_module(module)
+
+
+__all__ = [
+    "ExecutionError",
+    "HrefTask",
+    "ItemTask",
+    "Payload",
+    "Process",
+    "StacTaskError",
+    "Task",
+    "UploadOptions",
+]
diff --git a/src/stac_task/_cli.py b/src/stac_task/_cli.py
new file mode 100644
index 0000000..60b19c9
--- /dev/null
+++ b/src/stac_task/_cli.py
@@ -0,0 +1,62 @@
+import json
+import sys
+from pathlib import Path
+from typing import Optional
+
+import click
+
+import stac_task
+from stac_task import Payload
+
+
+@click.group()
+@click.option("-f", "--file", help="A Python file to load and include in the task list")
+def cli(file: Optional[str]) -> None:
+    if file:
+        stac_task.load_file(Path(file).absolute())
+
+
+@cli.command()
+@click.argument("INPUT")
+@click.argument("OUTPUT", required=False)
+def run(input: str, output: Optional[str]) -> None:
+    payload = Payload.from_href(input)
+    result = payload.execute(stac_task.get_tasks())
+    if output is None:
+        print(result.model_dump_json(indent=2))
+    else:
+        result.to_path(output)
+
+
+@cli.command()
+def list() -> None:
+    for key, value in stac_task.get_tasks().items():  # type: ignore
+        if value.__doc__:
+            description = value.__doc__.split("\n")[0]
+            print(f"{key}: {description}")
+        else:
+            print(key)
+
+
+@cli.command()
+@click.argument("TASK")
+@click.argument("MODEL")
+def jsonschema(task: str, model: str) -> None:
+    tasks = stac_task.get_tasks()  # type: ignore
+    if task not in tasks:
+        print(f"Invalid task: {task}", file=sys.stderr)
+        print("Run `stac-task list` to see available tasks", file=sys.stderr)
+        sys.exit(1)
+    task_class = tasks[task]
+    if model.lower() == "input":
+        output = task_class.input.model_json_schema()
+    elif model.lower() == "output":
+        output = task_class.output.model_json_schema()
+    elif model.lower() == "config":
+        output = task_class.model_json_schema()
+    else:
+        print(f"Invalid model: {model}", file=sys.stderr)
+        print("Must be one of 'input', 'output', or 'config'", file=sys.stderr)
+        sys.exit(1)
+
+    print(json.dumps(output, indent=2))
diff --git a/src/stac_task/errors.py b/src/stac_task/errors.py
new file mode 100644
index 0000000..f8ae957
--- /dev/null
+++ b/src/stac_task/errors.py
@@ -0,0 +1,6 @@
+class StacTaskError(Exception):
+    pass
+
+
+class ExecutionError(StacTaskError):
+    pass
diff --git a/src/stac_task/models.py b/src/stac_task/models.py
new file mode 100644
index 0000000..8375955
--- /dev/null
+++ b/src/stac_task/models.py
@@ -0,0 +1,90 @@
+from __future__ import annotations
+
+from typing import Dict, List, Literal, Optional
+
+import pystac
+import pystac.utils
+from pydantic import BaseModel, ConfigDict
+
+
+class Anything(BaseModel):
+    """A model for any dictionary."""
+
+    model_config = ConfigDict(extra="allow")
+
+
+class Href(BaseModel):
+    """A model for a single href."""
+
+    href: str
+
+
+class Properties(BaseModel):
+    """The properties of a STAC item."""
+
+    model_config = ConfigDict(extra="allow")
+
+    datetime: Optional[str]
+
+
+class Link(BaseModel):
+    """A link."""
+
+    model_config = ConfigDict(extra="allow")
+
+    href: str
+    rel: str
+    type: Optional[str] = None
+    title: Optional[str] = None
+
+
+class Asset(BaseModel):
+    """An asset."""
+
+    model_config = ConfigDict(extra="allow")
+
+    href: str
+    title: Optional[str] = None
+    description: Optional[str] = None
+    type: Optional[str] = None
+    roles: Optional[List[str]] = None
+
+
+class Item(BaseModel):
+    """A STAC Item.
+
+    We choose to define our own instead of using *stac-pydantic*.
+    """
+
+    model_config = ConfigDict(extra="allow")
+
+    type: Literal["Feature"] = "Feature"
+    stac_version: str = "1.0.0"
+    stac_extensions: Optional[List[str]] = None
+    id: str
+    geometry: Optional[Anything] = None
+    bbox: Optional[List[float]] = None
+    properties: Properties = Properties(datetime=None)
+    links: List[Link] = []
+    assets: Dict[str, Asset] = {}
+    collection: Optional[str] = None
+
+    def to_pystac(self) -> pystac.Item:
+        """Converts this pydantic model to a pystac Item."""
+        item = pystac.Item(
+            id=self.id,
+            geometry=self.geometry.model_dump() if self.geometry else None,
+            bbox=self.bbox,
+            properties=self.properties.model_dump(),
+            assets={
+                key: pystac.Asset.from_dict(asset.model_dump())
+                for key, asset in self.assets.items()
+            },
+            collection=self.collection,
+            extra_fields=self.model_extra,
+            datetime=pystac.utils.str_to_datetime(self.properties.datetime)
+            if self.properties.datetime
+            else None,
+        )
+        item.links = [pystac.Link.from_dict(link.model_dump()) for link in self.links]
+        return item
+
+    @classmethod
+    def from_pystac(cls, item: pystac.Item) -> Item:
+        """Creates this pydantic model from a pystac Item."""
+        return Item.model_validate(item.to_dict(transform_hrefs=False))
diff --git a/src/stac_task/payload.py b/src/stac_task/payload.py
new file mode 100644
index 0000000..d39fb0a
--- /dev/null
+++ b/src/stac_task/payload.py
@@ -0,0 +1,85 @@
+from __future__ import annotations
+
+from pathlib import Path
+from typing import Any, Dict, List, Literal, Mapping, Optional, Type
+
+import pystac.utils
+import stac_asset.blocking
+from pydantic import BaseModel, Field
+
+from .errors import ExecutionError
+from .process import Process
+from .task import Input, Output, Task
+from .types import PathLikeObject
+
+
+class Payload(BaseModel):
+    type: Literal["FeatureCollection"] = "FeatureCollection"
+    features: List[Dict[str, Any]] = []
+    process: Process = Process()
+    # TODO do we need to support `url` as well?
+    href: Optional[str] = None
+    self_href: Optional[str] = Field(None, exclude=True)
+
+    @classmethod
+    def from_href(cls, href: str) -> Payload:
+        """Loads a payload from an href.
+
+        If the payload has an `href` attribute set, that href will be fetched.
+        This is used for "indirect" payloads that point to large payloads that
+        need to be stored on s3.
+
+        Args:
+            href: The href to load the payload from.
+
+        Returns:
+            Payload: The payload
+        """
+        # TODO we could go async with these
+        payload = cls.model_validate_json(stac_asset.blocking.read_href(href))
+        if payload.href and not payload.features:
+            href = pystac.utils.make_absolute_href(
+                payload.href, href, start_is_dir=False
+            )
+            return cls.from_href(href)
+        else:
+            payload.self_href = href
+            return payload
+
+    def execute(self, tasks: Mapping[str, Type[Task[Input, Output]]]) -> Payload:
+        """Executes this payload, returning the updated payload.
+
+        Args:
+            tasks: The tasks to use for processing.
+
+        Returns:
+            Payload: A new payload, with updated items.
+ """ + matches = set(key for key in tasks.keys() if key in self.process.tasks) + if len(matches) == 0: + raise ExecutionError("no tasks to execute") + elif len(matches) == 1: + key = matches.pop() + task_class = tasks[key] + task = task_class(**self.process.tasks[key]) + task.payload_href = self.self_href + items = list() + # TODO async / thread pool + for item in self.features: + items.extend(task.process_dict(item)) + return Payload( + features=items, process=self.process.model_copy(), self_href=None + ) + else: + raise ValueError( + "multiple task execution not supported at this time: " + + ",".join(sorted(matches)) + ) + + def to_path(self, path: PathLikeObject) -> None: + """Writes a payload a path. + + Args: + path: The path to write the payload to. + """ + Path(path).write_text(self.model_dump_json()) diff --git a/src/stac_task/process.py b/src/stac_task/process.py new file mode 100644 index 0000000..d6e10e7 --- /dev/null +++ b/src/stac_task/process.py @@ -0,0 +1,11 @@ +from typing import Any, Dict, Optional + +from pydantic import BaseModel + +from .upload_options import UploadOptions + + +class Process(BaseModel): + description: Optional[str] = None + upload_options: UploadOptions = UploadOptions() + tasks: Dict[str, dict[str, Any]] = {} diff --git a/stactask/py.typed b/src/stac_task/py.typed similarity index 100% rename from stactask/py.typed rename to src/stac_task/py.typed diff --git a/src/stac_task/task.py b/src/stac_task/task.py new file mode 100644 index 0000000..9b121b2 --- /dev/null +++ b/src/stac_task/task.py @@ -0,0 +1,81 @@ +from abc import ABC, abstractmethod +from typing import Any, ClassVar, Dict, Generic, List, Optional, Type, TypeVar + +import pystac +import pystac.utils +from pydantic import BaseModel + +from .models import Anything, Href, Item + +Input = TypeVar("Input", bound=BaseModel) +Output = TypeVar("Output", bound=BaseModel) + + +class Task(Generic[Input, Output], ABC, BaseModel): + # Go away mypy, you can't handle this (it's not your fault, + # https://github.com/python/mypy/issues/5144) + input: ClassVar[Type[Input]] # type: ignore + output: ClassVar[Type[Output]] # type: ignore + payload_href: Optional[str] = None + + @abstractmethod + def process(self, item: Input) -> List[Output]: + ... + + def process_dict(self, item: Dict[str, Any]) -> List[Dict[str, Any]]: + return [ + output.model_dump() + for output in self.process(self.input.model_validate(item)) + ] + + +class PassthroughTask(Task[Anything, Anything]): + """A simple task that doesn't modify the items at all.""" + + input = Anything + output = Anything + + def process(self, item: Anything) -> List[Anything]: + return [item] + + +class ItemTask(Task[Item, Item], ABC): + """STAC In, STAC Out task. + + A abstract task that has STAC Items as the input and the output. + """ + + input = Item + output = Item + + @abstractmethod + def process_item(self, item: pystac.Item) -> pystac.Item: + """Takes a :py:class:`pystac.Item` as input, and the same.""" + ... + + def process(self, item: Item) -> List[Item]: + return [Item.from_pystac(self.process_item(item.to_pystac()))] + + +class HrefTask(Task[Href, Item], ABC): + """Href in, STAC Out task. + + A abstract task that takes a single href as input, and returns a pystac Item. + """ + + input = Href + output = Item + + @abstractmethod + def process_href(self, href: str) -> pystac.Item: + """Takes an href as input, and returns a single pystac Item.""" + ... 
+ + def process(self, item: Href) -> List[Item]: + if self.payload_href: + href = pystac.utils.make_absolute_href( + item.href, self.payload_href, start_is_dir=False + ) + else: + href = item.href + return [Item.from_pystac(self.process_href(href))] diff --git a/src/stac_task/types.py b/src/stac_task/types.py new file mode 100644 index 0000000..d74fc38 --- /dev/null +++ b/src/stac_task/types.py @@ -0,0 +1,22 @@ +from os import PathLike +from typing import TYPE_CHECKING, Union + +if TYPE_CHECKING: + from typing import Any + + _PathLike = PathLike[Any] +else: + _PathLike = PathLike + +PathLikeObject = Union[_PathLike, str] +"""An object representing a file system path, except we exclude `bytes` because +`Path()` doesn't accept `bytes`. + +A path-like object is either a str or bytes object representing a path, or an +object implementing the os.PathLike protocol. An object that supports the +os.PathLike protocol can be converted to a str or bytes file system path by +calling the os.fspath() function; os.fsdecode() and os.fsencode() can be used to +guarantee a str or bytes result instead, respectively. Introduced by PEP 519. + +https://docs.python.org/3/glossary.html#term-path-like-object +""" diff --git a/src/stac_task/upload_options.py b/src/stac_task/upload_options.py new file mode 100644 index 0000000..ad8db8f --- /dev/null +++ b/src/stac_task/upload_options.py @@ -0,0 +1,11 @@ +from typing import Dict, List, Literal, Optional, Union + +from pydantic import BaseModel + + +class UploadOptions(BaseModel): + path_template: str = r"${collection}" + public_assets: Optional[Union[Literal["ALL"], List[str]]] = None + headers: Optional[Dict[str, str]] = None + collections: Optional[Dict[str, str]] = None + s3_urls: bool = False diff --git a/stactask/__init__.py b/stactask/__init__.py deleted file mode 100644 index d50ef90..0000000 --- a/stactask/__init__.py +++ /dev/null @@ -1,9 +0,0 @@ -# flake8: noqa - -import pkg_resources - -__version__ = pkg_resources.get_distribution(__package__).version - -from .task import Task - -__all__ = ["Task"] diff --git a/stactask/asset_io.py b/stactask/asset_io.py deleted file mode 100644 index ed644f2..0000000 --- a/stactask/asset_io.py +++ /dev/null @@ -1,151 +0,0 @@ -import asyncio -import logging -import os -from os import path as op -from typing import Any, Dict, Iterable, List, Optional, Union - -import fsspec -from boto3utils import s3 -from fsspec import AbstractFileSystem -from pystac import Item -from pystac.layout import LayoutTemplate - -logger = logging.getLogger(__name__) - -# global dictionary of sessions per bucket -s3_client = s3() - -SIMULTANEOUS_DOWNLOADS = int(os.getenv("STAC_SIMULTANEOUS_DOWNLOADS", 3)) -sem = asyncio.Semaphore(SIMULTANEOUS_DOWNLOADS) - - -async def download_file(fs: AbstractFileSystem, src: str, dest: str) -> None: - async with sem: - logger.debug(f"{src} start") - await fs._get_file(src, dest) - logger.debug(f"{src} completed") - - -async def download_item_assets( - item: Item, - assets: Optional[List[str]] = None, - save_item: bool = True, - overwrite: bool = False, - path_template: str = "${collection}/${id}", - absolute_path: bool = False, - **kwargs: Any, -) -> Item: - _assets = item.assets.keys() if assets is None else assets - - # determine path from template and item - layout = LayoutTemplate(path_template) - path = layout.substitute(item) - - # make necessary directories - os.makedirs(path, exist_ok=True) - - new_item = item.clone() - - tasks = [] - for a in _assets: - if a not in item.assets: - continue - href = 
item.assets[a].href - - # local filename - ext = os.path.splitext(href)[-1] - new_href = os.path.join(path, a + ext) - if absolute_path: - new_href = os.path.abspath(new_href) - - # save file - if not os.path.exists(new_href) or overwrite: - fs = fsspec.core.url_to_fs(href, asynchronous=True, **kwargs)[0] - task = asyncio.create_task(download_file(fs, href, new_href)) - tasks.append(task) - - # update - new_item.assets[a].href = new_href - - await asyncio.gather(*tasks) - - # save Item metadata alongside saved assets - if save_item: - new_item.remove_links("root") - new_item.save_object(dest_href=os.path.join(path, "item.json")) - - return new_item - - -async def download_items_assets(items: Iterable[Item], **kwargs: Any) -> List[Item]: - tasks = [] - for item in items: - tasks.append(asyncio.create_task(download_item_assets(item, **kwargs))) - new_items: List[Item] = await asyncio.gather(*tasks) - return new_items - - -def upload_item_assets_to_s3( - item: Item, - assets: Optional[List[str]] = None, - public_assets: Union[None, List[str], str] = None, - path_template: str = "${collection}/${id}", - s3_urls: bool = False, - headers: Optional[Dict[str, Any]] = None, - **kwargs: Any, -) -> Item: - """Upload Item assets to s3 bucket - Args: - item (Dict): STAC Item - assets (List[str], optional): List of asset keys to upload. Defaults to None. - public_assets (List[str], optional): List of assets keys that should be - public. Defaults to []. - path_template (str, optional): Path string template. Defaults to - '${collection}/${id}'. - s3_urls (bool, optional): Return s3 URLs instead of http URLs. Defaults - to False. - headers (Dict, optional): Dictionary of headers to set on uploaded - assets. Defaults to {}, - Returns: - Dict: A new STAC Item with uploaded assets pointing to newly uploaded file URLs - """ - if headers is None: - headers = {} - - # deepcopy of item - _item = item.to_dict() - - if public_assets is None: - public_assets = [] - # determine which assets should be public - elif type(public_assets) is str: - if public_assets == "ALL": - public_assets = list(_item["assets"].keys()) - else: - raise ValueError(f"unexpected value for `public_assets`: {public_assets}") - - # if assets not provided, upload all assets - _assets = assets if assets is not None else _item["assets"].keys() - - for key in [a for a in _assets if a in _item["assets"].keys()]: - asset = _item["assets"][key] - filename = asset["href"] - if not op.exists(filename): - logger.warning(f"Cannot upload {filename}: does not exist") - continue - public = True if key in public_assets else False - _headers = {} - if "type" in asset: - _headers["ContentType"] = asset["type"] - _headers.update(headers) - # output URL - layout = LayoutTemplate(op.join(path_template, op.basename(filename))) - url = layout.substitute(item) - - # upload - logger.debug(f"Uploading {filename} to {url}") - url_out = s3_client.upload( - filename, url, public=public, extra=_headers, http_url=not s3_urls - ) - _item["assets"][key]["href"] = url_out - return Item.from_dict(_item) diff --git a/stactask/exceptions.py b/stactask/exceptions.py deleted file mode 100644 index a236f19..0000000 --- a/stactask/exceptions.py +++ /dev/null @@ -1,14 +0,0 @@ -class InvalidInput(Exception): - """Exception class for when processing fails due to invalid input - - Args: - Exception (Exception): Base class - """ - - pass - - -class FailedValidation(Exception): - """Exception class thrown when input payload does not validate""" - - pass diff --git a/stactask/task.py 
b/stactask/task.py deleted file mode 100644 index ee53686..0000000 --- a/stactask/task.py +++ /dev/null @@ -1,435 +0,0 @@ -import argparse -import asyncio -import itertools -import json -import logging -import sys -import warnings -from abc import ABC, abstractmethod -from copy import deepcopy -from os import makedirs -from pathlib import Path -from shutil import rmtree -from tempfile import mkdtemp -from typing import Any, Callable, Dict, Iterable, List, Optional, Union - -import fsspec -from pystac import Item, ItemCollection - -from .asset_io import ( - download_item_assets, - download_items_assets, - upload_item_assets_to_s3, -) -from .exceptions import FailedValidation -from .utils import stac_jsonpath_match - -# types -PathLike = Union[str, Path] - - -class Task(ABC): - """ - Tasks can use parameters provided in a `process` Dictionary that is supplied in - the ItemCollection JSON under the "process" field. An example process - definition: - - ``` - { - "description": "My process configuration" - "upload_options": { - "path_template": "s3://my-bucket/${collection}/${year}/${month}/${day}/${id}", - "collections": { - "landsat-c2l2": "" - } - }, - "tasks": { - "task-name": { - "param": "value" - } - ] - } - ``` - """ - - name = "task" - description = "A task for doing things" - version = "0.1.0" - - def __init__( - self: "Task", - payload: Dict[str, Any], - workdir: Optional[PathLike] = None, - save_workdir: bool = False, - skip_upload: bool = False, - skip_validation: bool = False, - ): - # set up logger - self.logger = logging.getLogger(self.name) - - # set this to avoid confusion in destructor if called during validation - self._save_workdir = True - - # validate input payload...or not - if not skip_validation: - if not self.validate(payload): - raise FailedValidation() - - # set instance variables - self._save_workdir = save_workdir - self._skip_upload = skip_upload - self._payload = payload - - # create temporary work directory if workdir is None - if workdir is None: - self._workdir = Path(mkdtemp()) - else: - self._workdir = Path(workdir) - makedirs(self._workdir, exist_ok=True) - - def __del__(self) -> None: - # remove work directory if not running locally - if not self._save_workdir: - self.logger.debug("Removing work directory %s", self._workdir) - rmtree(self._workdir) - - @property - def process_definition(self) -> Dict[str, Any]: - return self._payload.get("process", {}) - - @property - def parameters(self) -> Dict[str, Any]: - task_configs = self.process_definition.get("tasks", []) - if isinstance(task_configs, List): - warnings.warn( - "task configs is list, use a dictionary instead", - DeprecationWarning, - stacklevel=2, - ) - task_config_list = [cfg for cfg in task_configs if cfg["name"] == self.name] - if len(task_config_list) == 0: - return {} - else: - task_config: Dict[str, Any] = task_config_list[0] - return task_config.get("parameters", {}) - elif isinstance(task_configs, Dict): - return task_configs.get(self.name, {}) - else: - raise ValueError(f"unexpected value for 'tasks': {task_configs}") - - @property - def upload_options(self) -> Dict[str, Any]: - return self.process_definition.get("upload_options", {}) - - @property - def items_as_dicts(self) -> List[Dict[str, Any]]: - return self._payload.get("features", []) - - @property - def items(self) -> ItemCollection: - items_dict = {"type": "FeatureCollection", "features": self.items_as_dicts} - return ItemCollection.from_dict(items_dict, preserve_dict=True) - - @classmethod - def validate(cls, payload: Dict[str, 
Any]) -> bool: - # put validation logic on input Items and process definition here - return True - - @classmethod - def add_software_version(cls, items: List[Dict[str, Any]]) -> List[Dict[str, Any]]: - warnings.warn( - "add_software_version is deprecated, " - "use add_software_version_to_item instead", - DeprecationWarning, - ) - modified_items = list() - for item in items: - modified_items.append(cls.add_software_version_to_item(item)) - return modified_items - - @classmethod - def add_software_version_to_item(cls, item: Dict[str, Any]) -> Dict[str, Any]: - """Adds software version information to a single item. - - Uses the processing extension. - - Args: - item: A single STAC item - - Returns: - Dict[str, Any]: The same item with processing information applied. - """ - processing_ext = ( - "https://stac-extensions.github.io/processing/v1.1.0/schema.json" - ) - if "stac_extensions" not in item: - item["stac_extensions"] = [] - item["stac_extensions"].append(processing_ext) - item["stac_extensions"] = list(set(item["stac_extensions"])) - if "properties" not in item: - item["properties"] = {} - item["properties"]["processing:software"] = {cls.name: cls.version} - return item - - def assign_collections(self) -> None: - """Assigns new collection names based on""" - for i, (coll, expr) in itertools.product( - self._payload["features"], - self.upload_options.get("collections", dict()).items(), - ): - if stac_jsonpath_match(i, expr): - i["collection"] = coll - - def download_item_assets( - self, - item: Item, - path_template: str = "${collection}/${id}", - **kwargs: Any, - ) -> Item: - """Download provided asset keys for all items in payload. Assets are - saved in workdir in a directory named by the Item ID, and the items are - updated with the new asset hrefs. - - Args: - assets (Optional[List[str]], optional): List of asset keys to - download. Defaults to all assets. 
- """ - outdir = str(self._workdir / path_template) - loop = asyncio.get_event_loop() - item = loop.run_until_complete( - download_item_assets(item, path_template=outdir, **kwargs) - ) - return item - - def download_items_assets( - self, - items: Iterable[Item], - path_template: str = "${collection}/${id}", - **kwargs: Any, - ) -> List[Item]: - outdir = str(self._workdir / path_template) - loop = asyncio.get_event_loop() - items = loop.run_until_complete( - download_items_assets(items, path_template=outdir, **kwargs) - ) - return list(items) - - def upload_item_assets_to_s3( - self, item: Item, assets: Optional[List[str]] = None - ) -> Item: - if self._skip_upload: - self.logger.warning("Skipping upload of new and modified assets") - return item - item = upload_item_assets_to_s3(item, assets=assets, **self.upload_options) - return item - - # this should be in PySTAC - @staticmethod - def create_item_from_item(item: Dict[str, Any]) -> Dict[str, Any]: - new_item = deepcopy(item) - # create a derived output item - links = [ - link["href"] for link in item.get("links", []) if link["rel"] == "self" - ] - if len(links) == 1: - # add derived from link - new_item["links"].append( - { - "title": "Source STAC Item", - "rel": "derived_from", - "href": links[0], - "type": "application/json", - } - ) - return new_item - - @abstractmethod - def process(self, **kwargs: Any) -> List[Dict[str, Any]]: - """Main task logic - virtual - - Returns: - [type]: [description] - """ - # download assets of interest, this will update self.items - # self.download_assets(['key1', 'key2']) - # do some stuff - # self.upload_assets(['key1', 'key2']) - pass - - def post_process_item(self, item: Dict[str, Any]) -> Dict[str, Any]: - """Perform post-processing operations on an item. - - E.g. add software version information. - - Most tasks should prefer to not override this method, as logic should be - kept in :py:meth:`Task.process`. If you do override this method, make - sure to call ``super().post_process_item()`` AFTER doing any custom - post-processing, so any regular behavior can take your changes into account. - - Args: - item: An item produced by :py:meth:`Task.process` - - Returns: - Dict[str, Any]: The item with any additional attributes applied. 
- """ - item = self.add_software_version_to_item(item) - assert "stac_extensions" in item - assert isinstance(item["stac_extensions"], list) - item["stac_extensions"].sort() - return item - - @classmethod - def handler(cls, payload: Dict[str, Any], **kwargs: Any) -> Dict[str, Any]: - if "href" in payload or "url" in payload: - # read input - with fsspec.open(payload.get("href", payload.get("url"))) as f: - payload = json.loads(f.read()) - - task = cls(payload, **kwargs) - try: - items = list() - for item in task.process(**task.parameters): - items.append(task.post_process_item(item)) - - task._payload["features"] = items - task.assign_collections() - - return task._payload - except Exception as err: - task.logger.error(err, exc_info=True) - raise err - - @classmethod - def parse_args(cls, args: List[str]) -> Dict[str, Any]: - dhf = argparse.ArgumentDefaultsHelpFormatter - parser0 = argparse.ArgumentParser(description=cls.description) - parser0.add_argument( - "--version", - help="Print version and exit", - action="version", - version=cls.version, - ) - - pparser = argparse.ArgumentParser(add_help=False) - pparser.add_argument( - "--logging", default="INFO", help="DEBUG, INFO, WARN, ERROR, CRITICAL" - ) - - subparsers = parser0.add_subparsers(dest="command") - - # run - h = "Process STAC Item Collection" - parser = subparsers.add_parser( - "run", parents=[pparser], help=h, formatter_class=dhf - ) - parser.add_argument( - "input", help="Full path of item collection to process (s3 or local)" - ) - - h = "Write output payload to this URL" - parser.add_argument("--output", help=h, default=None) - - # additional options - h = "Use this as work directory. Will be created." - parser.add_argument("--workdir", help=h, default=None, type=Path) - h = "Save workdir after completion" - parser.add_argument( - "--save-workdir", dest="save_workdir", action="store_true", default=False - ) - h = "Skip uploading of any generated assets and resulting STAC Items" - parser.add_argument( - "--skip-upload", dest="skip_upload", action="store_true", default=False - ) - h = "Skip validation of input payload" - parser.add_argument( - "--skip-validation", - dest="skip_validation", - action="store_true", - default=False, - ) - h = """ Run local mode -(save-workdir = True, skip-upload = True, skip-validation = True, -workdir = 'local-output', output = 'local-output/output-payload.json') """ - parser.add_argument("--local", help=h, action="store_true", default=False) - - # turn Namespace into dictionary - pargs = vars(parser0.parse_args(args)) - # only keep keys that are not None - pargs = {k: v for k, v in pargs.items() if v is not None} - - if pargs.pop("local", False): - # local mode sets all of - for k in ["save_workdir", "skip_upload", "skip_validation"]: - pargs[k] = True - if pargs.get("workdir") is None: - pargs["workdir"] = "local-output" - if pargs.get("output") is None: - pargs["output"] = Path(pargs["workdir"]) / "output-payload.json" - - if pargs.get("command", None) is None: - parser.print_help() - sys.exit(0) - - return pargs - - @classmethod - def cli(cls) -> None: - args = cls.parse_args(sys.argv[1:]) - cmd = args.pop("command") - - # logging - loglevel = args.pop("logging") - logging.basicConfig(level=loglevel) - - # quiet these loud loggers - for ql in [ - "botocore", - "s3transfer", - "urllib3", - "fsspec", - "asyncio", - "aiobotocore", - ]: - logging.getLogger(ql).propagate = False - - if cmd == "run": - href = args.pop("input") - href_out = args.pop("output", None) - - # read input - with 
fsspec.open(href) as f: - payload = json.loads(f.read()) - - # run task handler - payload_out = cls.handler(payload, **args) - - # write output - if href_out is not None: - with fsspec.open(href_out, "w") as f: - f.write(json.dumps(payload_out)) - - -# from https://pythonalgos.com/runtimeerror-event-loop-is-closed-asyncio-fix/ -"""fix yelling at me error""" -from asyncio.proactor_events import _ProactorBasePipeTransport # noqa -from functools import wraps # noqa - - -def silence_event_loop_closed(func: Callable[[Any], Any]) -> Callable[[Any], Any]: - @wraps(func) - def wrapper(self, *args: Any, **kwargs: Any) -> Any: # type: ignore - try: - return func(self, *args, **kwargs) - except RuntimeError as e: - if str(e) != "Event loop is closed": - raise - - return wrapper - - -setattr( - _ProactorBasePipeTransport, - "__del__", - silence_event_loop_closed(_ProactorBasePipeTransport.__del__), -) -"""fix yelling at me error end""" diff --git a/stactask/utils.py b/stactask/utils.py deleted file mode 100644 index 5527b98..0000000 --- a/stactask/utils.py +++ /dev/null @@ -1,25 +0,0 @@ -from typing import Any, Dict - -from jsonpath_ng.ext import parser - - -def stac_jsonpath_match(item: Dict[str, Any], expr: str) -> bool: - """Match jsonpath expression against STAC JSON. - Use https://jsonpath.herokuapp.com/ to experiment with JSONpath - and https://regex101.com/ to experiment with regex - - Args: - item (Dict): A STAC Item - expr (str): A valid JSONPath expression - - Raises: - err: Invalid inputs - - Returns: - Boolean: Returns True if the jsonpath expression matches the STAC Item JSON - """ - result = [x.value for x in parser.parse(expr).find([item])] - if len(result) == 1: - return True - else: - return False diff --git a/tests/__init__.py b/tests/__init__.py deleted file mode 100644 index e69de29..0000000 diff --git a/tests/conftest.py b/tests/conftest.py index 238fc21..1c893a4 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -1,39 +1,12 @@ -from typing import List +from pathlib import Path +from typing import Callable import pytest -def pytest_addoption(parser: pytest.Parser) -> None: - parser.addoption( - "--runslow", action="store_true", default=False, help="run slow tests" - ) - parser.addoption( - "--s3-requester-pays", - action="store_true", - default=False, - help="run tests that require fetching data via s3 requester pays", - ) +@pytest.fixture +def data_path() -> Callable[[str], Path]: + def f(file_name: str) -> Path: + return Path(__file__).parent / "data" / file_name - -def pytest_configure(config: pytest.Config) -> None: - config.addinivalue_line("markers", "slow: mark test as slow to run") - config.addinivalue_line( - "markers", "s3_requester_pays: mark test as requiring s3 requester pays to run" - ) - - -def pytest_collection_modifyitems( - config: pytest.Config, items: List[pytest.Item] -) -> None: - if not config.getoption("--runslow"): - skip_slow = pytest.mark.skip(reason="need --runslow option to run") - for item in items: - if "slow" in item.keywords: - item.add_marker(skip_slow) - if not config.getoption("--s3-requester-pays"): - skip_s3_requestor_pays = pytest.mark.skip( - reason="need --s3-requester-pays option to run" - ) - for item in items: - if "s3_requester_pays" in item.keywords: - item.add_marker(skip_s3_requestor_pays) + return f diff --git a/tests/data/indirect.json b/tests/data/indirect.json new file mode 100644 index 0000000..a525ad0 --- /dev/null +++ b/tests/data/indirect.json @@ -0,0 +1,3 @@ +{ + "href": "./sentinel2-l2a-j2k-payload.json" +} \ No 
newline at end of file diff --git a/tests/data/passthrough.json b/tests/data/passthrough.json new file mode 100644 index 0000000..26e6a6d --- /dev/null +++ b/tests/data/passthrough.json @@ -0,0 +1,9 @@ +{ + "type": "FeatureCollection", + "features": [], + "process": { + "tasks": { + "passthrough": {} + } + } +} \ No newline at end of file diff --git a/tests/fixtures/sentinel2-l2a-j2k-payload.json b/tests/data/sentinel2-l2a-j2k-payload.json similarity index 99% rename from tests/fixtures/sentinel2-l2a-j2k-payload.json rename to tests/data/sentinel2-l2a-j2k-payload.json index 33b86fa..729b426 100644 --- a/tests/fixtures/sentinel2-l2a-j2k-payload.json +++ b/tests/data/sentinel2-l2a-j2k-payload.json @@ -1595,4 +1595,4 @@ "collection": "sentinel-2-l2a" } ] -} \ No newline at end of file +} diff --git a/tests/data/simple-item.json b/tests/data/simple-item.json new file mode 100644 index 0000000..1e413c4 --- /dev/null +++ b/tests/data/simple-item.json @@ -0,0 +1,81 @@ +{ + "stac_version": "1.0.0", + "stac_extensions": [], + "type": "Feature", + "id": "20201211_223832_CS2", + "bbox": [ + 172.91173669923782, + 1.3438851951615003, + 172.95469614953714, + 1.3690476620161975 + ], + "geometry": { + "type": "Polygon", + "coordinates": [ + [ + [ + 172.91173669923782, + 1.3438851951615003 + ], + [ + 172.95469614953714, + 1.3438851951615003 + ], + [ + 172.95469614953714, + 1.3690476620161975 + ], + [ + 172.91173669923782, + 1.3690476620161975 + ], + [ + 172.91173669923782, + 1.3438851951615003 + ] + ] + ] + }, + "properties": { + "datetime": "2020-12-11T22:38:32.125000Z" + }, + "collection": "simple-collection", + "links": [ + { + "rel": "collection", + "href": "./collection.json", + "type": "application/json", + "title": "Simple Example Collection" + }, + { + "rel": "root", + "href": "./collection.json", + "type": "application/json", + "title": "Simple Example Collection" + }, + { + "rel": "parent", + "href": "./collection.json", + "type": "application/json", + "title": "Simple Example Collection" + } + ], + "assets": { + "visual": { + "href": "https://storage.googleapis.com/open-cogs/stac-examples/20201211_223832_CS2.tif", + "type": "image/tiff; application=geotiff; profile=cloud-optimized", + "title": "3-Band Visual", + "roles": [ + "visual" + ] + }, + "thumbnail": { + "href": "https://storage.googleapis.com/open-cogs/stac-examples/20201211_223832_CS2.jpg", + "title": "Thumbnail", + "type": "image/jpeg", + "roles": [ + "thumbnail" + ] + } + } +} diff --git a/tests/tasks.py b/tests/tasks.py deleted file mode 100644 index 58aebc5..0000000 --- a/tests/tasks.py +++ /dev/null @@ -1,32 +0,0 @@ -from typing import Any, Dict, List - -from stactask import Task - - -class NothingTask(Task): - name = "nothing-task" - description = "this task does nothing" - - def process(self, **kwargs: Any) -> List[Dict[str, Any]]: - return self.items_as_dicts - - -class FailValidateTask(Task): - name = "failvalidation-task" - description = "this task always fails validation" - - @classmethod - def validate(self, payload: Dict[str, Any]) -> bool: - return False - - def process(self, **kwargs: Any) -> List[Dict[str, Any]]: - return self.items_as_dicts - - -class DerivedItemTask(Task): - name = "derived-item-task" - description = "this task creates a derived item" - - def process(self, **kwargs: Any) -> List[Dict[str, Any]]: - assert kwargs["parameter"] == "value" - return [self.create_item_from_item(self.items_as_dicts[0])] diff --git a/tests/test_cli.py b/tests/test_cli.py new file mode 100644 index 0000000..1ba8b86 --- /dev/null 
+++ b/tests/test_cli.py @@ -0,0 +1,28 @@ +from pathlib import Path +from typing import Callable + +from click.testing import CliRunner +from stac_task._cli import cli + + +def test_run_noop_no_output(data_path: Callable[[str], Path]) -> None: + runner = CliRunner() + result = runner.invoke( + cli, + ["run", str(data_path("passthrough.json"))], + ) + assert result.exit_code == 0, result.stdout + + +def test_run_noop_output(data_path: Callable[[str], Path], tmp_path: Path) -> None: + runner = CliRunner() + result = runner.invoke( + cli, + [ + "run", + str(data_path("passthrough.json")), + str(tmp_path / "item-collection.json"), + ], + ) + assert result.exit_code == 0, result.stdout + assert (tmp_path / "item-collection.json").exists() diff --git a/tests/test_models.py b/tests/test_models.py new file mode 100644 index 0000000..26cee4d --- /dev/null +++ b/tests/test_models.py @@ -0,0 +1,25 @@ +import datetime + +import pystac +from stac_task.models import Item, Properties + + +def test_item_to_pystac() -> None: + item = Item( + id="an-id", properties=Properties(datetime=datetime.datetime.now().isoformat()) + ).to_pystac() + assert item.id == "an-id" + assert isinstance(item.datetime, datetime.datetime) + + +def test_item_from_pystac() -> None: + item = Item.from_pystac( + pystac.Item( + "an-id", + geometry=None, + bbox=None, + datetime=datetime.datetime.now(), + properties={}, + ) + ) + assert item.id == "an-id" diff --git a/tests/test_payload.py b/tests/test_payload.py new file mode 100644 index 0000000..92369c0 --- /dev/null +++ b/tests/test_payload.py @@ -0,0 +1,63 @@ +from pathlib import Path +from typing import Any, Callable, Dict, List, Optional + +import pytest +from pydantic import BaseModel, ConfigDict +from stac_task import ExecutionError, Payload, Process, Task + + +class Input(BaseModel): + pass + + +class Output(BaseModel): + model_config = ConfigDict(extra="allow") + + the_meaning: int + + +class TheMeaning(Task[Input, Output]): + input = Input + + foo: Optional[bool] = None + + def process(self, item: Input) -> List[Output]: + fields: Dict[str, Any] = {"the_meaning": 42} + if self.foo: + fields["foo"] = "bar" + return [Output(**fields)] + + +def test_from_path(data_path: Callable[[str], Path]) -> None: + payload = Payload.from_href(str(data_path("sentinel2-l2a-j2k-payload.json"))) + assert len(payload.features) == 2 + + +def test_from_path_indirect(data_path: Callable[[str], Path]) -> None: + payload = Payload.from_href(str(data_path("indirect.json"))) + assert len(payload.features) == 2 + + +def test_empty() -> None: + with pytest.raises(ExecutionError): + assert Payload().execute({}) == Payload() + + +def test_add_attribute() -> None: + payload = Payload(features=[{}], process=Process(tasks={"the-meaning": {}})) + result = payload.execute({"the-meaning": TheMeaning}) + assert result.features == [{"the_meaning": 42}] + + +def test_error_without_tasks() -> None: + payload = Payload(features=[{}], process=Process(tasks={"the-meaning": {}})) + with pytest.raises(ExecutionError): + payload.execute({}) + + +def test_config() -> None: + payload = Payload( + features=[{}], process=Process(tasks={"the-meaning": {"foo": True}}) + ) + result = payload.execute({"the-meaning": TheMeaning}) + assert result.features == [{"the_meaning": 42, "foo": "bar"}] diff --git a/tests/test_task.py b/tests/test_task.py index 38754d3..e629fd1 100644 --- a/tests/test_task.py +++ b/tests/test_task.py @@ -1,151 +1,36 @@ -#!/usr/bin/env python -import json +import datetime from pathlib import Path -from 
typing import Any, Dict +from typing import Callable -import pytest +from pystac import Item +from stac_task import HrefTask, ItemTask +from stac_task.models import Href -from stactask.exceptions import FailedValidation -from stactask.task import Task -from .tasks import DerivedItemTask, FailValidateTask, NothingTask +def test_item_task() -> None: + class AddPropertyTask(ItemTask): + def process_item(self, item: Item) -> Item: + item.properties["foo"] = "bar" + return item -# import vcr - - -testpath = Path(__file__).parent -cassettepath = testpath / "fixtures" / "cassettes" - - -@pytest.fixture -def items() -> Dict[str, Any]: - filename = testpath / "fixtures" / "sentinel2-l2a-j2k-payload.json" - with open(filename) as f: - items = json.loads(f.read()) - assert isinstance(items, dict) - return items - - -@pytest.fixture -def nothing_task(items: Dict[str, Any]) -> Task: - return NothingTask(items) - - -@pytest.fixture -def derived_item_task(items: Dict[str, Any]) -> Task: - return DerivedItemTask(items) - - -def test_task_init(nothing_task: Task) -> None: - assert len(nothing_task._payload["features"]) == 2 - assert len(nothing_task.items) == 2 - assert nothing_task.logger.name == nothing_task.name - assert nothing_task._save_workdir is False - - -def test_failed_validation(items: Dict[str, Any]) -> None: - with pytest.raises(FailedValidation): - FailValidateTask(items) - - -def test_edit_items(nothing_task: Task) -> None: - nothing_task.process_definition["workflow"] = "test-task-workflow" - assert nothing_task._payload["process"]["workflow"] == "test-task-workflow" - - -def test_edit_items2(nothing_task: Task) -> None: - assert nothing_task._payload["features"][0]["type"] == "Feature" - - -def test_tmp_workdir(items: Dict[str, Any]) -> None: - nothing_task = NothingTask(items) - assert nothing_task._save_workdir is False - workdir = nothing_task._workdir - assert workdir.parts[-1].startswith("tmp") - assert workdir.is_dir() is True - del nothing_task - assert workdir.is_dir() is False - - -def test_workdir(items: Dict[str, Any]) -> None: - t = NothingTask(items, workdir=testpath / "test_task", save_workdir=True) - assert t._save_workdir is True - workdir = t._workdir - assert workdir.parts[-1] == "test_task" - assert workdir.is_dir() is True - del t - assert workdir.is_dir() is True - workdir.rmdir() - assert workdir.is_dir() is False - - -def test_parameters(items: Dict[str, Any]) -> None: - nothing_task = NothingTask(items) - assert nothing_task.process_definition["workflow"] == "cog-archive" - assert ( - nothing_task.upload_options["path_template"] - == items["process"]["upload_options"]["path_template"] + item = Item( + id="an-id", + geometry=None, + bbox=None, + datetime=datetime.datetime.now(), + properties={}, ) + result = AddPropertyTask().process_dict(item.to_dict()) + assert result[0]["properties"]["foo"] == "bar" -def test_process(nothing_task: Task) -> None: - processed_items = nothing_task.process() - assert processed_items[0]["type"] == "Feature" - - -def test_post_process(items: Dict[str, Any]) -> None: - class PostProcessTask(NothingTask): - name = "post-processing-test" - version = "42" - - def post_process_item(self, item: Dict[str, Any]) -> Dict[str, Any]: - item["properties"]["foo"] = "bar" - item["stac_extensions"].insert(0, "zzz") - return super().post_process_item(item) - - payload = PostProcessTask.handler(items) - for item in payload["features"]: - assert item["properties"]["foo"] == "bar" - assert item["properties"]["processing:software"]["post-processing-test"] 
== "42" - stac_extensions = item["stac_extensions"] - assert item["stac_extensions"] == sorted(stac_extensions) - - -def test_derived_item(derived_item_task: Task) -> None: - items = derived_item_task.process(**derived_item_task.parameters) - links = [lk for lk in items[0]["links"] if lk["rel"] == "derived_from"] - assert len(links) == 1 - self_link = [lk for lk in items[0]["links"] if lk["rel"] == "self"][0] - assert links[0]["href"] == self_link["href"] - - -def test_task_handler(items: Dict[str, Any]) -> None: - self_link = [lk for lk in items["features"][0]["links"] if lk["rel"] == "self"][0] - output_items = DerivedItemTask.handler(items) - derived_link = [ - lk for lk in output_items["features"][0]["links"] if lk["rel"] == "derived_from" - ][0] - assert derived_link["href"] == self_link["href"] - assert ( - "derived-item-task" - in output_items["features"][0]["properties"]["processing:software"] - ) - - -def test_parse_no_args() -> None: - with pytest.raises(SystemExit): - NothingTask.parse_args([]) - - -def test_parse_args() -> None: - args = NothingTask.parse_args("run input --save-workdir".split()) - assert args["command"] == "run" - assert args["logging"] == "INFO" - assert args["input"] == "input" - assert args["save_workdir"] is True - assert args["skip_upload"] is False - assert args["skip_validation"] is False +def test_relative_href_task(data_path: Callable[[str], Path]) -> None: + class RelativeHrefTask(HrefTask): + def process_href(self, href: str) -> Item: + return Item.from_file(href) -if __name__ == "__main__": - output = NothingTask.cli() + task = RelativeHrefTask() + task.payload_href = str(data_path("payload.json")) + result = task.process(Href(href="./simple-item.json"))[0] + assert result.id == "20201211_223832_CS2" diff --git a/tests/test_task_download.py b/tests/test_task_download.py deleted file mode 100644 index 537b990..0000000 --- a/tests/test_task_download.py +++ /dev/null @@ -1,85 +0,0 @@ -import json -from pathlib import Path -from typing import Any, Dict - -import pytest - -from .tasks import NothingTask - - -@pytest.fixture -def item_collection() -> Dict[str, Any]: - name = "sentinel2-l2a-j2k-payload" - filename = Path(__file__).parent / "fixtures" / f"{name}.json" - with open(filename) as f: - items = json.loads(f.read()) - assert isinstance(items, dict) - return items - - -def test_download_nosuch_asset(item_collection: Dict[str, Any]) -> None: - t = NothingTask( - item_collection, - ) - item = t.download_item_assets(t.items[0], assets=["nosuch_asset"]).to_dict() - # new item same as old item - assert item["assets"] == t.items[0].to_dict()["assets"] - - -# @vcr.use_cassette(str(cassettepath / 'download_assets')) -def test_download_item_asset(tmp_path: Path, item_collection: Dict[str, Any]) -> None: - t = NothingTask(item_collection, workdir=tmp_path / "test-task-download-item-asset") - item = t.download_item_assets(t.items[0], assets=["tileinfo_metadata"]).to_dict() - fname = item["assets"]["tileinfo_metadata"]["href"] - filename = Path(fname) - assert filename.is_file() is True - del t - assert filename.is_file() is False - - -# @vcr.use_cassette(str(cassettepath / 'download_assets')) -def test_download_item_assets(tmp_path: Path, item_collection: Dict[str, Any]) -> None: - t = NothingTask( - item_collection, - workdir=tmp_path / "test-task-download-item-assets", - save_workdir=True, - ) - item = t.download_item_assets( - t.items[0], assets=["tileinfo_metadata", "granule_metadata"] - ).to_dict() - filename = 
Path(item["assets"]["tileinfo_metadata"]["href"]) - assert filename.is_file() is True - filename = Path(item["assets"]["granule_metadata"]["href"]) - assert filename.is_file() is True - - -def test_download_items_assets(tmp_path: Path, item_collection: Dict[str, Any]) -> None: - asset_key = "tileinfo_metadata" - t = NothingTask( - item_collection, - workdir=tmp_path / "test-task-download-items-assets", - save_workdir=True, - ) - items = [i.to_dict() for i in t.download_items_assets(t.items, assets=[asset_key])] - filename = Path(items[0]["assets"][asset_key]["href"]) - assert filename.is_file() is True - filename = Path(items[1]["assets"][asset_key]["href"]) - assert filename.is_file() is True - - -# @vcr.use_cassette(str(cassettepath / 'download_assets')) -@pytest.mark.s3_requester_pays -def test_download_large_asset(tmp_path: Path, item_collection: Dict[str, Any]) -> None: - t = NothingTask( - item_collection, - workdir=tmp_path / "test-task-download-assets", - save_workdir=True, - ) - item = t.download_item_assets( - t.items[0], assets=["red"], requester_pays=True - ).to_dict() - filename = Path(item["assets"]["red"]["href"]) - assert filename.is_file() is True - # t._save_workdir = False - del t - # assert (filename.is_file() is False)
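A closing note on how the new pieces fit together: a task subclass is a pydantic model, so the parameters in `process.tasks` become model fields, and `Payload.execute` instantiates the single matching task and feeds each feature through `process_dict`. A minimal sketch of driving this programmatically rather than through the CLI (the `EchoTask` name is hypothetical, not part of this diff; it mirrors the `TheMeaning` task in `tests/test_payload.py`):

```python
from typing import List

import stac_task
from stac_task import Payload, Process, Task
from stac_task.models import Anything


class EchoTask(Task[Anything, Anything]):
    """Returns each feature unchanged (hypothetical example task)."""

    input = Anything
    output = Anything

    def process(self, item: Anything) -> List[Anything]:
        return [item]


stac_task.register_task("echo", EchoTask)

# Mirrors what `stac-task run` does under the hood
payload = Payload(features=[{"id": "an-id"}], process=Process(tasks={"echo": {}}))
result = payload.execute(stac_task.get_tasks())
print(result.model_dump_json(indent=2))
```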