Skip to content

Commit

Permalink
Fixes DynamoDB batched online reads dropping entities that have no stored response
Browse files (browse the repository at this point in the history)
Signed-off-by: Andrew Pope <apope@nursefly.com>
  • Loading branch information
Andrew Pope committed Jun 16, 2022
1 parent 1bd0930 commit 93ec924
Showing 1 changed file with 10 additions and 4 deletions.
14 changes: 10 additions & 4 deletions sdk/python/feast/infra/online_stores/dynamodb.py
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,9 @@ def online_read(
entity_ids_iter = iter(entity_ids)
while True:
batch = list(itertools.islice(entity_ids_iter, batch_size))
batch_result: List[
Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]
] = []
# No more items to insert
if len(batch) == 0:
break
Expand All @@ -243,20 +246,23 @@ def online_read(
for tbl_res in table_responses:
entity_id = tbl_res["entity_id"]
while entity_id != batch[entity_idx]:
result.append((None, None))
batch_result.append((None, None))
entity_idx += 1
res = {}
for feature_name, value_bin in tbl_res["values"].items():
val = ValueProto()
val.ParseFromString(value_bin.value)
res[feature_name] = val
result.append((datetime.fromisoformat(tbl_res["event_ts"]), res))
batch_result.append(
(datetime.fromisoformat(tbl_res["event_ts"]), res)
)
entity_idx += 1

# Not all entities in a batch may have responses
# Pad with remaining values in batch that were not found
batch_size_nones = ((None, None),) * (len(batch) - len(result))
result.extend(batch_size_nones)
batch_size_nones = ((None, None),) * (len(batch) - len(batch_result))
batch_result.extend(batch_size_nones)
result.extend(batch_result)
return result

def _get_dynamodb_client(self, region: str, endpoint_url: Optional[str] = None):
Expand Down

0 comments on commit 93ec924

Please sign in to comment.