From c34bf3e4f1affc1220af73dfdd0d26eeda9c9b55 Mon Sep 17 00:00:00 2001 From: Miguel Trejo Date: Sat, 5 Mar 2022 18:50:33 -0600 Subject: [PATCH 01/18] feat: dynamodb onlin read in batches Signed-off-by: Miguel Trejo --- feast_test_changes.ipynb | 208 ++++++++++++++++++ .../feast/infra/online_stores/dynamodb.py | 46 +++- 2 files changed, 242 insertions(+), 12 deletions(-) create mode 100644 feast_test_changes.ipynb diff --git a/feast_test_changes.ipynb b/feast_test_changes.ipynb new file mode 100644 index 0000000000..981fd61c0d --- /dev/null +++ b/feast_test_changes.ipynb @@ -0,0 +1,208 @@ +{ + "cells": [ + { + "cell_type": "code", + "execution_count": 7, + "id": "57d0a096", + "metadata": {}, + "outputs": [], + "source": [ + "batch_size = 10\n", + "entity_ids = range(85)\n", + "total = len(entity_ids)" + ] + }, + { + "cell_type": "code", + "execution_count": 13, + "id": "27072d11", + "metadata": {}, + "outputs": [], + "source": [ + "iters = total // batch_size + 1 if total % batch_size > 0 else total // batch_size" + ] + }, + { + "cell_type": "code", + "execution_count": 15, + "id": "e2e470b4", + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "0 10\n", + "10 20\n", + "20 30\n", + "30 40\n", + "40 50\n", + "50 60\n", + "60 70\n", + "70 80\n", + "80 85\n" + ] + } + ], + "source": [ + "for i in range(iters):\n", + " start_index = min(i * batch_size, len(entity_ids))\n", + " end_index = min(i * batch_size + batch_size, len(entity_ids))\n", + " print(start_index, end_index)" + ] + }, + { + "cell_type": "code", + "execution_count": 18, + "id": "a2641186", + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "empty\n" + ] + } + ], + "source": [ + "if {}:\n", + " print('aa')\n", + "else:\n", + " print('empty')" + ] + }, + { + "cell_type": "code", + "execution_count": 19, + "id": "ad83fde4", + "metadata": {}, + "outputs": [], + "source": [ + "result = {\n", + " \"Item\": {\n", + " \"AlbumTitle\": {\n", + " \"S\": \"Songs About Life\"\n", + " },\n", + " \"SongTitle\": {\n", + " \"S\": \"Happy Day\"\n", + " },\n", + " \"Artist\": {\n", + " \"S\": \"Acme Band\"\n", + " }\n", + " },\n", + " \"ConsumedCapacity\": {\n", + " \"TableName\": \"MusicCollection\",\n", + " \"CapacityUnits\": 0.5\n", + " }\n", + "}" + ] + }, + { + "cell_type": "code", + "execution_count": 20, + "id": "bd88642b", + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "{'AlbumTitle': {'S': 'Songs About Life'},\n", + " 'SongTitle': {'S': 'Happy Day'},\n", + " 'Artist': {'S': 'Acme Band'}}" + ] + }, + "execution_count": 20, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "result.get(\"Item\")" + ] + }, + { + "cell_type": "code", + "execution_count": 21, + "id": "8133f52a", + "metadata": {}, + "outputs": [], + "source": [ + "response = {\n", + " \"Responses\": {\n", + " \"MusicCollection\": [\n", + " {\n", + " \"AlbumTitle\": {\n", + " \"S\": \"Somewhat Famous\"\n", + " }\n", + " },\n", + " {\n", + " \"AlbumTitle\": {\n", + " \"S\": \"Blue Sky Blues\"\n", + " }\n", + " },\n", + " {\n", + " \"AlbumTitle\": {\n", + " \"S\": \"Louder Than Ever\"\n", + " }\n", + " }\n", + " ]\n", + " },\n", + " \"UnprocessedKeys\": {},\n", + " \"ConsumedCapacity\": [\n", + " {\n", + " \"TableName\": \"MusicCollection\",\n", + " \"CapacityUnits\": 1.5\n", + " }\n", + " ]\n", + "}" + ] + }, + { + "cell_type": "code", + "execution_count": 22, + "id": "c90e260a", + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "[{'AlbumTitle': {'S': 'Somewhat Famous'}},\n", + " {'AlbumTitle': {'S': 'Blue Sky Blues'}},\n", + " {'AlbumTitle': {'S': 'Louder Than Ever'}}]" + ] + }, + "execution_count": 22, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "response = response.get(\"Responses\")\n", + "response = response.get(\"MusicCollection\")\n", + "response" + ] + } + ], + "metadata": { + "kernelspec": { + "display_name": "Python 3 (ipykernel)", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.9.7" + } + }, + "nbformat": 4, + "nbformat_minor": 5 +} diff --git a/sdk/python/feast/infra/online_stores/dynamodb.py b/sdk/python/feast/infra/online_stores/dynamodb.py index 66c32c1fb8..3302d5ecd9 100644 --- a/sdk/python/feast/infra/online_stores/dynamodb.py +++ b/sdk/python/feast/infra/online_stores/dynamodb.py @@ -203,22 +203,44 @@ def online_read( online_config = config.online_store assert isinstance(online_config, DynamoDBOnlineStoreConfig) dynamodb_resource = self._get_dynamodb_resource(online_config.region) + table_instance = dynamodb_resource.Table(_get_table_name(config, table)) result: List[Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]] = [] - for entity_key in entity_keys: - table_instance = dynamodb_resource.Table(_get_table_name(config, table)) - entity_id = compute_entity_id(entity_key) + entity_ids = [ + compute_entity_id(entity_key) for entity_key in entity_keys + ] + + # DynamoDB record size limit is 400kb and can retrieve 16MB per call + # More info: https://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_BatchGetItem.html + batch_size = 10 + len_entity_ids = len(entity_ids) + iters = len_entity_ids // batch_size + 1 if len_entity_ids % batch_size > 0 else len_entity_ids // batch_size + for i in range(iters): + start_index = min(i * batch_size, len_entity_ids) + end_index = min(i * batch_size + batch_size, len_entity_ids) + + batch_entity_ids = { + table_instance.name: { + 'Keys': [ + {'entity_id': entity_id} for entity_id in entity_ids[start_index:end_index] + ] + } + } + with tracing_span(name="remote_call"): - response = table_instance.get_item(Key={"entity_id": entity_id}) - value = response.get("Item") - - if value is not None: + response = dynamodb_resource.batch_get_item(RequestItems=batch_entity_ids) + + response = response.get("Responses") + table_responses = response.get(table_instance.name) + + if table_responses: res = {} - for feature_name, value_bin in value["values"].items(): - val = ValueProto() - val.ParseFromString(value_bin.value) - res[feature_name] = val - result.append((datetime.fromisoformat(value["event_ts"]), res)) + for tbl_res in table_responses: + for feature_name, value_bin in tbl_res["values"].items(): + val = ValueProto() + val.ParseFromString(value_bin.value) + res[feature_name] = val + result.append((datetime.fromisoformat(tbl_res["event_ts"]), res)) else: result.append((None, None)) return result From 56d2d322a61ab7824473287e6426fd9b8f8c996d Mon Sep 17 00:00:00 2001 From: Miguel Trejo Date: Sat, 5 Mar 2022 19:28:40 -0600 Subject: [PATCH 02/18] run linters and format Signed-off-by: Miguel Trejo --- feast_test_changes.ipynb | 208 ------------------ .../feast/infra/online_stores/dynamodb.py | 27 ++- 2 files changed, 16 insertions(+), 219 deletions(-) delete mode 100644 feast_test_changes.ipynb diff --git a/feast_test_changes.ipynb b/feast_test_changes.ipynb deleted file mode 100644 index 981fd61c0d..0000000000 --- a/feast_test_changes.ipynb +++ /dev/null @@ -1,208 +0,0 @@ -{ - "cells": [ - { - "cell_type": "code", - "execution_count": 7, - "id": "57d0a096", - "metadata": {}, - "outputs": [], - "source": [ - "batch_size = 10\n", - "entity_ids = range(85)\n", - "total = len(entity_ids)" - ] - }, - { - "cell_type": "code", - "execution_count": 13, - "id": "27072d11", - "metadata": {}, - "outputs": [], - "source": [ - "iters = total // batch_size + 1 if total % batch_size > 0 else total // batch_size" - ] - }, - { - "cell_type": "code", - "execution_count": 15, - "id": "e2e470b4", - "metadata": {}, - "outputs": [ - { - "name": "stdout", - "output_type": "stream", - "text": [ - "0 10\n", - "10 20\n", - "20 30\n", - "30 40\n", - "40 50\n", - "50 60\n", - "60 70\n", - "70 80\n", - "80 85\n" - ] - } - ], - "source": [ - "for i in range(iters):\n", - " start_index = min(i * batch_size, len(entity_ids))\n", - " end_index = min(i * batch_size + batch_size, len(entity_ids))\n", - " print(start_index, end_index)" - ] - }, - { - "cell_type": "code", - "execution_count": 18, - "id": "a2641186", - "metadata": {}, - "outputs": [ - { - "name": "stdout", - "output_type": "stream", - "text": [ - "empty\n" - ] - } - ], - "source": [ - "if {}:\n", - " print('aa')\n", - "else:\n", - " print('empty')" - ] - }, - { - "cell_type": "code", - "execution_count": 19, - "id": "ad83fde4", - "metadata": {}, - "outputs": [], - "source": [ - "result = {\n", - " \"Item\": {\n", - " \"AlbumTitle\": {\n", - " \"S\": \"Songs About Life\"\n", - " },\n", - " \"SongTitle\": {\n", - " \"S\": \"Happy Day\"\n", - " },\n", - " \"Artist\": {\n", - " \"S\": \"Acme Band\"\n", - " }\n", - " },\n", - " \"ConsumedCapacity\": {\n", - " \"TableName\": \"MusicCollection\",\n", - " \"CapacityUnits\": 0.5\n", - " }\n", - "}" - ] - }, - { - "cell_type": "code", - "execution_count": 20, - "id": "bd88642b", - "metadata": {}, - "outputs": [ - { - "data": { - "text/plain": [ - "{'AlbumTitle': {'S': 'Songs About Life'},\n", - " 'SongTitle': {'S': 'Happy Day'},\n", - " 'Artist': {'S': 'Acme Band'}}" - ] - }, - "execution_count": 20, - "metadata": {}, - "output_type": "execute_result" - } - ], - "source": [ - "result.get(\"Item\")" - ] - }, - { - "cell_type": "code", - "execution_count": 21, - "id": "8133f52a", - "metadata": {}, - "outputs": [], - "source": [ - "response = {\n", - " \"Responses\": {\n", - " \"MusicCollection\": [\n", - " {\n", - " \"AlbumTitle\": {\n", - " \"S\": \"Somewhat Famous\"\n", - " }\n", - " },\n", - " {\n", - " \"AlbumTitle\": {\n", - " \"S\": \"Blue Sky Blues\"\n", - " }\n", - " },\n", - " {\n", - " \"AlbumTitle\": {\n", - " \"S\": \"Louder Than Ever\"\n", - " }\n", - " }\n", - " ]\n", - " },\n", - " \"UnprocessedKeys\": {},\n", - " \"ConsumedCapacity\": [\n", - " {\n", - " \"TableName\": \"MusicCollection\",\n", - " \"CapacityUnits\": 1.5\n", - " }\n", - " ]\n", - "}" - ] - }, - { - "cell_type": "code", - "execution_count": 22, - "id": "c90e260a", - "metadata": {}, - "outputs": [ - { - "data": { - "text/plain": [ - "[{'AlbumTitle': {'S': 'Somewhat Famous'}},\n", - " {'AlbumTitle': {'S': 'Blue Sky Blues'}},\n", - " {'AlbumTitle': {'S': 'Louder Than Ever'}}]" - ] - }, - "execution_count": 22, - "metadata": {}, - "output_type": "execute_result" - } - ], - "source": [ - "response = response.get(\"Responses\")\n", - "response = response.get(\"MusicCollection\")\n", - "response" - ] - } - ], - "metadata": { - "kernelspec": { - "display_name": "Python 3 (ipykernel)", - "language": "python", - "name": "python3" - }, - "language_info": { - "codemirror_mode": { - "name": "ipython", - "version": 3 - }, - "file_extension": ".py", - "mimetype": "text/x-python", - "name": "python", - "nbconvert_exporter": "python", - "pygments_lexer": "ipython3", - "version": "3.9.7" - } - }, - "nbformat": 4, - "nbformat_minor": 5 -} diff --git a/sdk/python/feast/infra/online_stores/dynamodb.py b/sdk/python/feast/infra/online_stores/dynamodb.py index 3302d5ecd9..d39829faac 100644 --- a/sdk/python/feast/infra/online_stores/dynamodb.py +++ b/sdk/python/feast/infra/online_stores/dynamodb.py @@ -206,33 +206,38 @@ def online_read( table_instance = dynamodb_resource.Table(_get_table_name(config, table)) result: List[Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]] = [] - entity_ids = [ - compute_entity_id(entity_key) for entity_key in entity_keys - ] + entity_ids = [compute_entity_id(entity_key) for entity_key in entity_keys] # DynamoDB record size limit is 400kb and can retrieve 16MB per call # More info: https://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_BatchGetItem.html batch_size = 10 len_entity_ids = len(entity_ids) - iters = len_entity_ids // batch_size + 1 if len_entity_ids % batch_size > 0 else len_entity_ids // batch_size + iters = ( + len_entity_ids // batch_size + 1 + if len_entity_ids % batch_size > 0 + else len_entity_ids // batch_size + ) for i in range(iters): start_index = min(i * batch_size, len_entity_ids) end_index = min(i * batch_size + batch_size, len_entity_ids) - + batch_entity_ids = { table_instance.name: { - 'Keys': [ - {'entity_id': entity_id} for entity_id in entity_ids[start_index:end_index] + "Keys": [ + {"entity_id": entity_id} + for entity_id in entity_ids[start_index:end_index] ] } } - + with tracing_span(name="remote_call"): - response = dynamodb_resource.batch_get_item(RequestItems=batch_entity_ids) - + response = dynamodb_resource.batch_get_item( + RequestItems=batch_entity_ids + ) + response = response.get("Responses") table_responses = response.get(table_instance.name) - + if table_responses: res = {} for tbl_res in table_responses: From 7b98faf56a8dd64cb047b2a9c7b635e54cdd7bfa Mon Sep 17 00:00:00 2001 From: Miguel Trejo Date: Sat, 5 Mar 2022 20:09:22 -0600 Subject: [PATCH 03/18] feat: batch_size parameter Signed-off-by: Miguel Trejo --- sdk/python/feast/infra/online_stores/dynamodb.py | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/sdk/python/feast/infra/online_stores/dynamodb.py b/sdk/python/feast/infra/online_stores/dynamodb.py index d39829faac..c36c0e387d 100644 --- a/sdk/python/feast/infra/online_stores/dynamodb.py +++ b/sdk/python/feast/infra/online_stores/dynamodb.py @@ -187,6 +187,7 @@ def online_read( config: RepoConfig, table: FeatureView, entity_keys: List[EntityKeyProto], + batch_size: int = 10, requested_features: Optional[List[str]] = None, ) -> List[Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]]: """ @@ -199,6 +200,9 @@ def online_read( config: The RepoConfig for the current FeatureStore. table: Feast FeatureView. entity_keys: a list of entity keys that should be read from the FeatureStore. + batch_size: the number of items to send to send in a batch_get_item request to DynamoDB. + DynamoDB record size limit is 400kb and can retrieve 16MB per call, it is recommended + to set batch_size value less than 40 to UnprocessedKeys and ValidationException. """ online_config = config.online_store assert isinstance(online_config, DynamoDBOnlineStoreConfig) @@ -208,10 +212,8 @@ def online_read( result: List[Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]] = [] entity_ids = [compute_entity_id(entity_key) for entity_key in entity_keys] - # DynamoDB record size limit is 400kb and can retrieve 16MB per call - # More info: https://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_BatchGetItem.html - batch_size = 10 len_entity_ids = len(entity_ids) + # Iterate until the end_index is the value len_entity_ids iters = ( len_entity_ids // batch_size + 1 if len_entity_ids % batch_size > 0 @@ -239,8 +241,8 @@ def online_read( table_responses = response.get(table_instance.name) if table_responses: - res = {} for tbl_res in table_responses: + res = {} for feature_name, value_bin in tbl_res["values"].items(): val = ValueProto() val.ParseFromString(value_bin.value) From b5c1a3deaabb26d2cff830e7d815a5236849a068 Mon Sep 17 00:00:00 2001 From: Miguel Trejo Date: Sat, 5 Mar 2022 20:19:30 -0600 Subject: [PATCH 04/18] docs: typo in batch_size description Signed-off-by: Miguel Trejo --- sdk/python/feast/infra/online_stores/dynamodb.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/sdk/python/feast/infra/online_stores/dynamodb.py b/sdk/python/feast/infra/online_stores/dynamodb.py index c36c0e387d..b3aa12a69e 100644 --- a/sdk/python/feast/infra/online_stores/dynamodb.py +++ b/sdk/python/feast/infra/online_stores/dynamodb.py @@ -200,9 +200,10 @@ def online_read( config: The RepoConfig for the current FeatureStore. table: Feast FeatureView. entity_keys: a list of entity keys that should be read from the FeatureStore. - batch_size: the number of items to send to send in a batch_get_item request to DynamoDB. + batch_size: the number of items to send in a batch_get_item request to DynamoDB. DynamoDB record size limit is 400kb and can retrieve 16MB per call, it is recommended - to set batch_size value less than 40 to UnprocessedKeys and ValidationException. + to set batch_size value less than 40 to avoid ``UnprocessedKeys``` and + ``ValidationException`` errors. """ online_config = config.online_store assert isinstance(online_config, DynamoDBOnlineStoreConfig) From 5a12856d1fadcc48ce0004f65799c0dd55a777d9 Mon Sep 17 00:00:00 2001 From: Miguel Trejo Date: Sat, 5 Mar 2022 20:30:11 -0600 Subject: [PATCH 05/18] trailing white space Signed-off-by: Miguel Trejo --- sdk/python/feast/infra/online_stores/dynamodb.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdk/python/feast/infra/online_stores/dynamodb.py b/sdk/python/feast/infra/online_stores/dynamodb.py index b3aa12a69e..7ee5df6b4d 100644 --- a/sdk/python/feast/infra/online_stores/dynamodb.py +++ b/sdk/python/feast/infra/online_stores/dynamodb.py @@ -202,7 +202,7 @@ def online_read( entity_keys: a list of entity keys that should be read from the FeatureStore. batch_size: the number of items to send in a batch_get_item request to DynamoDB. DynamoDB record size limit is 400kb and can retrieve 16MB per call, it is recommended - to set batch_size value less than 40 to avoid ``UnprocessedKeys``` and + to set batch_size value less than 40 to avoid ``UnprocessedKeys`` and ``ValidationException`` errors. """ online_config = config.online_store From 8bd2a84c54713a3e000314de718b210f3bfbdc4d Mon Sep 17 00:00:00 2001 From: Miguel Trejo Date: Sun, 6 Mar 2022 18:10:17 -0600 Subject: [PATCH 06/18] fix: batch_size is last argument Signed-off-by: Miguel Trejo --- sdk/python/feast/infra/online_stores/dynamodb.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdk/python/feast/infra/online_stores/dynamodb.py b/sdk/python/feast/infra/online_stores/dynamodb.py index 7ee5df6b4d..29298ccbd6 100644 --- a/sdk/python/feast/infra/online_stores/dynamodb.py +++ b/sdk/python/feast/infra/online_stores/dynamodb.py @@ -187,8 +187,8 @@ def online_read( config: RepoConfig, table: FeatureView, entity_keys: List[EntityKeyProto], - batch_size: int = 10, requested_features: Optional[List[str]] = None, + batch_size: int = 10 ) -> List[Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]]: """ Retrieve feature values from the online DynamoDB store. From fb6eacb3a5718f92dfd4c466ad313a6dfed812b7 Mon Sep 17 00:00:00 2001 From: Miguel Trejo Date: Sun, 6 Mar 2022 23:17:35 -0600 Subject: [PATCH 07/18] test: dynamodb online store online_read in batches Signed-off-by: Miguel Trejo --- .../test_dynamodb_online_store.py | 59 +++++++++++++++++++ sdk/python/tests/utils/online_store_utils.py | 54 +++++++++++++++++ 2 files changed, 113 insertions(+) create mode 100644 sdk/python/tests/unit/online_store/test_dynamodb_online_store.py create mode 100644 sdk/python/tests/utils/online_store_utils.py diff --git a/sdk/python/tests/unit/online_store/test_dynamodb_online_store.py b/sdk/python/tests/unit/online_store/test_dynamodb_online_store.py new file mode 100644 index 0000000000..ef6101c4c0 --- /dev/null +++ b/sdk/python/tests/unit/online_store/test_dynamodb_online_store.py @@ -0,0 +1,59 @@ +import time +from dataclasses import dataclass + +import pytest + +from feast.infra.offline_stores.file import FileOfflineStoreConfig +from feast.infra.online_stores.dynamodb import ( + DynamoDBOnlineStore, + DynamoDBOnlineStoreConfig, +) +from feast.repo_config import RepoConfig +from tests.utils.online_store_utils import ( + _create_n_customer_test_samples, + _create_test_table, + _delete_test_table, + _insert_data_test_table, +) + +REGISTRY = "s3://test_registry/registry.db" +PROJECT = "test_aws" +PROVIDER = "aws" +TABLE_NAME = "dynamodb_online_store" +REGION = "us-west-2" + + +@dataclass +class MockFeatureView: + name: str + + +@pytest.fixture +def repo_config(): + return RepoConfig( + registry=REGISTRY, + project=PROJECT, + provider=PROVIDER, + online_store=DynamoDBOnlineStoreConfig(region=REGION), + offline_store=FileOfflineStoreConfig(), + ) + + +@pytest.mark.parametrize("n_samples", [5, 50, 100]) +def test_online_read(repo_config, n_samples): + """Test DynamoDBOnlineStore online_read method.""" + _create_test_table(PROJECT, f"{TABLE_NAME}_{n_samples}", REGION) + time.sleep(10) # Wait for table to be available + data = _create_n_customer_test_samples(n=n_samples) + _insert_data_test_table(data, PROJECT, f"{TABLE_NAME}_{n_samples}", REGION) + + entity_keys, features = zip(*data) + dynamodb_store = DynamoDBOnlineStore() + returned_items = dynamodb_store.online_read( + config=repo_config, + table=MockFeatureView(name=f"{TABLE_NAME}_{n_samples}"), + entity_keys=entity_keys, + ) + _delete_test_table(PROJECT, f"{TABLE_NAME}_{n_samples}", REGION) + assert len(returned_items) == len(data) + assert [item[1] for item in returned_items] == list(features) diff --git a/sdk/python/tests/utils/online_store_utils.py b/sdk/python/tests/utils/online_store_utils.py new file mode 100644 index 0000000000..ee90c2a542 --- /dev/null +++ b/sdk/python/tests/utils/online_store_utils.py @@ -0,0 +1,54 @@ +from datetime import datetime + +import boto3 + +from feast import utils +from feast.infra.online_stores.helpers import compute_entity_id +from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto +from feast.protos.feast.types.Value_pb2 import Value as ValueProto + + +def _create_n_customer_test_samples(n=10): + return [ + ( + EntityKeyProto( + join_keys=["customer"], entity_values=[ValueProto(string_val=str(i))] + ), + { + "avg_orders_day": ValueProto(float_val=1.0), + "name": ValueProto(string_val="John"), + "age": ValueProto(int64_val=3), + }, + ) + for i in range(n) + ] + + +def _create_test_table(project, tbl_name, region): + client = boto3.client("dynamodb", region_name=region) + client.create_table( + TableName=f"{project}.{tbl_name}", + KeySchema=[{"AttributeName": "entity_id", "KeyType": "HASH"}], + AttributeDefinitions=[{"AttributeName": "entity_id", "AttributeType": "S"}], + BillingMode="PAY_PER_REQUEST", + ) + + +def _delete_test_table(project, tbl_name, region): + client = boto3.client("dynamodb", region_name=region) + client.delete_table(TableName=f"{project}.{tbl_name}") + + +def _insert_data_test_table(data, project, tbl_name, region): + dynamodb_resource = boto3.resource("dynamodb", region_name=region) + table_instance = dynamodb_resource.Table(f"{project}.{tbl_name}") + for entity_key, features in data: + entity_id = compute_entity_id(entity_key) + with table_instance.batch_writer() as batch: + batch.put_item( + Item={ + "entity_id": entity_id, + "event_ts": str(utils.make_tzaware(datetime.utcnow())), + "values": {k: v.SerializeToString() for k, v in features.items()}, + } + ) From 307bab90bc16b41f731c05f630767e26f5cf1b5a Mon Sep 17 00:00:00 2001 From: Miguel Trejo Date: Mon, 7 Mar 2022 17:58:12 -0600 Subject: [PATCH 08/18] test: mock dynamodb behavior Signed-off-by: Miguel Trejo --- sdk/python/feast/infra/online_stores/dynamodb.py | 2 +- .../tests/unit/online_store/test_dynamodb_online_store.py | 6 ++---- 2 files changed, 3 insertions(+), 5 deletions(-) diff --git a/sdk/python/feast/infra/online_stores/dynamodb.py b/sdk/python/feast/infra/online_stores/dynamodb.py index 29298ccbd6..899110c437 100644 --- a/sdk/python/feast/infra/online_stores/dynamodb.py +++ b/sdk/python/feast/infra/online_stores/dynamodb.py @@ -188,7 +188,7 @@ def online_read( table: FeatureView, entity_keys: List[EntityKeyProto], requested_features: Optional[List[str]] = None, - batch_size: int = 10 + batch_size: int = 10, ) -> List[Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]]: """ Retrieve feature values from the online DynamoDB store. diff --git a/sdk/python/tests/unit/online_store/test_dynamodb_online_store.py b/sdk/python/tests/unit/online_store/test_dynamodb_online_store.py index ef6101c4c0..0f42230ef5 100644 --- a/sdk/python/tests/unit/online_store/test_dynamodb_online_store.py +++ b/sdk/python/tests/unit/online_store/test_dynamodb_online_store.py @@ -1,7 +1,7 @@ -import time from dataclasses import dataclass import pytest +from moto import mock_dynamodb2 from feast.infra.offline_stores.file import FileOfflineStoreConfig from feast.infra.online_stores.dynamodb import ( @@ -12,7 +12,6 @@ from tests.utils.online_store_utils import ( _create_n_customer_test_samples, _create_test_table, - _delete_test_table, _insert_data_test_table, ) @@ -39,11 +38,11 @@ def repo_config(): ) +@mock_dynamodb2 @pytest.mark.parametrize("n_samples", [5, 50, 100]) def test_online_read(repo_config, n_samples): """Test DynamoDBOnlineStore online_read method.""" _create_test_table(PROJECT, f"{TABLE_NAME}_{n_samples}", REGION) - time.sleep(10) # Wait for table to be available data = _create_n_customer_test_samples(n=n_samples) _insert_data_test_table(data, PROJECT, f"{TABLE_NAME}_{n_samples}", REGION) @@ -54,6 +53,5 @@ def test_online_read(repo_config, n_samples): table=MockFeatureView(name=f"{TABLE_NAME}_{n_samples}"), entity_keys=entity_keys, ) - _delete_test_table(PROJECT, f"{TABLE_NAME}_{n_samples}", REGION) assert len(returned_items) == len(data) assert [item[1] for item in returned_items] == list(features) From e52a895e043846b59c94093bd5b7455095cd1b64 Mon Sep 17 00:00:00 2001 From: Miguel Trejo Date: Mon, 7 Mar 2022 18:03:59 -0600 Subject: [PATCH 09/18] feat: batch_size value must be less than 40 Signed-off-by: Miguel Trejo --- sdk/python/feast/infra/online_stores/dynamodb.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/sdk/python/feast/infra/online_stores/dynamodb.py b/sdk/python/feast/infra/online_stores/dynamodb.py index 899110c437..12973a3c36 100644 --- a/sdk/python/feast/infra/online_stores/dynamodb.py +++ b/sdk/python/feast/infra/online_stores/dynamodb.py @@ -188,7 +188,7 @@ def online_read( table: FeatureView, entity_keys: List[EntityKeyProto], requested_features: Optional[List[str]] = None, - batch_size: int = 10, + batch_size: int = 20, ) -> List[Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]]: """ Retrieve feature values from the online DynamoDB store. @@ -205,6 +205,10 @@ def online_read( to set batch_size value less than 40 to avoid ``UnprocessedKeys`` and ``ValidationException`` errors. """ + if batch_size > 40: + raise ValueError( + f"batch_size value must be less than 40, input value is {batch_size}" + ) online_config = config.online_store assert isinstance(online_config, DynamoDBOnlineStoreConfig) dynamodb_resource = self._get_dynamodb_resource(online_config.region) From 97fd71f6ab2950e342c96b3552147f32d92dc8c0 Mon Sep 17 00:00:00 2001 From: Miguel Trejo Date: Wed, 9 Mar 2022 18:13:08 -0600 Subject: [PATCH 10/18] feat: batch_size defaults to 40 Signed-off-by: Miguel Trejo --- sdk/python/feast/infra/online_stores/dynamodb.py | 10 +--------- 1 file changed, 1 insertion(+), 9 deletions(-) diff --git a/sdk/python/feast/infra/online_stores/dynamodb.py b/sdk/python/feast/infra/online_stores/dynamodb.py index 12973a3c36..0442227cbf 100644 --- a/sdk/python/feast/infra/online_stores/dynamodb.py +++ b/sdk/python/feast/infra/online_stores/dynamodb.py @@ -188,7 +188,6 @@ def online_read( table: FeatureView, entity_keys: List[EntityKeyProto], requested_features: Optional[List[str]] = None, - batch_size: int = 20, ) -> List[Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]]: """ Retrieve feature values from the online DynamoDB store. @@ -200,15 +199,7 @@ def online_read( config: The RepoConfig for the current FeatureStore. table: Feast FeatureView. entity_keys: a list of entity keys that should be read from the FeatureStore. - batch_size: the number of items to send in a batch_get_item request to DynamoDB. - DynamoDB record size limit is 400kb and can retrieve 16MB per call, it is recommended - to set batch_size value less than 40 to avoid ``UnprocessedKeys`` and - ``ValidationException`` errors. """ - if batch_size > 40: - raise ValueError( - f"batch_size value must be less than 40, input value is {batch_size}" - ) online_config = config.online_store assert isinstance(online_config, DynamoDBOnlineStoreConfig) dynamodb_resource = self._get_dynamodb_resource(online_config.region) @@ -218,6 +209,7 @@ def online_read( entity_ids = [compute_entity_id(entity_key) for entity_key in entity_keys] len_entity_ids = len(entity_ids) + batch_size = 40 # Iterate until the end_index is the value len_entity_ids iters = ( len_entity_ids // batch_size + 1 From 12064e4a4cb665b21dbdfbad9091d6417897fb8e Mon Sep 17 00:00:00 2001 From: Miguel Trejo Date: Fri, 11 Mar 2022 17:41:38 -0600 Subject: [PATCH 11/18] feat: sort dynamodb responses Signed-off-by: Miguel Trejo --- .../feast/infra/online_stores/dynamodb.py | 20 +++++++++++++++++++ 1 file changed, 20 insertions(+) diff --git a/sdk/python/feast/infra/online_stores/dynamodb.py b/sdk/python/feast/infra/online_stores/dynamodb.py index 0442227cbf..7704f2a38a 100644 --- a/sdk/python/feast/infra/online_stores/dynamodb.py +++ b/sdk/python/feast/infra/online_stores/dynamodb.py @@ -188,6 +188,7 @@ def online_read( table: FeatureView, entity_keys: List[EntityKeyProto], requested_features: Optional[List[str]] = None, + sort_response: bool = True, ) -> List[Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]]: """ Retrieve feature values from the online DynamoDB store. @@ -199,6 +200,7 @@ def online_read( config: The RepoConfig for the current FeatureStore. table: Feast FeatureView. entity_keys: a list of entity keys that should be read from the FeatureStore. + sort_response: wether or not to sort DynamoDB responses by the entity_ids order. """ online_config = config.online_store assert isinstance(online_config, DynamoDBOnlineStoreConfig) @@ -238,6 +240,10 @@ def online_read( table_responses = response.get(table_instance.name) if table_responses: + if sort_response: + table_responses = self._sort_dynamodb_response( + table_responses, entity_ids + ) for tbl_res in table_responses: res = {} for feature_name, value_bin in tbl_res["values"].items(): @@ -259,6 +265,20 @@ def _get_dynamodb_resource(self, region: str): self._dynamodb_resource = _initialize_dynamodb_resource(region) return self._dynamodb_resource + def _sort_dynamodb_response(self, responses: list, order: list): + """DynamoDB Batch Get Item doesn't return items in a particular order.""" + # Assign an index to order + order_with_index = {value: idx for idx, value in enumerate(order)} + # Sort table responses by index + table_responses_ordered = [ + (order_with_index[tbl_res["entity_id"]], tbl_res) for tbl_res in responses + ] + table_responses_ordered = sorted( + table_responses_ordered, key=lambda tup: tup[0] + ) + _, table_responses_ordered = zip(*table_responses_ordered) + return table_responses_ordered + def _initialize_dynamodb_client(region: str): return boto3.client("dynamodb", region_name=region) From 3f72228b0cd3075c586ba8fea2e492cdd9a372c8 Mon Sep 17 00:00:00 2001 From: Miguel Trejo Date: Fri, 11 Mar 2022 18:24:18 -0600 Subject: [PATCH 12/18] resolve merge conflicts Signed-off-by: Miguel Trejo --- sdk/python/feast/infra/online_stores/dynamodb.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/sdk/python/feast/infra/online_stores/dynamodb.py b/sdk/python/feast/infra/online_stores/dynamodb.py index 2aca987f35..83057fd1de 100644 --- a/sdk/python/feast/infra/online_stores/dynamodb.py +++ b/sdk/python/feast/infra/online_stores/dynamodb.py @@ -213,7 +213,9 @@ def online_read( online_config = config.online_store assert isinstance(online_config, DynamoDBOnlineStoreConfig) dynamodb_resource = self._get_dynamodb_resource(online_config.region) - table_instance = dynamodb_resource.Table(_get_table_name(config, table)) + table_instance = dynamodb_resource.Table( + _get_table_name(online_config, config, table) + ) result: List[Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]] = [] entity_ids = [compute_entity_id(entity_key) for entity_key in entity_keys] From 7a4edbd2604e893a9973e9299906d8ff2ea7796e Mon Sep 17 00:00:00 2001 From: Miguel Trejo Date: Sat, 12 Mar 2022 12:09:03 -0600 Subject: [PATCH 13/18] test online response proto with redshift:dynamodb Signed-off-by: Miguel Trejo --- sdk/python/feast/online_response.py | 19 ++++++++++++++++++- 1 file changed, 18 insertions(+), 1 deletion(-) diff --git a/sdk/python/feast/online_response.py b/sdk/python/feast/online_response.py index f01bd510be..177a3e086e 100644 --- a/sdk/python/feast/online_response.py +++ b/sdk/python/feast/online_response.py @@ -22,6 +22,18 @@ TIMESTAMP_POSTFIX: str = "__ts" +import logging +import sys + +logger = logging.getLogger() +logger.setLevel(logging.DEBUG) + +handler = logging.StreamHandler(sys.stdout) +handler.setLevel(logging.DEBUG) +formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s') +handler.setFormatter(formatter) +logger.addHandler(handler) + class OnlineResponse: """ @@ -57,7 +69,12 @@ def to_dict(self, include_event_timestamps: bool = False) -> Dict[str, Any]: for result in self.proto.results: for idx, feature_ref in enumerate(self.proto.metadata.feature_names.val): - native_type_value = feast_value_type_to_python_type(result.values[idx]) + try: + native_type_value = feast_value_type_to_python_type(result.values[idx]) + except IndexError as e: + logger.info(f'ERROR -- {e}') + logger.info(f'ONLINE RESPONSE PROTO: {self.proto}') + raise IndexError(e) if feature_ref not in response: response[feature_ref] = [native_type_value] else: From 3843a02f02275a5fc9b31e15e25b25c129d3e5ce Mon Sep 17 00:00:00 2001 From: Miguel Trejo Date: Tue, 15 Mar 2022 17:17:29 -0600 Subject: [PATCH 14/18] feat: consistency in batch_size process Signed-off-by: Miguel Trejo --- .../feast/infra/online_stores/dynamodb.py | 42 ++++++++----------- 1 file changed, 18 insertions(+), 24 deletions(-) diff --git a/sdk/python/feast/infra/online_stores/dynamodb.py b/sdk/python/feast/infra/online_stores/dynamodb.py index 83057fd1de..0f77e2d726 100644 --- a/sdk/python/feast/infra/online_stores/dynamodb.py +++ b/sdk/python/feast/infra/online_stores/dynamodb.py @@ -11,6 +11,7 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. +import itertools import logging from datetime import datetime from typing import Any, Callable, Dict, List, Optional, Sequence, Tuple @@ -50,10 +51,13 @@ class DynamoDBOnlineStoreConfig(FeastConfigBaseModel): """Online store type selector""" region: StrictStr - """ AWS Region Name """ + """AWS Region Name""" table_name_template: StrictStr = "{project}.{table_name}" - """ DynamoDB table name template """ + """DynamoDB table name template""" + + sort_response: bool = True + """Wether or not to sort BatchGetItem response.""" class DynamoDBOnlineStore(OnlineStore): @@ -63,10 +67,12 @@ class DynamoDBOnlineStore(OnlineStore): Attributes: _dynamodb_client: Boto3 DynamoDB client. _dynamodb_resource: Boto3 DynamoDB resource. + _batch_size: Number of items to retrieve in a DynamoDB BatchGetItem call. """ _dynamodb_client = None _dynamodb_resource = None + _batch_size = 40 @log_exceptions_and_usage(online_store="dynamodb") def update( @@ -196,7 +202,6 @@ def online_read( table: FeatureView, entity_keys: List[EntityKeyProto], requested_features: Optional[List[str]] = None, - sort_response: bool = True, ) -> List[Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]]: """ Retrieve feature values from the online DynamoDB store. @@ -208,7 +213,6 @@ def online_read( config: The RepoConfig for the current FeatureStore. table: Feast FeatureView. entity_keys: a list of entity keys that should be read from the FeatureStore. - sort_response: wether or not to sort DynamoDB responses by the entity_ids order. """ online_config = config.online_store assert isinstance(online_config, DynamoDBOnlineStoreConfig) @@ -219,36 +223,26 @@ def online_read( result: List[Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]] = [] entity_ids = [compute_entity_id(entity_key) for entity_key in entity_keys] - - len_entity_ids = len(entity_ids) - batch_size = 40 - # Iterate until the end_index is the value len_entity_ids - iters = ( - len_entity_ids // batch_size + 1 - if len_entity_ids % batch_size > 0 - else len_entity_ids // batch_size - ) - for i in range(iters): - start_index = min(i * batch_size, len_entity_ids) - end_index = min(i * batch_size + batch_size, len_entity_ids) - + + batch_size = self._batch_size + sort_response = online_config.sort_response + entity_ids_iter = iter(entity_ids) + while True: + batch = list(itertools.islice(entity_ids_iter, batch_size)) + # No more items to insert + if len(batch) == 0: + break batch_entity_ids = { table_instance.name: { - "Keys": [ - {"entity_id": entity_id} - for entity_id in entity_ids[start_index:end_index] - ] + "Keys": [{"entity_id": entity_id} for entity_id in batch] } } - with tracing_span(name="remote_call"): response = dynamodb_resource.batch_get_item( RequestItems=batch_entity_ids ) - response = response.get("Responses") table_responses = response.get(table_instance.name) - if table_responses: if sort_response: table_responses = self._sort_dynamodb_response( From 88e183e6d695805ce21161de87e2b240f0d5f862 Mon Sep 17 00:00:00 2001 From: Miguel Trejo Date: Tue, 15 Mar 2022 21:19:48 -0600 Subject: [PATCH 15/18] fix: return batch_size times None Signed-off-by: Miguel Trejo --- sdk/python/feast/infra/online_stores/dynamodb.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sdk/python/feast/infra/online_stores/dynamodb.py b/sdk/python/feast/infra/online_stores/dynamodb.py index 0f77e2d726..25cc482361 100644 --- a/sdk/python/feast/infra/online_stores/dynamodb.py +++ b/sdk/python/feast/infra/online_stores/dynamodb.py @@ -223,7 +223,6 @@ def online_read( result: List[Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]] = [] entity_ids = [compute_entity_id(entity_key) for entity_key in entity_keys] - batch_size = self._batch_size sort_response = online_config.sort_response entity_ids_iter = iter(entity_ids) @@ -256,7 +255,8 @@ def online_read( res[feature_name] = val result.append((datetime.fromisoformat(tbl_res["event_ts"]), res)) else: - result.append((None, None)) + batch_size_nones = ((None, None),) * len(batch) + result.extend(batch_size_nones) return result def _get_dynamodb_client(self, region: str): From 23cb49a7bf428e018e337ac03023b18dfeece113 Mon Sep 17 00:00:00 2001 From: Miguel Trejo Date: Tue, 15 Mar 2022 21:28:30 -0600 Subject: [PATCH 16/18] remove debug code Signed-off-by: Miguel Trejo --- sdk/python/feast/online_response.py | 19 +------------------ 1 file changed, 1 insertion(+), 18 deletions(-) diff --git a/sdk/python/feast/online_response.py b/sdk/python/feast/online_response.py index 177a3e086e..f01bd510be 100644 --- a/sdk/python/feast/online_response.py +++ b/sdk/python/feast/online_response.py @@ -22,18 +22,6 @@ TIMESTAMP_POSTFIX: str = "__ts" -import logging -import sys - -logger = logging.getLogger() -logger.setLevel(logging.DEBUG) - -handler = logging.StreamHandler(sys.stdout) -handler.setLevel(logging.DEBUG) -formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s') -handler.setFormatter(formatter) -logger.addHandler(handler) - class OnlineResponse: """ @@ -69,12 +57,7 @@ def to_dict(self, include_event_timestamps: bool = False) -> Dict[str, Any]: for result in self.proto.results: for idx, feature_ref in enumerate(self.proto.metadata.feature_names.val): - try: - native_type_value = feast_value_type_to_python_type(result.values[idx]) - except IndexError as e: - logger.info(f'ERROR -- {e}') - logger.info(f'ONLINE RESPONSE PROTO: {self.proto}') - raise IndexError(e) + native_type_value = feast_value_type_to_python_type(result.values[idx]) if feature_ref not in response: response[feature_ref] = [native_type_value] else: From eaf4940d0d088529c6ebf79ea6356848f1a8d187 Mon Sep 17 00:00:00 2001 From: Miguel Trejo Date: Mon, 21 Mar 2022 18:19:47 -0600 Subject: [PATCH 17/18] typo in docstring Signed-off-by: Miguel Trejo --- sdk/python/feast/infra/online_stores/dynamodb.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdk/python/feast/infra/online_stores/dynamodb.py b/sdk/python/feast/infra/online_stores/dynamodb.py index 25cc482361..ca44183895 100644 --- a/sdk/python/feast/infra/online_stores/dynamodb.py +++ b/sdk/python/feast/infra/online_stores/dynamodb.py @@ -57,7 +57,7 @@ class DynamoDBOnlineStoreConfig(FeastConfigBaseModel): """DynamoDB table name template""" sort_response: bool = True - """Wether or not to sort BatchGetItem response.""" + """Whether or not to sort BatchGetItem response.""" class DynamoDBOnlineStore(OnlineStore): From 5bc54d3a24f0294dd5edce7b903cde0473af21f4 Mon Sep 17 00:00:00 2001 From: Miguel Trejo Date: Tue, 22 Mar 2022 22:06:25 -0600 Subject: [PATCH 18/18] batch_size in onlineconfigstore Signed-off-by: Miguel Trejo --- sdk/python/feast/infra/online_stores/dynamodb.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/sdk/python/feast/infra/online_stores/dynamodb.py b/sdk/python/feast/infra/online_stores/dynamodb.py index ca44183895..42a1d7c1fc 100644 --- a/sdk/python/feast/infra/online_stores/dynamodb.py +++ b/sdk/python/feast/infra/online_stores/dynamodb.py @@ -59,6 +59,9 @@ class DynamoDBOnlineStoreConfig(FeastConfigBaseModel): sort_response: bool = True """Whether or not to sort BatchGetItem response.""" + batch_size: int = 40 + """Number of items to retrieve in a DynamoDB BatchGetItem call.""" + class DynamoDBOnlineStore(OnlineStore): """ @@ -67,12 +70,10 @@ class DynamoDBOnlineStore(OnlineStore): Attributes: _dynamodb_client: Boto3 DynamoDB client. _dynamodb_resource: Boto3 DynamoDB resource. - _batch_size: Number of items to retrieve in a DynamoDB BatchGetItem call. """ _dynamodb_client = None _dynamodb_resource = None - _batch_size = 40 @log_exceptions_and_usage(online_store="dynamodb") def update( @@ -223,7 +224,7 @@ def online_read( result: List[Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]] = [] entity_ids = [compute_entity_id(entity_key) for entity_key in entity_keys] - batch_size = self._batch_size + batch_size = online_config.batch_size sort_response = online_config.sort_response entity_ids_iter = iter(entity_ids) while True: