Skip to content

Commit

Permalink
feat: Added delete_table to redis online store (#3857)
Browse files Browse the repository at this point in the history
* feat: Added delete_table to redis online store

Signed-off-by: Jiwon Park <phil.park@sktelecom.com>

* fix: Fix hdel params to *args

Signed-off-by: Jiwon Park <phil.park@sktelecom.com>

---------

Signed-off-by: Jiwon Park <phil.park@sktelecom.com>
  • Loading branch information
phil-park authored Mar 4, 2024
1 parent 64459ad commit 03dae13
Showing 1 changed file with 44 additions and 8 deletions.
52 changes: 44 additions & 8 deletions sdk/python/feast/infra/online_stores/redis.py
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,39 @@ def delete_entity_values(self, config: RepoConfig, join_keys: List[str]):

logger.debug(f"Deleted {deleted_count} rows for entity {', '.join(join_keys)}")

def delete_table(self, config: RepoConfig, table: FeatureView):
"""
Delete all rows in Redis for a specific feature view
Args:
config: Feast config
table: Feature view to delete
"""
client = self._get_client(config.online_store)
deleted_count = 0
prefix = _redis_key_prefix(table.join_keys)

redis_hash_keys = [_mmh3(f"{table.name}:{f.name}") for f in table.features]
redis_hash_keys.append(bytes(f"_ts:{table.name}", "utf8"))

with client.pipeline(transaction=False) as pipe:
for _k in client.scan_iter(
b"".join([prefix, b"*", config.project.encode("utf8")])
):
_tables = {
_hk[4:] for _hk in client.hgetall(_k) if _hk.startswith(b"_ts:")
}
if bytes(table.name, "utf8") not in _tables:
continue
if len(_tables) == 1:
pipe.delete(_k)
else:
pipe.hdel(_k, *redis_hash_keys)
deleted_count += 1
pipe.execute()

logger.debug(f"Deleted {deleted_count} rows for feature view {table.name}")

@log_exceptions_and_usage(online_store="redis")
def update(
self,
Expand All @@ -117,16 +150,19 @@ def update(
partial: bool,
):
"""
Look for join_keys (list of entities) that are not in use anymore
(usually this happens when the last feature view that was using specific compound key is deleted)
and remove all features attached to this "join_keys".
Delete data from feature views that are no longer in use.
Args:
config: Feast config
tables_to_delete: Feature views to delete
tables_to_keep: Feature views to keep
entities_to_delete: Entities to delete
entities_to_keep: Entities to keep
partial: Whether to do a partial update
"""
join_keys_to_keep = set(tuple(table.join_keys) for table in tables_to_keep)

join_keys_to_delete = set(tuple(table.join_keys) for table in tables_to_delete)

for join_keys in join_keys_to_delete - join_keys_to_keep:
self.delete_entity_values(config, list(join_keys))
for table in tables_to_delete:
self.delete_table(config, table)

def teardown(
self,
Expand Down

0 comments on commit 03dae13

Please sign in to comment.