Add support for DynamoDB and S3 registry (#1483)
* Add support for DynamoDB and S3 registry

Signed-off-by: lblokhin <lenin133@yandex.ru>

* rcu and wcu as a parameter of dynamodb online store

Signed-off-by: lblokhin <lenin133@yandex.ru>

* fix linter

Signed-off-by: lblokhin <lenin133@yandex.ru>

* aws dependency to extras

Signed-off-by: lblokhin <lenin133@yandex.ru>

* FEAST_S3_ENDPOINT_URL

Signed-off-by: lblokhin <lenin133@yandex.ru>

* tests

Signed-off-by: lblokhin <lenin133@yandex.ru>

* fix signature, after merge

Signed-off-by: lblokhin <lenin133@yandex.ru>

* aws default region name configurable

Signed-off-by: lblokhin <lenin133@yandex.ru>

* add offlinestore config type to test

Signed-off-by: lblokhin <lenin133@yandex.ru>

* review changes

Signed-off-by: lblokhin <lenin133@yandex.ru>

* review requested changes

Signed-off-by: lblokhin <lenin133@yandex.ru>

* integration test for Dynamo

Signed-off-by: lblokhin <lenin133@yandex.ru>

* change the rest of table_name to table_instance (where table_name is actually an instance of DynamoDB Table object)

Signed-off-by: lblokhin <lenin133@yandex.ru>

* fix DynamoDBOnlineStore commit

Signed-off-by: lblokhin <lenin133@yandex.ru>

* move client to _initialize_dynamodb

Signed-off-by: lblokhin <lenin133@yandex.ru>

* rename document_id to entity_id and Row to entity_id

Signed-off-by: lblokhin <lenin133@yandex.ru>

* The default value is None

Signed-off-by: lblokhin <lenin133@yandex.ru>

* Remove Datastore from the docstring.

Signed-off-by: lblokhin <lenin133@yandex.ru>

* get rid of the return call from S3RegistryStore

Signed-off-by: lblokhin <lenin133@yandex.ru>

* merge two exceptions

Signed-off-by: lblokhin <lenin133@yandex.ru>

* For ci requirement

Signed-off-by: lblokhin <lenin133@yandex.ru>

* remove configuration from test

Signed-off-by: lblokhin <lenin133@yandex.ru>

* feast-integration-tests for tests

Signed-off-by: lblokhin <lenin133@yandex.ru>

* change test path

Signed-off-by: lblokhin <lenin133@yandex.ru>

* add fixture feature_store_with_s3_registry to test

Signed-off-by: lblokhin <lenin133@yandex.ru>

* region required

Signed-off-by: lblokhin <lenin133@yandex.ru>

* Address the rest of the comments

Signed-off-by: Tsotne Tabidze <tsotne@tecton.ai>

* Update to_table to to_arrow

Signed-off-by: Tsotne Tabidze <tsotne@tecton.ai>

Co-authored-by: Tsotne Tabidze <tsotne@tecton.ai>
leonid133 and Tsotne Tabidze authored Jul 3, 2021
1 parent 0a2e173 commit 17bfa61
Showing 20 changed files with 678 additions and 21 deletions.
Binary file added docs/specs/dynamodb_online_example.monopic
Binary file added docs/specs/dynamodb_online_example.png
2 changes: 1 addition & 1 deletion docs/specs/online_store_format.md
@@ -7,7 +7,7 @@ This format is considered part of Feast public API contract; that allows other c

The format is not entirely technology or cloud agnostic. Since users may opt to use different key-value stores as an underlying engine to store feature data, and we don't want to aim for the lowest common denominator across them, we have to provide different "flavors" of this data format, specialized for every supported store.

This version of the Online Store Format supports only Redis as the underlying storage engine. We envision adding more storage engines to this document in the future.
This version of the Online Store Format supports Redis and DynamoDB as storage engines. We envision adding more storage engines to this document in the future.


## Overview
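For a concrete sense of the DynamoDB flavor, here is a minimal sketch of what one online-store item could look like. The attribute names (`entity_id`, `event_ts`, `values`) follow the naming used in this commit's messages but are illustrative assumptions, not the committed schema:

```python
from datetime import datetime, timezone


def build_dynamodb_item(entity_id: str, features: dict, event_ts: datetime) -> dict:
    # One item per entity key: a derived entity id as the partition key,
    # the event timestamp, and the feature values alongside. All attribute
    # names here are assumptions for illustration.
    return {
        "entity_id": entity_id,
        "event_ts": event_ts.isoformat(),
        "values": {name: str(value) for name, value in features.items()},
    }


item = build_dynamodb_item(
    "driver_1001",
    {"conv_rate": 0.85},
    datetime(2021, 7, 3, tzinfo=timezone.utc),
)
```

In the real store such a dict would be handed to a boto3 `Table.put_item(Item=...)` call; per the commit message, read and write capacity (rcu/wcu) are exposed as parameters of the DynamoDB online store config.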
2 changes: 1 addition & 1 deletion sdk/python/feast/cli.py
@@ -280,7 +280,7 @@ def materialize_incremental_command(ctx: click.Context, end_ts: str, views: List
@click.option(
    "--template",
    "-t",
    type=click.Choice(["local", "gcp"], case_sensitive=False),
    type=click.Choice(["local", "gcp", "aws"], case_sensitive=False),
    help="Specify a template for the created project",
    default="local",
)
10 changes: 10 additions & 0 deletions sdk/python/feast/errors.py
@@ -40,6 +40,16 @@ def __init__(self, name, project=None):
        super().__init__(f"Feature table {name} does not exist")


class S3RegistryBucketNotExist(FeastObjectNotFoundException):
    def __init__(self, bucket):
        super().__init__(f"S3 bucket {bucket} for the Feast registry does not exist")


class S3RegistryBucketForbiddenAccess(FeastObjectNotFoundException):
    def __init__(self, bucket):
        super().__init__(f"S3 bucket {bucket} for the Feast registry can't be accessed")


class FeastProviderLoginError(Exception):
    """Error class that indicates a user has not authenticated with their provider."""

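To see how the two new error classes are meant to be used, here is a hedged sketch of the mapping an S3-backed registry store might perform when a fetch fails. The `raise_for_s3_error_code` helper is an assumption for illustration, not the committed S3RegistryStore logic; `NoSuchBucket` and `AccessDenied` are standard S3 error codes. The classes are redefined locally so the sketch runs on its own:

```python
class FeastObjectNotFoundException(Exception):
    """Local stand-in for feast.errors.FeastObjectNotFoundException."""


class S3RegistryBucketNotExist(FeastObjectNotFoundException):
    def __init__(self, bucket):
        super().__init__(f"S3 bucket {bucket} for the Feast registry does not exist")


class S3RegistryBucketForbiddenAccess(FeastObjectNotFoundException):
    def __init__(self, bucket):
        super().__init__(f"S3 bucket {bucket} for the Feast registry can't be accessed")


def raise_for_s3_error_code(code: str, bucket: str) -> None:
    # Map standard S3 error codes onto the Feast registry exceptions
    # (hypothetical helper; the real store inspects the botocore
    # ClientError response).
    if code == "NoSuchBucket":
        raise S3RegistryBucketNotExist(bucket)
    if code in ("AccessDenied", "403"):
        raise S3RegistryBucketForbiddenAccess(bucket)
```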
141 changes: 141 additions & 0 deletions sdk/python/feast/infra/aws.py
@@ -0,0 +1,141 @@
from datetime import datetime
from typing import Any, Callable, Dict, List, Optional, Sequence, Tuple, Union

import pandas
from tqdm import tqdm

from feast import FeatureTable
from feast.entity import Entity
from feast.feature_view import FeatureView
from feast.infra.offline_stores.helpers import get_offline_store_from_config
from feast.infra.online_stores.helpers import get_online_store_from_config
from feast.infra.provider import (
    Provider,
    RetrievalJob,
    _convert_arrow_to_proto,
    _get_column_names,
    _run_field_mapping,
)
from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto
from feast.protos.feast.types.Value_pb2 import Value as ValueProto
from feast.registry import Registry
from feast.repo_config import RepoConfig


class AwsProvider(Provider):
    def __init__(self, config: RepoConfig):
        self.repo_config = config
        self.offline_store = get_offline_store_from_config(config.offline_store)
        self.online_store = get_online_store_from_config(config.online_store)

    def update_infra(
        self,
        project: str,
        tables_to_delete: Sequence[Union[FeatureTable, FeatureView]],
        tables_to_keep: Sequence[Union[FeatureTable, FeatureView]],
        entities_to_delete: Sequence[Entity],
        entities_to_keep: Sequence[Entity],
        partial: bool,
    ):
        self.online_store.update(
            config=self.repo_config,
            tables_to_delete=tables_to_delete,
            tables_to_keep=tables_to_keep,
            entities_to_keep=entities_to_keep,
            entities_to_delete=entities_to_delete,
            partial=partial,
        )

    def teardown_infra(
        self,
        project: str,
        tables: Sequence[Union[FeatureTable, FeatureView]],
        entities: Sequence[Entity],
    ) -> None:
        self.online_store.teardown(self.repo_config, tables, entities)

    def online_write_batch(
        self,
        config: RepoConfig,
        table: Union[FeatureTable, FeatureView],
        data: List[
            Tuple[EntityKeyProto, Dict[str, ValueProto], datetime, Optional[datetime]]
        ],
        progress: Optional[Callable[[int], Any]],
    ) -> None:
        self.online_store.online_write_batch(config, table, data, progress)

    def online_read(
        self,
        config: RepoConfig,
        table: Union[FeatureTable, FeatureView],
        entity_keys: List[EntityKeyProto],
        requested_features: Optional[List[str]] = None,
    ) -> List[Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]]:
        result = self.online_store.online_read(config, table, entity_keys)

        return result

    def materialize_single_feature_view(
        self,
        config: RepoConfig,
        feature_view: FeatureView,
        start_date: datetime,
        end_date: datetime,
        registry: Registry,
        project: str,
        tqdm_builder: Callable[[int], tqdm],
    ) -> None:
        entities = []
        for entity_name in feature_view.entities:
            entities.append(registry.get_entity(entity_name, project))

        (
            join_key_columns,
            feature_name_columns,
            event_timestamp_column,
            created_timestamp_column,
        ) = _get_column_names(feature_view, entities)

        offline_job = self.offline_store.pull_latest_from_table_or_query(
            config=config,
            data_source=feature_view.input,
            join_key_columns=join_key_columns,
            feature_name_columns=feature_name_columns,
            event_timestamp_column=event_timestamp_column,
            created_timestamp_column=created_timestamp_column,
            start_date=start_date,
            end_date=end_date,
        )

        table = offline_job.to_arrow()

        if feature_view.input.field_mapping is not None:
            table = _run_field_mapping(table, feature_view.input.field_mapping)

        join_keys = [entity.join_key for entity in entities]
        rows_to_write = _convert_arrow_to_proto(table, feature_view, join_keys)

        with tqdm_builder(len(rows_to_write)) as pbar:
            self.online_write_batch(
                self.repo_config, feature_view, rows_to_write, lambda x: pbar.update(x)
            )

    def get_historical_features(
        self,
        config: RepoConfig,
        feature_views: List[FeatureView],
        feature_refs: List[str],
        entity_df: Union[pandas.DataFrame, str],
        registry: Registry,
        project: str,
    ) -> RetrievalJob:
        job = self.offline_store.get_historical_features(
            config=config,
            feature_views=feature_views,
            feature_refs=feature_refs,
            entity_df=entity_df,
            registry=registry,
            project=project,
        )
        return job
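The `tqdm_builder` parameter above is a callable that takes a row count and returns a context-managed progress bar, which `materialize_single_feature_view` drives through `lambda x: pbar.update(x)`. A minimal stand-in (not Feast code) that satisfies the same contract:

```python
class CountingBar:
    """Progress-bar stand-in exposing the subset of the tqdm interface used above."""

    def __init__(self, total: int):
        self.total = total
        self.n = 0

    def update(self, n: int) -> None:
        self.n += n

    def __enter__(self):
        return self

    def __exit__(self, *exc):
        return False


def tqdm_builder(total: int) -> CountingBar:
    return CountingBar(total)


# Mirrors the provider's call pattern: the writer reports progress through
# the callback as rows land in the online store.
rows_to_write = [("driver_1001", {"conv_rate": 0.85}), ("driver_1002", {"conv_rate": 0.62})]
with tqdm_builder(len(rows_to_write)) as pbar:
    progress = lambda x: pbar.update(x)
    for _row in rows_to_write:
        progress(1)
```

Taking a builder rather than a bar lets callers choose the rendering (a real tqdm bar in the CLI, a no-op in tests) while the provider depends only on `update`.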
17 changes: 3 additions & 14 deletions sdk/python/feast/infra/online_stores/datastore.py
@@ -16,13 +16,12 @@
from multiprocessing.pool import ThreadPool
from typing import Any, Callable, Dict, Iterator, List, Optional, Sequence, Tuple, Union

import mmh3
from pydantic import PositiveInt, StrictStr
from pydantic.typing import Literal

from feast import Entity, FeatureTable, utils
from feast.feature_view import FeatureView
from feast.infra.key_encoding_utils import serialize_entity_key
from feast.infra.online_stores.helpers import compute_entity_id
from feast.infra.online_stores.online_store import OnlineStore
from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto
from feast.protos.feast.types.Value_pb2 import Value as ValueProto
@@ -191,7 +190,7 @@ def _write_minibatch(
):
    entities = []
    for entity_key, features, timestamp, created_ts in data:
        document_id = compute_datastore_entity_id(entity_key)
        document_id = compute_entity_id(entity_key)

        key = client.key(
            "Project", project, "Table", table.name, "Row", document_id,
Expand Down Expand Up @@ -236,7 +235,7 @@ def online_read(

result: List[Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]] = []
for entity_key in entity_keys:
document_id = compute_datastore_entity_id(entity_key)
document_id = compute_entity_id(entity_key)
key = client.key(
"Project", feast_project, "Table", table.name, "Row", document_id
)
@@ -253,16 +252,6 @@ def online_read(
        return result


def compute_datastore_entity_id(entity_key: EntityKeyProto) -> str:
    """
    Compute Datastore Entity id given Feast Entity Key.
    Remember that Datastore Entity is a concept from the Datastore data model, that has nothing to
    do with the Entity concept we have in Feast.
    """
    return mmh3.hash_bytes(serialize_entity_key(entity_key)).hex()


def _delete_all_values(client, key) -> None:
    """
    Delete all data under the key path in datastore.
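The removed `compute_datastore_entity_id` shows the shape of the shared `compute_entity_id` helper that replaces it: a stable hash of the serialized entity key, hex-encoded. Below is a self-contained sketch of that pattern, with `hashlib` standing in for `mmh3.hash_bytes` and a stub serializer (both substitutions are assumptions so the example runs on its own):

```python
import hashlib


def serialize_entity_key(entity_key: bytes) -> bytes:
    # Stub: Feast's real helper deterministically serializes an
    # EntityKey protobuf; here the key is already bytes.
    return entity_key


def compute_entity_id(entity_key: bytes) -> str:
    # Same pattern as the removed code, which did:
    #   mmh3.hash_bytes(serialize_entity_key(entity_key)).hex()
    # hashlib.md5 stands in for mmh3 to keep the sketch dependency-free.
    return hashlib.md5(serialize_entity_key(entity_key)).hexdigest()


document_id = compute_entity_id(b"driver_id=1001")
```

The point of the refactor is that this id derivation is store-agnostic, so Datastore and DynamoDB can share one helper instead of each defining its own.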
