implement dynamodb batch_get_item
aht committed Feb 16, 2022
1 parent beeaa16 commit b4a011a
Showing 1 changed file with 59 additions and 8 deletions.
sdk/python/feast/infra/online_stores/dynamodb.py (59 additions, 8 deletions)
@@ -167,21 +167,23 @@ def online_read(
         assert isinstance(online_config, DynamoDBOnlineStoreConfig)
         dynamodb_client = self._get_dynamodb_client(online_config)
         table_name = _get_table_name(config, table)
+        entity_keys = [compute_entity_id(k) for k in entity_keys]
 
         result: List[Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]] = []
-        for entity_key in entity_keys:
-            entity_id = compute_entity_id(entity_key)
-            with tracing_span(name="remote_call"):
-                response = dynamodb_client.get_item(TableName=table_name, Key={"entity_id": {'S': entity_id}})
-            value = response.get("Item")
 
-            if value is not None:
+        with tracing_span(name="remote_call"):
+            responses = _do_single_table_batch_get(dynamodb_client, table_name, entity_keys)
+
+        for item in responses:
+            values = item.get("values")
+
+            if values is not None:
                 res = {}
-                for feature_name, value_bin in value["values"]['M'].items():
+                for feature_name, value_bin in values['M'].items():
                     val = ValueProto()
                     val.ParseFromString(value_bin['B'])
                     res[feature_name] = val
-                result.append((value["event_ts"]['S'], res))
+                result.append((item["event_ts"]['S'], res))
             else:
                 result.append((None, None))
         return result
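For reference, each element of responses is a DynamoDB item in the low-level attribute-value format, which is why the loop above unwraps the M (map) and B (binary) type tags before deserializing each feature value. A minimal sketch of that shape and parsing, outside the diff, using a hypothetical item and assuming Feast's Value protobuf import path:

from feast.protos.feast.types.Value_pb2 import Value as ValueProto

# Hypothetical item, shaped like the entries in responses; real entity ids
# come from compute_entity_id and event_ts is stored as a string.
item = {
    "entity_id": {"S": "example-entity-id"},
    "event_ts": {"S": "2022-02-16 00:00:00+00:00"},
    "values": {
        "M": {"conv_rate": {"B": ValueProto(double_val=0.85).SerializeToString()}}
    },
}

res = {}
for feature_name, value_bin in item["values"]["M"].items():
    val = ValueProto()
    val.ParseFromString(value_bin["B"])  # bytes -> Value protobuf
    res[feature_name] = val

print(res["conv_rate"].double_val)  # 0.85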
@@ -198,6 +200,55 @@ def _get_dynamodb_resource(self, online_config: DynamoDBOnlineStoreConfig):
         return threadlocal.dynamodb_resource
 
 
+def _do_single_table_batch_get(client, table_name, entity_keys, max_tries=10, base_backoff_time=0.01):
+    """
+    Gets a batch of items from Amazon DynamoDB. Batches can contain keys from
+    more than one table, but this function assumes a single table due to an
+    upstream limitation.
+
+    When Amazon DynamoDB cannot process all items in a batch, it returns a set
+    of unprocessed keys. This function uses an exponential backoff algorithm to
+    retry the unprocessed keys until all items are retrieved or the specified
+    number of tries is reached.
+
+    :param entity_keys: The keys to retrieve. A batch can contain at most 100
+        keys; otherwise, Amazon DynamoDB returns an error.
+    :param max_tries: Maximum number of attempts when DynamoDB returns UnprocessedKeys.
+    :param base_backoff_time: Base of the exponential backoff time (in seconds), doubled on every retry.
+    :return: The list of items retrieved from the table.
+    """
+    tries = 0
+    retrieved = []
+    request = {
+        table_name: {"Keys": [{"entity_id": {"S": k}} for k in entity_keys]}
+    }
+    while tries < max_tries:
+        response = client.batch_get_item(RequestItems=request)
+        # Collect any retrieved items and retry only the unprocessed keys.
+        retrieved += response.get("Responses", {}).get(table_name, [])
+        unprocessed = response["UnprocessedKeys"]
+        if len(unprocessed) == 0:
+            break
+        # UnprocessedKeys has the same shape as RequestItems, so it can be
+        # resubmitted directly on the next attempt.
+        request = unprocessed
+        unprocessed_count = sum(
+            len(batch_key["Keys"]) for batch_key in unprocessed.values()
+        )
+        logger.debug("batch_get_item: %s unprocessed keys returned. Sleep, then retry.", unprocessed_count)
+        tries += 1
+        if tries < max_tries:
+            time.sleep(base_backoff_time)
+            base_backoff_time = min(base_backoff_time * 2, 8)
+        else:
+            # We could return incomplete results to clients, but they expect all the
+            # requested items. A batch get can fail partially when a table is heavily
+            # throttled, and failing loudly helps users notice the problem and adjust
+            # capacity.
+            raise RuntimeError(
+                "batch_get_item failed to retrieve all items, got %d out of %d requested keys after %d tries. "
+                "Please check whether the table for the missing keys needs more capacity."
+                % (len(retrieved), len(entity_keys), tries)
+            )
+
+    return retrieved
+
+
 def _initialize_dynamodb_client(online_config: DynamoDBOnlineStoreConfig):
     if online_config.iam_role is not None:
         sts_client = boto3.client('sts')

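A minimal usage sketch of the new helper, assuming boto3 credentials are configured and using a hypothetical region, table name, and entity ids (online_read passes the output of compute_entity_id and the table name from _get_table_name):

import boto3

# Hypothetical values; not part of this commit.
client = boto3.client("dynamodb", region_name="us-west-2")
entity_ids = ["entity-id-1", "entity-id-2", "entity-id-3"]

items = _do_single_table_batch_get(client, "my_project.driver_hourly_stats", entity_ids)
for item in items:
    print(item["entity_id"]["S"], item["event_ts"]["S"])

Note that BatchGetItem may return items in any order and omits keys that are not found, so callers that need per-key alignment have to match results back by entity_id.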