From b4a011a4a8637f81a16acecd5c5bfdbfd5d9c80e Mon Sep 17 00:00:00 2001 From: Trinh Hai-Anh Date: Wed, 16 Feb 2022 12:45:34 -0800 Subject: [PATCH] implement dynamodb batch_get_item --- .../feast/infra/online_stores/dynamodb.py | 67 ++++++++++++++++--- 1 file changed, 59 insertions(+), 8 deletions(-) diff --git a/sdk/python/feast/infra/online_stores/dynamodb.py b/sdk/python/feast/infra/online_stores/dynamodb.py index 9a3b36271b..4b42b98c48 100644 --- a/sdk/python/feast/infra/online_stores/dynamodb.py +++ b/sdk/python/feast/infra/online_stores/dynamodb.py @@ -167,21 +167,23 @@ def online_read( assert isinstance(online_config, DynamoDBOnlineStoreConfig) dynamodb_client = self._get_dynamodb_client(online_config) table_name = _get_table_name(config, table) + entity_keys = [compute_entity_id(k) for k in entity_keys] result: List[Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]] = [] - for entity_key in entity_keys: - entity_id = compute_entity_id(entity_key) - with tracing_span(name="remote_call"): - response = dynamodb_client.get_item(TableName=table_name, Key={"entity_id": {'S': entity_id}}) - value = response.get("Item") - if value is not None: + with tracing_span(name="remote_call"): + responses = _do_single_table_batch_get(dynamodb_client, table_name, entity_keys) + + for item in responses: + values = item.get("values") + + if values is not None: res = {} - for feature_name, value_bin in value["values"]['M'].items(): + for feature_name, value_bin in values['M'].items(): val = ValueProto() val.ParseFromString(value_bin['B']) res[feature_name] = val - result.append((value["event_ts"]['S'], res)) + result.append((item["event_ts"]['S'], res)) else: result.append((None, None)) return result @@ -198,6 +200,55 @@ def _get_dynamodb_resource(self, online_config: DynamoDBOnlineStoreConfig): return threadlocal.dynamodb_resource + +def _do_single_table_batch_get(client, table_name, entity_keys, max_tries=10, base_backoff_time=0.01): + """ + Gets a batch of items from Amazon DynamoDB. Batches can contain keys from + more than one table, but this function assume one table due to upstream limitation. + + When Amazon DynamoDB cannot process all items in a batch, a set of unprocessed + keys is returned. This function uses an exponential backoff algorithm to retry + getting the unprocessed keys until all are retrieved or the specified + number of tries is reached. + + :param entity_keys: The set of keys to retrieve. A batch can contain at most 100 + keys. Otherwise, Amazon DynamoDB returns an error. + :param max_tries: max number of attempts for retries when DynamoDB returns UnprocessedKeys + :param base_backoff_time: base of exponetial backoff time (in seconds), doubling every retry + :return: The dictionary of retrieved items grouped under their respective + table names. + """ + tries = 0 + retrieved = [] + while tries < max_tries: + request = { + table_name: {"Keys": [{"entity_id": {"S": k}} for k in entity_keys]} + } + response = client.batch_get_item(RequestItems=request) + # Collect any retrieved items and retry unprocessed keys. + retrieved = response['Responses'][table_name] + unprocessed = response['UnprocessedKeys'] + if len(unprocessed) > 0: + batch_keys = unprocessed + unprocessed_count = sum( + [len(batch_key['Keys']) for batch_key in batch_keys.values()]) + logger.debug("batch_get_item: %s unprocessed keys returned. Sleep, then retry.", unprocessed_count) + tries += 1 + if tries <= max_tries: + time.sleep(base_backoff_time) + base_backoff_time = min(base_backoff_time * 2, 8) + else: + # we could return to clients a incomplete results but they would expect to have all the items retrived, + # failure could happen in batch get if one table is heavy-throttled + # and failing would help them notice the problem and adjust capacity. + raise RuntimeError("batch_get_item failed to retrieve all items, got %d out of %d requested keys after %d tries. " + "Please check if one of the tables for the missing keys need more capacity." % (len(retrieved), len(batch_keys), tries)) + else: + break + + return retrieved + + def _initialize_dynamodb_client(online_config: DynamoDBOnlineStoreConfig): if online_config.iam_role is not None: sts_client = boto3.client('sts')