Skip to content

Commit

Permalink
Allow specifying insert method for bulk transactions (#614)
Browse files Browse the repository at this point in the history
* Added method parameter to bulk_item_insert route in order to allow more flexibility in how data is loaded into the DB

* Add method param to AsyncBaseBulkTransactionsClient, which pgstac inherits from

* Moved method on to the Items base model for bulk transaction request

* Updated docs and changelog with bulk transactions method parameter
  • Loading branch information
edkeeble authored Oct 25, 2023
1 parent 2cb8359 commit d035774
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 6 deletions.
1 change: 1 addition & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

* Forward `x-forwarded-host` ([#586](https://github.com/stac-utils/stac-fastapi/pull/586))
* Add CQL2-json to filter conformance class ([#611](https://github.com/stac-utils/stac-fastapi/issues/611))
* Add `method` parameter to Bulk Transactions extension in order to support upserting bulk data ([#614](https://github.com/stac-utils/stac-fastapi/pull/614))

## [2.4.8] - 2023-06-07

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
"""Bulk transactions extension."""
import abc
from enum import Enum
from typing import Any, Dict, List, Optional, Union

import attr
Expand All @@ -11,10 +12,18 @@
from stac_fastapi.types.extension import ApiExtension


class BulkTransactionMethod(str, Enum):
"""Bulk Transaction Methods."""

INSERT = "insert"
UPSERT = "upsert"


class Items(BaseModel):
"""A group of STAC Item objects, in the form of a dictionary from Item.id -> Item."""

items: Dict[str, Any]
method: BulkTransactionMethod = BulkTransactionMethod.INSERT

def __iter__(self):
"""Return an iterable of STAC Item objects."""
Expand All @@ -36,7 +45,10 @@ def _chunks(lst, n):

@abc.abstractmethod
def bulk_item_insert(
self, items: Items, chunk_size: Optional[int] = None, **kwargs
self,
items: Items,
chunk_size: Optional[int] = None,
**kwargs,
) -> str:
"""Bulk creation of items.
Expand All @@ -55,7 +67,11 @@ class AsyncBaseBulkTransactionsClient(abc.ABC):
"""BulkTransactionsClient."""

@abc.abstractmethod
async def bulk_item_insert(self, items: Items, **kwargs) -> str:
async def bulk_item_insert(
self,
items: Items,
**kwargs,
) -> str:
"""Bulk creation of items.
Args:
Expand All @@ -77,11 +93,19 @@ class BulkTransactionExtension(ApiExtension):
attribute "items", that has a value that is an object with a group of
attributes that are the ids of each Item, and the value is the Item entity.
Optionally, clients can specify a "method" attribute that is either "insert"
or "upsert". If "insert", then the items will be inserted if they do not
exist, and an error will be returned if they do. If "upsert", then the items
will be inserted if they do not exist, and updated if they do. This defaults
to "insert".
{
"items": {
"id1": { "type": "Feature", ... },
"id2": { "type": "Feature", ... },
"id3": { "type": "Feature", ... }
"items": {
"id1": { "type": "Feature", ... },
"id2": { "type": "Feature", ... },
"id3": { "type": "Feature", ... }
},
"method": "insert"
}
"""

Expand Down

0 comments on commit d035774

Please sign in to comment.