Skip to content

Commit

Permalink
Delete entity key from Redis only when all attached feature views are gone (#2240)
Browse files Browse the repository at this point in the history

* Delete entity from redis when the last attached feature view is deleted

Signed-off-by: pyalex <moskalenko.alexey@gmail.com>

* Delete entity key from Redis only when all attached feature views are gone

Signed-off-by: pyalex <moskalenko.alexey@gmail.com>

* make lint happy

Signed-off-by: pyalex <moskalenko.alexey@gmail.com>

* make lint happy

Signed-off-by: pyalex <moskalenko.alexey@gmail.com>

* one more try with mypy

Signed-off-by: pyalex <moskalenko.alexey@gmail.com>
  • Loading branch information
pyalex authored Jan 28, 2022
1 parent 9f2c6d6 commit 5fc0b52
Show file tree
Hide file tree
Showing 2 changed files with 90 additions and 8 deletions.
24 changes: 16 additions & 8 deletions sdk/python/feast/infra/online_stores/redis.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,11 +72,11 @@ class RedisOnlineStoreConfig(FeastConfigBaseModel):
class RedisOnlineStore(OnlineStore):
_client: Optional[Union[Redis, RedisCluster]] = None

def delete_table_values(self, config: RepoConfig, table: FeatureView):
def delete_entity_values(self, config: RepoConfig, join_keys: List[str]):
client = self._get_client(config.online_store)
deleted_count = 0
pipeline = client.pipeline()
prefix = _redis_key_prefix(table.entities)
prefix = _redis_key_prefix(join_keys)

for _k in client.scan_iter(
b"".join([prefix, b"*", config.project.encode("utf8")])
Expand All @@ -85,7 +85,7 @@ def delete_table_values(self, config: RepoConfig, table: FeatureView):
deleted_count += 1
pipeline.execute()

logger.debug(f"Deleted {deleted_count} keys for {table.name}")
logger.debug(f"Deleted {deleted_count} rows for entity {', '.join(join_keys)}")

@log_exceptions_and_usage(online_store="redis")
def update(
Expand All @@ -98,10 +98,16 @@ def update(
partial: bool,
):
"""
We delete the keys in redis for tables/views being removed.
Look for join_keys (list of entities) that are not in use anymore
(usually this happens when the last feature view that was using specific compound key is deleted)
and remove all features attached to this "join_keys".
"""
for table in tables_to_delete:
self.delete_table_values(config, table)
join_keys_to_keep = set(tuple(table.entities) for table in tables_to_keep)

join_keys_to_delete = set(tuple(table.entities) for table in tables_to_delete)

for join_keys in join_keys_to_delete - join_keys_to_keep:
self.delete_entity_values(config, list(join_keys))

def teardown(
self,
Expand All @@ -112,8 +118,10 @@ def teardown(
"""
We delete the keys in redis for tables/views being removed.
"""
for table in tables:
self.delete_table_values(config, table)
join_keys_to_delete = set(tuple(table.entities) for table in tables)

for join_keys in join_keys_to_delete:
self.delete_entity_values(config, list(join_keys))

@staticmethod
def _parse_connection_string(connection_string: str):
Expand Down
74 changes: 74 additions & 0 deletions sdk/python/tests/integration/online_store/test_universal_online.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
)
from tests.integration.feature_repos.universal.feature_views import (
create_driver_hourly_stats_feature_view,
driver_feature_view,
)
from tests.utils.data_source_utils import prep_file_source

Expand Down Expand Up @@ -503,6 +504,79 @@ def test_online_retrieval(environment, universal_data_sources, full_feature_name
)


@pytest.mark.integration
@pytest.mark.universal
def test_online_store_cleanup(environment, universal_data_sources):
    """
    Verify that deleting one feature view does not wipe online features that
    belong to another feature view sharing the same entity.

    Some online store implementations (like Redis) keep features from different
    feature views, but with common entities, together.
    This might end up with deletion of all features attached to the entity,
    when only one feature view was the deletion target
    (see https://github.com/feast-dev/feast/issues/2150).
    Plan:
    1. Register two feature views with common entity "driver"
    2. Materialize data
    3. Check if features are available (via online retrieval)
    4. Delete one feature view
    5. Check that features for the other are still available
    6. Delete the other feature view (and create it again)
    7. Verify that features for both feature views were deleted
    """
    fs = environment.feature_store
    entities, datasets, data_sources = universal_data_sources
    driver_stats_fv = construct_universal_feature_views(data_sources)["driver"]

    # Synthetic dataset keyed by driver_id; timestamps sit inside the
    # materialization window used below so every row is picked up.
    df = pd.DataFrame(
        {
            "ts_1": [environment.end_date] * len(entities["driver"]),
            "created_ts": [environment.end_date] * len(entities["driver"]),
            "driver_id": entities["driver"],
            "value": np.random.random(size=len(entities["driver"])),
        }
    )

    ds = environment.data_source_creator.create_data_source(
        df, destination_name="simple_driver_dataset"
    )

    simple_driver_fv = driver_feature_view(
        data_source=ds, name="test_universal_online_simple_driver"
    )

    # Step 1: two feature views attached to the same "driver" entity.
    fs.apply([driver(), simple_driver_fv, driver_stats_fv])

    # Step 2: materialize a window that covers all generated timestamps.
    fs.materialize(
        environment.start_date - timedelta(days=1),
        environment.end_date + timedelta(days=1),
    )
    expected_values = df.sort_values(by="driver_id")

    features = [f"{simple_driver_fv.name}:value"]
    entity_rows = [{"driver": driver_id} for driver_id in sorted(entities["driver"])]

    # Step 3: features are retrievable after materialization.
    online_features = fs.get_online_features(
        features=features, entity_rows=entity_rows
    ).to_dict()
    assert np.allclose(expected_values["value"], online_features["value"])

    # Step 4: delete only driver_stats_fv; simple_driver_fv shares the entity.
    fs.apply(
        objects=[simple_driver_fv], objects_to_delete=[driver_stats_fv], partial=False
    )

    # Step 5: the surviving feature view's features must still be readable
    # (this is the regression from issue #2150).
    online_features = fs.get_online_features(
        features=features, entity_rows=entity_rows
    ).to_dict()
    assert np.allclose(expected_values["value"], online_features["value"])

    # Step 6: delete the last feature view for the entity, then re-create it
    # (re-created view is registered but NOT re-materialized).
    fs.apply(objects=[], objects_to_delete=[simple_driver_fv], partial=False)
    fs.apply([simple_driver_fv])

    # Step 7: with no materialized data left, all values come back as None.
    online_features = fs.get_online_features(
        features=features, entity_rows=entity_rows
    ).to_dict()
    assert all(v is None for v in online_features["value"])


def response_feature_name(feature: str, full_feature_names: bool) -> str:
if (
feature in {"current_balance", "avg_passenger_count", "lifetime_trip_count"}
Expand Down

0 comments on commit 5fc0b52

Please sign in to comment.