Skip to content

Commit

Permalink
Delete keys from redis when tearing down online store
Browse files Browse the repository at this point in the history
Signed-off-by: Achal Shah <achals@gmail.com>
  • Loading branch information
achals committed Oct 22, 2021
1 parent b0635c3 commit 57adc31
Show file tree
Hide file tree
Showing 4 changed files with 33 additions and 5 deletions.
2 changes: 1 addition & 1 deletion sdk/python/feast/feature_table.py
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ def name(self, name: str):
self._name = name

@property
def entities(self):
def entities(self) -> List[str]:
"""
Returns the entities of this feature table
"""
Expand Down
9 changes: 9 additions & 0 deletions sdk/python/feast/infra/key_encoding_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,15 @@ def _serialize_val(value_type, v: ValueProto) -> Tuple[bytes, int]:
raise ValueError(f"Value type not supported for Firestore: {v}")


def serialize_entity_key_prefix(entity_keys: List[str]) -> bytes:
sorted_keys = sorted(entity_keys)
output: List[bytes] = []
for k in sorted_keys:
output.append(struct.pack("<I", ValueType.STRING))
output.append(k.encode("utf8"))
return b"".join(output)


def serialize_entity_key(entity_key: EntityKeyProto) -> bytes:
"""
Serialize entity key to a bytestring so it can be used as a lookup key in a hash table.
Expand Down
9 changes: 8 additions & 1 deletion sdk/python/feast/infra/online_stores/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,10 @@
import mmh3

from feast import errors
from feast.infra.key_encoding_utils import serialize_entity_key
from feast.infra.key_encoding_utils import (
serialize_entity_key,
serialize_entity_key_prefix,
)
from feast.infra.online_stores.online_store import OnlineStore
from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto

Expand Down Expand Up @@ -41,6 +44,10 @@ def _redis_key(project: str, entity_key: EntityKeyProto) -> bytes:
return b"".join(key)


def _redis_key_prefix(entity_keys: List[str]) -> bytes:
return serialize_entity_key_prefix(entity_keys)


def _mmh3(key: str):
"""
Calculate murmur3_32 hash which is equal to scala version which is using little endian:
Expand Down
18 changes: 15 additions & 3 deletions sdk/python/feast/infra/online_stores/redis.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import json
import logging
from datetime import datetime
from enum import Enum
from typing import Any, Callable, Dict, List, Optional, Sequence, Tuple, Union
Expand All @@ -21,7 +22,7 @@
from pydantic.typing import Literal

from feast import Entity, FeatureTable, FeatureView, RepoConfig, utils
from feast.infra.online_stores.helpers import _mmh3, _redis_key
from feast.infra.online_stores.helpers import _mmh3, _redis_key, _redis_key_prefix
from feast.infra.online_stores.online_store import OnlineStore
from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto
from feast.protos.feast.types.Value_pb2 import Value as ValueProto
Expand All @@ -36,6 +37,7 @@
raise FeastExtrasDependencyImportError("redis", str(e))

EX_SECONDS = 253402300799
logger = logging.getLogger(__name__)


class RedisType(str, Enum):
Expand Down Expand Up @@ -83,7 +85,18 @@ def teardown(
"""
There's currently no teardown done for Redis.
"""
pass

client = self._get_client(config.online_store)
deleted_count = 0
for table in tables:
pipline = client.pipeline()
prefix = _redis_key_prefix(table.entities)

for _k in client.scan_iter(b"".join([prefix, b"*"])):
pipline.delete(_k)
deleted_count += 1
pipline.execute()
logger.debug(f"Deleted {deleted_count} keys")

@staticmethod
def _parse_connection_string(connection_string: str):
Expand Down Expand Up @@ -151,7 +164,6 @@ def online_write_batch(
ex = Timestamp()
ex.seconds = EX_SECONDS
ex_str = ex.SerializeToString()

for entity_key, values, timestamp, created_ts in data:
redis_key_bin = _redis_key(project, entity_key)
ts = Timestamp()
Expand Down

0 comments on commit 57adc31

Please sign in to comment.