Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add support for DynamoDB online_read in batches #2371

Merged
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
c34bf3e
feat: dynamodb onlin read in batches
TremaMiguel Mar 6, 2022
56d2d32
run linters and format
TremaMiguel Mar 6, 2022
7b98faf
feat: batch_size parameter
TremaMiguel Mar 6, 2022
94abfb6
Merge branch 'feast-dev:master' into feat/dynamo_db_online_write_read
TremaMiguel Mar 6, 2022
b5c1a3d
docs: typo in batch_size description
TremaMiguel Mar 6, 2022
5a12856
trailing white space
TremaMiguel Mar 6, 2022
8bd2a84
fix: batch_size is last argument
TremaMiguel Mar 7, 2022
fb6eacb
test: dynamodb online store online_read in batches
TremaMiguel Mar 7, 2022
1bbd5dc
Merge branch 'master' into feat/dynamo_db_online_write_read
adchia Mar 7, 2022
307bab9
test: mock dynamodb behavior
TremaMiguel Mar 7, 2022
e52a895
feat: batch_size value must be less than 40
TremaMiguel Mar 8, 2022
29b5cf6
Merge branch 'master' into feat/dynamo_db_online_write_read
adchia Mar 9, 2022
97fd71f
feat: batch_size defaults to 40
TremaMiguel Mar 10, 2022
ddb3f0a
Merge branch 'feat/dynamo_db_online_write_read' of github.com:TremaMi…
TremaMiguel Mar 10, 2022
12064e4
feat: sort dynamodb responses
TremaMiguel Mar 11, 2022
449f60d
merge branch master into feat/dynamo_db_online_write_read
TremaMiguel Mar 12, 2022
3f72228
resolve merge conflicts
TremaMiguel Mar 12, 2022
7a4edbd
test online response proto with redshift:dynamodb
TremaMiguel Mar 12, 2022
3843a02
feat: consistency in batch_size process
TremaMiguel Mar 15, 2022
88e183e
fix: return batch_size times None
TremaMiguel Mar 16, 2022
23cb49a
remove debug code
TremaMiguel Mar 16, 2022
44f97e7
Merge branch 'feast-dev:master' into feat/dynamo_db_online_write_read
TremaMiguel Mar 22, 2022
eaf4940
typo in docstring
TremaMiguel Mar 22, 2022
5bc54d3
batch_size in onlineconfigstore
TremaMiguel Mar 23, 2022
c7ab086
Merge branch 'master' into feat/dynamo_db_online_write_read
TremaMiguel Mar 23, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 47 additions & 13 deletions sdk/python/feast/infra/online_stores/dynamodb.py
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,7 @@ def online_read(
table: FeatureView,
entity_keys: List[EntityKeyProto],
requested_features: Optional[List[str]] = None,
batch_size: int = 20,
TremaMiguel marked this conversation as resolved.
Show resolved Hide resolved
) -> List[Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]]:
"""
Retrieve feature values from the online DynamoDB store.
Expand All @@ -199,26 +200,59 @@ def online_read(
config: The RepoConfig for the current FeatureStore.
table: Feast FeatureView.
entity_keys: a list of entity keys that should be read from the FeatureStore.
batch_size: the number of items to send in a batch_get_item request to DynamoDB.
DynamoDB record size limit is 400kb and can retrieve 16MB per call, it is recommended
to set batch_size value less than 40 to avoid ``UnprocessedKeys`` and
``ValidationException`` errors.
"""
if batch_size > 40:
TremaMiguel marked this conversation as resolved.
Show resolved Hide resolved
raise ValueError(
f"batch_size value must be less than 40, input value is {batch_size}"
)
online_config = config.online_store
assert isinstance(online_config, DynamoDBOnlineStoreConfig)
dynamodb_resource = self._get_dynamodb_resource(online_config.region)
table_instance = dynamodb_resource.Table(_get_table_name(config, table))

result: List[Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]] = []
for entity_key in entity_keys:
table_instance = dynamodb_resource.Table(_get_table_name(config, table))
entity_id = compute_entity_id(entity_key)
entity_ids = [compute_entity_id(entity_key) for entity_key in entity_keys]

len_entity_ids = len(entity_ids)
# Iterate until the end_index is the value len_entity_ids
iters = (
TremaMiguel marked this conversation as resolved.
Show resolved Hide resolved
len_entity_ids // batch_size + 1
if len_entity_ids % batch_size > 0
else len_entity_ids // batch_size
)
for i in range(iters):
start_index = min(i * batch_size, len_entity_ids)
end_index = min(i * batch_size + batch_size, len_entity_ids)

batch_entity_ids = {
table_instance.name: {
"Keys": [
{"entity_id": entity_id}
for entity_id in entity_ids[start_index:end_index]
]
}
}

with tracing_span(name="remote_call"):
response = table_instance.get_item(Key={"entity_id": entity_id})
value = response.get("Item")

if value is not None:
res = {}
for feature_name, value_bin in value["values"].items():
val = ValueProto()
val.ParseFromString(value_bin.value)
res[feature_name] = val
result.append((datetime.fromisoformat(value["event_ts"]), res))
response = dynamodb_resource.batch_get_item(
RequestItems=batch_entity_ids
)

response = response.get("Responses")
table_responses = response.get(table_instance.name)

if table_responses:
for tbl_res in table_responses:
res = {}
for feature_name, value_bin in tbl_res["values"].items():
val = ValueProto()
val.ParseFromString(value_bin.value)
res[feature_name] = val
result.append((datetime.fromisoformat(tbl_res["event_ts"]), res))
else:
result.append((None, None))
return result
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
from dataclasses import dataclass

import pytest
from moto import mock_dynamodb2

from feast.infra.offline_stores.file import FileOfflineStoreConfig
from feast.infra.online_stores.dynamodb import (
DynamoDBOnlineStore,
DynamoDBOnlineStoreConfig,
)
from feast.repo_config import RepoConfig
from tests.utils.online_store_utils import (
_create_n_customer_test_samples,
_create_test_table,
_insert_data_test_table,
)

REGISTRY = "s3://test_registry/registry.db"
PROJECT = "test_aws"
PROVIDER = "aws"
TABLE_NAME = "dynamodb_online_store"
REGION = "us-west-2"


@dataclass
class MockFeatureView:
name: str


@pytest.fixture
def repo_config():
return RepoConfig(
registry=REGISTRY,
project=PROJECT,
provider=PROVIDER,
online_store=DynamoDBOnlineStoreConfig(region=REGION),
offline_store=FileOfflineStoreConfig(),
)


@mock_dynamodb2
@pytest.mark.parametrize("n_samples", [5, 50, 100])
def test_online_read(repo_config, n_samples):
"""Test DynamoDBOnlineStore online_read method."""
_create_test_table(PROJECT, f"{TABLE_NAME}_{n_samples}", REGION)
data = _create_n_customer_test_samples(n=n_samples)
_insert_data_test_table(data, PROJECT, f"{TABLE_NAME}_{n_samples}", REGION)

entity_keys, features = zip(*data)
dynamodb_store = DynamoDBOnlineStore()
returned_items = dynamodb_store.online_read(
config=repo_config,
table=MockFeatureView(name=f"{TABLE_NAME}_{n_samples}"),
entity_keys=entity_keys,
)
assert len(returned_items) == len(data)
assert [item[1] for item in returned_items] == list(features)
54 changes: 54 additions & 0 deletions sdk/python/tests/utils/online_store_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
from datetime import datetime

import boto3

from feast import utils
from feast.infra.online_stores.helpers import compute_entity_id
from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto
from feast.protos.feast.types.Value_pb2 import Value as ValueProto


def _create_n_customer_test_samples(n=10):
return [
(
EntityKeyProto(
join_keys=["customer"], entity_values=[ValueProto(string_val=str(i))]
),
{
"avg_orders_day": ValueProto(float_val=1.0),
"name": ValueProto(string_val="John"),
"age": ValueProto(int64_val=3),
},
)
for i in range(n)
]


def _create_test_table(project, tbl_name, region):
client = boto3.client("dynamodb", region_name=region)
client.create_table(
TableName=f"{project}.{tbl_name}",
KeySchema=[{"AttributeName": "entity_id", "KeyType": "HASH"}],
AttributeDefinitions=[{"AttributeName": "entity_id", "AttributeType": "S"}],
BillingMode="PAY_PER_REQUEST",
)


def _delete_test_table(project, tbl_name, region):
client = boto3.client("dynamodb", region_name=region)
client.delete_table(TableName=f"{project}.{tbl_name}")


def _insert_data_test_table(data, project, tbl_name, region):
dynamodb_resource = boto3.resource("dynamodb", region_name=region)
table_instance = dynamodb_resource.Table(f"{project}.{tbl_name}")
for entity_key, features in data:
entity_id = compute_entity_id(entity_key)
with table_instance.batch_writer() as batch:
batch.put_item(
Item={
"entity_id": entity_id,
"event_ts": str(utils.make_tzaware(datetime.utcnow())),
"values": {k: v.SerializeToString() for k, v in features.items()},
}
)