From af4ccef9de9e43dc781e104f55d724f60d794928 Mon Sep 17 00:00:00 2001 From: Achal Shah Date: Wed, 29 Jun 2022 11:21:36 -0700 Subject: [PATCH 01/19] fix: version the entity serialization mechanism to fix issue with int64 vals Signed-off-by: Achal Shah --- protos/feast/core/FeatureView.proto | 3 +++ protos/feast/core/OnDemandFeatureView.proto | 3 +++ protos/feast/core/RequestFeatureView.proto | 3 +++ protos/feast/core/StreamFeatureView.proto | 3 +++ sdk/python/feast/base_feature_view.py | 13 +++++++++++ sdk/python/feast/batch_feature_view.py | 2 ++ sdk/python/feast/feature_view.py | 8 +++++++ sdk/python/feast/infra/key_encoding_utils.py | 19 ++++++++++++--- .../infra/online_stores/contrib/postgres.py | 12 ++++++++-- .../feast/infra/online_stores/helpers.py | 23 +++++++++++++++---- .../feast/infra/online_stores/sqlite.py | 10 ++++++-- sdk/python/feast/on_demand_feature_view.py | 11 +++++++++ sdk/python/feast/registry.py | 15 ++++++++++++ sdk/python/feast/request_feature_view.py | 8 +++++++ sdk/python/feast/stream_feature_view.py | 8 +++++++ .../integration/registration/test_registry.py | 4 ++++ sdk/python/tests/unit/test_feature_views.py | 8 ++++++- .../unit/test_project/feature_store.yaml | 8 +++++++ .../tests/unit/test_serialization_version.py | 17 ++++++++++++++ 19 files changed, 166 insertions(+), 12 deletions(-) create mode 100644 sdk/python/tests/unit/test_project/feature_store.yaml create mode 100644 sdk/python/tests/unit/test_serialization_version.py diff --git a/protos/feast/core/FeatureView.proto b/protos/feast/core/FeatureView.proto index c9e38bf344..522a496c35 100644 --- a/protos/feast/core/FeatureView.proto +++ b/protos/feast/core/FeatureView.proto @@ -75,6 +75,9 @@ message FeatureViewSpec { // Whether these features should be served online or not bool online = 8; + + // Needed for backwards compatible behaviour when fixing + int32 entity_key_serialization_version = 13; } message FeatureViewMeta { diff --git a/protos/feast/core/OnDemandFeatureView.proto b/protos/feast/core/OnDemandFeatureView.proto index 33c51f5c4d..b3bf431f75 100644 --- a/protos/feast/core/OnDemandFeatureView.proto +++ b/protos/feast/core/OnDemandFeatureView.proto @@ -58,6 +58,9 @@ message OnDemandFeatureViewSpec { // Owner of the on demand feature view. string owner = 8; + + // Needed for backwards compatible behaviour when fixing + int32 entity_key_serialization_version = 9; } message OnDemandFeatureViewMeta { diff --git a/protos/feast/core/RequestFeatureView.proto b/protos/feast/core/RequestFeatureView.proto index 4049053c2b..5a3a55bb88 100644 --- a/protos/feast/core/RequestFeatureView.proto +++ b/protos/feast/core/RequestFeatureView.proto @@ -48,4 +48,7 @@ message RequestFeatureViewSpec { // Owner of the request feature view. string owner = 6; + + // Needed for backwards compatible behaviour when fixing + int32 entity_key_serialization_version = 7; } diff --git a/protos/feast/core/StreamFeatureView.proto b/protos/feast/core/StreamFeatureView.proto index 06e9ee0612..963577d8aa 100644 --- a/protos/feast/core/StreamFeatureView.proto +++ b/protos/feast/core/StreamFeatureView.proto @@ -88,5 +88,8 @@ message StreamFeatureViewSpec { // Timestamp field for aggregation string timestamp_field = 16; + + // Needed for backwards compatible behaviour when fixing + int32 entity_key_serialization_version = 17; } diff --git a/sdk/python/feast/base_feature_view.py b/sdk/python/feast/base_feature_view.py index 5feb1d7d89..9e975a1610 100644 --- a/sdk/python/feast/base_feature_view.py +++ b/sdk/python/feast/base_feature_view.py @@ -49,6 +49,8 @@ class BaseFeatureView(ABC): created_timestamp: Optional[datetime] last_updated_timestamp: Optional[datetime] + _entity_key_serialization_version: Optional[int] + @abstractmethod def __init__( self, @@ -58,6 +60,7 @@ def __init__( description: str = "", tags: Optional[Dict[str, str]] = None, owner: str = "", + entity_key_serialization_version: Optional[int] = None, ): """ Creates a BaseFeatureView object. @@ -82,6 +85,16 @@ def __init__( self.projection = FeatureViewProjection.from_definition(self) self.created_timestamp = None self.last_updated_timestamp = None + self._entity_key_serialization_version = entity_key_serialization_version + + @property + def entity_key_serialization_version(self) -> int: + if self._entity_key_serialization_version: + return self._entity_key_serialization_version + return 2 # The default entity key serialization version. + + def set_entity_key_serialization_version(self, v: int): + self._entity_key_serialization_version = v @property @abstractmethod diff --git a/sdk/python/feast/batch_feature_view.py b/sdk/python/feast/batch_feature_view.py index 2f9fb080db..4b609a32e3 100644 --- a/sdk/python/feast/batch_feature_view.py +++ b/sdk/python/feast/batch_feature_view.py @@ -30,6 +30,7 @@ def __init__( owner: str = "", schema: Optional[List[Field]] = None, source: Optional[DataSource] = None, + entity_key_serialization_version: Optional[int] = None, ): if source is None: @@ -55,4 +56,5 @@ def __init__( owner=owner, schema=schema, source=source, + entity_key_serialization_version=entity_key_serialization_version, ) diff --git a/sdk/python/feast/feature_view.py b/sdk/python/feast/feature_view.py index dd8cb4f0a6..bca524b413 100644 --- a/sdk/python/feast/feature_view.py +++ b/sdk/python/feast/feature_view.py @@ -110,6 +110,7 @@ def __init__( owner: str = "", schema: Optional[List[Field]] = None, source: Optional[DataSource] = None, + entity_key_serialization_version: Optional[int] = None, ): """ Creates a FeatureView object. @@ -260,6 +261,7 @@ def __init__( description=description, tags=tags, owner=owner, + entity_key_serialization_version=entity_key_serialization_version, ) self.online = online self.materialization_intervals = [] @@ -430,6 +432,7 @@ def to_proto(self) -> FeatureViewProto: online=self.online, batch_source=batch_source_proto, stream_source=stream_source_proto, + entity_key_serialization_version=self.entity_key_serialization_version, ) return FeatureViewProto(spec=spec, meta=meta) @@ -521,6 +524,11 @@ def from_proto(cls, feature_view_proto: FeatureViewProto): ) ) + if feature_view_proto.spec.entity_key_serialization_version <= 1: + feature_view.set_entity_key_serialization_version(1) + else: + feature_view.set_entity_key_serialization_version(2) + return feature_view @property diff --git a/sdk/python/feast/infra/key_encoding_utils.py b/sdk/python/feast/infra/key_encoding_utils.py index 8333610473..8f5b288b66 100644 --- a/sdk/python/feast/infra/key_encoding_utils.py +++ b/sdk/python/feast/infra/key_encoding_utils.py @@ -6,7 +6,9 @@ from feast.protos.feast.types.Value_pb2 import ValueType -def _serialize_val(value_type, v: ValueProto) -> Tuple[bytes, int]: +def _serialize_val( + value_type, v: ValueProto, entity_key_serialization_version=1 +) -> Tuple[bytes, int]: if value_type == "string_val": return v.string_val.encode("utf8"), ValueType.STRING elif value_type == "bytes_val": @@ -14,6 +16,11 @@ def _serialize_val(value_type, v: ValueProto) -> Tuple[bytes, int]: elif value_type == "int32_val": return struct.pack("= 0 + and entity_key_serialization_version <= 1 + ): + return struct.pack(" bytes: return b"".join(output) -def serialize_entity_key(entity_key: EntityKeyProto) -> bytes: +def serialize_entity_key( + entity_key: EntityKeyProto, entity_key_serialization_version=1 +) -> bytes: """ Serialize entity key to a bytestring so it can be used as a lookup key in a hash table. @@ -54,7 +63,11 @@ def serialize_entity_key(entity_key: EntityKeyProto) -> bytes: output.append(struct.pack(" OnlineStore: return online_store_class() -def _redis_key(project: str, entity_key: EntityKeyProto) -> bytes: - key: List[bytes] = [serialize_entity_key(entity_key), project.encode("utf-8")] +def _redis_key( + project: str, entity_key: EntityKeyProto, entity_key_serialization_version=1 +) -> bytes: + key: List[bytes] = [ + serialize_entity_key( + entity_key, + entity_key_serialization_version=entity_key_serialization_version, + ), + project.encode("utf-8"), + ] return b"".join(key) @@ -40,10 +48,17 @@ def _mmh3(key: str): return bytes.fromhex(struct.pack(" str: +def compute_entity_id( + entity_key: EntityKeyProto, entity_key_serialization_version=1 +) -> str: """ Compute Entity id given Feast Entity Key for online stores. Remember that Entity here refers to `EntityKeyProto` which is used in some online stores to encode the keys. It has nothing to do with the Entity concept we have in Feast. """ - return mmh3.hash_bytes(serialize_entity_key(entity_key)).hex() + return mmh3.hash_bytes( + serialize_entity_key( + entity_key, + entity_key_serialization_version=entity_key_serialization_version, + ) + ).hex() diff --git a/sdk/python/feast/infra/online_stores/sqlite.py b/sdk/python/feast/infra/online_stores/sqlite.py index 2f0e902942..b691e43c66 100644 --- a/sdk/python/feast/infra/online_stores/sqlite.py +++ b/sdk/python/feast/infra/online_stores/sqlite.py @@ -95,7 +95,10 @@ def online_write_batch( with conn: for entity_key, values, timestamp, created_ts in data: - entity_key_bin = serialize_entity_key(entity_key) + entity_key_bin = serialize_entity_key( + entity_key, + entity_key_serialization_version=table.entity_key_serialization_version, + ) timestamp = to_naive_utc(timestamp) if created_ts is not None: created_ts = to_naive_utc(created_ts) @@ -161,7 +164,10 @@ def online_read( k: list(group) for k, group in itertools.groupby(rows, key=lambda r: r[0]) } for entity_key in entity_keys: - entity_key_bin = serialize_entity_key(entity_key) + entity_key_bin = serialize_entity_key( + entity_key, + entity_key_serialization_version=table.entity_key_serialization_version, + ) res = {} res_ts = None for _, feature_name, val_bin, ts in rows.get(entity_key_bin, []): diff --git a/sdk/python/feast/on_demand_feature_view.py b/sdk/python/feast/on_demand_feature_view.py index bad4edba81..0de796ea7a 100644 --- a/sdk/python/feast/on_demand_feature_view.py +++ b/sdk/python/feast/on_demand_feature_view.py @@ -88,6 +88,7 @@ def __init__( # noqa: C901 description: str = "", tags: Optional[Dict[str, str]] = None, owner: str = "", + entity_key_serialization_version=1, ): """ Creates an OnDemandFeatureView object. @@ -219,6 +220,7 @@ def __init__( # noqa: C901 description=description, tags=tags, owner=owner, + entity_key_serialization_version=entity_key_serialization_version, ) assert _sources is not None self.source_feature_view_projections: Dict[str, FeatureViewProjection] = {} @@ -310,6 +312,7 @@ def to_proto(self) -> OnDemandFeatureViewProto: description=self.description, tags=self.tags, owner=self.owner, + entity_key_serialization_version=self.entity_key_serialization_version, ) return OnDemandFeatureViewProto(spec=spec, meta=meta) @@ -341,6 +344,7 @@ def from_proto(cls, on_demand_feature_view_proto: OnDemandFeatureViewProto): sources.append( RequestSource.from_proto(on_demand_source.request_data_source) ) + on_demand_feature_view_obj = cls( name=on_demand_feature_view_proto.spec.name, schema=[ @@ -359,6 +363,11 @@ def from_proto(cls, on_demand_feature_view_proto: OnDemandFeatureViewProto): owner=on_demand_feature_view_proto.spec.owner, ) + if on_demand_feature_view_proto.spec.entity_key_serialization_version <= 1: + on_demand_feature_view_obj.set_entity_key_serialization_version(1) + else: + on_demand_feature_view_obj.set_entity_key_serialization_version(2) + # FeatureViewProjections are not saved in the OnDemandFeatureView proto. # Create the default projection. on_demand_feature_view_obj.projection = FeatureViewProjection.from_definition( @@ -524,6 +533,7 @@ def on_demand_feature_view( description: str = "", tags: Optional[Dict[str, str]] = None, owner: str = "", + entity_key_serialization_version=1, ): """ Creates an OnDemandFeatureView object with the given user function as udf. @@ -650,6 +660,7 @@ def decorator(user_function): description=description, tags=tags, owner=owner, + entity_key_serialization_version=entity_key_serialization_version, ) functools.update_wrapper( wrapper=on_demand_feature_view_obj, wrapped=user_function diff --git a/sdk/python/feast/registry.py b/sdk/python/feast/registry.py index f72fd717d2..9c18aa0e84 100644 --- a/sdk/python/feast/registry.py +++ b/sdk/python/feast/registry.py @@ -1162,6 +1162,7 @@ def apply_feature_view( else: raise ValueError(f"Unexpected feature view type: {type(feature_view)}") + saved_entity_key_serialization_version: Optional[int] = None for idx, existing_feature_view_proto in enumerate( existing_feature_views_of_same_type ): @@ -1175,8 +1176,22 @@ def apply_feature_view( ): return else: + saved_entity_key_serialization_version = existing_feature_views_of_same_type[ + idx + ].spec.entity_key_serialization_version del existing_feature_views_of_same_type[idx] break + + if ( + not saved_entity_key_serialization_version + or saved_entity_key_serialization_version <= 1 + ): + feature_view_proto.spec.entity_key_serialization_version = 1 + feature_view.set_entity_key_serialization_version(1) + else: + feature_view_proto.spec.entity_key_serialization_version = 2 + feature_view.set_entity_key_serialization_version(2) + existing_feature_views_of_same_type.append(feature_view_proto) if commit: self.commit() diff --git a/sdk/python/feast/request_feature_view.py b/sdk/python/feast/request_feature_view.py index 7248ffe989..6d32eed245 100644 --- a/sdk/python/feast/request_feature_view.py +++ b/sdk/python/feast/request_feature_view.py @@ -44,6 +44,7 @@ def __init__( description: str = "", tags: Optional[Dict[str, str]] = None, owner: str = "", + entity_key_serialization_version=1, ): """ Creates a RequestFeatureView object. @@ -77,6 +78,7 @@ def __init__( description=description, tags=tags, owner=owner, + entity_key_serialization_version=entity_key_serialization_version, ) self.request_source = request_data_source @@ -97,6 +99,7 @@ def to_proto(self) -> RequestFeatureViewProto: description=self.description, tags=self.tags, owner=self.owner, + entity_key_serialization_version=self.entity_key_serialization_version, ) return RequestFeatureViewProto(spec=spec) @@ -123,6 +126,11 @@ def from_proto(cls, request_feature_view_proto: RequestFeatureViewProto): owner=request_feature_view_proto.spec.owner, ) + if request_feature_view_proto.spec.entity_key_serialization_version <= 1: + request_feature_view_obj.set_entity_key_serialization_version(1) + else: + request_feature_view_obj.set_entity_key_serialization_version(2) + # FeatureViewProjections are not saved in the RequestFeatureView proto. # Create the default projection. request_feature_view_obj.projection = FeatureViewProjection.from_definition( diff --git a/sdk/python/feast/stream_feature_view.py b/sdk/python/feast/stream_feature_view.py index f19b1fcff7..814cd41193 100644 --- a/sdk/python/feast/stream_feature_view.py +++ b/sdk/python/feast/stream_feature_view.py @@ -135,6 +135,8 @@ def __init__( owner=owner, schema=schema, source=source, + # Since stream feature views are so new, lets make sure they always use the latest serialization version. + entity_key_serialization_version=2, ) def __eq__(self, other): @@ -197,6 +199,7 @@ def to_proto(self): timestamp_field=self.timestamp_field, aggregations=[agg.to_proto() for agg in self.aggregations], mode=self.mode, + entity_key_serialization_version=self.entity_key_serialization_version, ) return StreamFeatureViewProto(spec=spec, meta=meta) @@ -275,6 +278,11 @@ def from_proto(cls, sfv_proto): ) ) + if sfv_proto.spec.entity_key_serialization_version <= 1: + stream_feature_view.set_entity_key_serialization_version(1) + else: + stream_feature_view.set_entity_key_serialization_version(2) + return stream_feature_view def __copy__(self): diff --git a/sdk/python/tests/integration/registration/test_registry.py b/sdk/python/tests/integration/registration/test_registry.py index 27bbbbd2bb..54795c3982 100644 --- a/sdk/python/tests/integration/registration/test_registry.py +++ b/sdk/python/tests/integration/registration/test_registry.py @@ -17,6 +17,7 @@ import pandas as pd import pytest +from assertpy import assertpy from pytest_lazyfixture import lazy_fixture from feast import FileSource @@ -416,6 +417,9 @@ def odfv1(feature_df: pd.DataFrame) -> pd.DataFrame: test_registry.apply_feature_view(odfv1, project) test_registry.apply_feature_view(fv1, project) + assertpy.assert_that(odfv1.entity_key_serialization_version).is_equal_to(2) + assertpy.assert_that(fv1.entity_key_serialization_version).is_equal_to(2) + # Modify odfv by changing a single feature dtype @on_demand_feature_view( features=[ diff --git a/sdk/python/tests/unit/test_feature_views.py b/sdk/python/tests/unit/test_feature_views.py index d6be8e0341..e91efd01df 100644 --- a/sdk/python/tests/unit/test_feature_views.py +++ b/sdk/python/tests/unit/test_feature_views.py @@ -1,6 +1,7 @@ from datetime import timedelta import pytest +from assertpy import assertpy from feast.aggregation import Aggregation from feast.batch_feature_view import BatchFeatureView @@ -17,12 +18,13 @@ def test_create_batch_feature_view(): batch_source = FileSource(path="some path") - BatchFeatureView( + bfv = BatchFeatureView( name="test batch feature view", entities=[], ttl=timedelta(days=30), source=batch_source, ) + assertpy.assert_that(bfv.entity_key_serialization_version).is_equal_to(2) with pytest.raises(ValueError): BatchFeatureView( @@ -127,11 +129,15 @@ def test_stream_feature_view_serialization(): tags={}, ) + assertpy.assert_that(sfv.entity_key_serialization_version).is_equal_to(2) + sfv_proto = sfv.to_proto() new_sfv = StreamFeatureView.from_proto(sfv_proto=sfv_proto) assert new_sfv == sfv + assertpy.assert_that(new_sfv.entity_key_serialization_version).is_equal_to(2) + def test_stream_feature_view_udfs(): entity = Entity(name="driver_entity", join_keys=["test_key"]) diff --git a/sdk/python/tests/unit/test_project/feature_store.yaml b/sdk/python/tests/unit/test_project/feature_store.yaml new file mode 100644 index 0000000000..eeec58e2ff --- /dev/null +++ b/sdk/python/tests/unit/test_project/feature_store.yaml @@ -0,0 +1,8 @@ +project: prompt_dory +provider: local +online_store: redis +offline_store: {} +registry: registry.db +flags: + alpha_features: true + on_demand_transforms: true diff --git a/sdk/python/tests/unit/test_serialization_version.py b/sdk/python/tests/unit/test_serialization_version.py new file mode 100644 index 0000000000..085983ff8c --- /dev/null +++ b/sdk/python/tests/unit/test_serialization_version.py @@ -0,0 +1,17 @@ +import os + +from assertpy import assertpy + +from feast import FeatureStore, RepoConfig + + +def test_registry_entity_serialization_version(): + r = RepoConfig( + project="prompt_dory", + provider="local", + online_store="redis", + registry=f"{os.path.dirname(__file__)}/test_project/registry.db", + ) + fs: FeatureStore = FeatureStore(config=r) + fvs = fs.list_feature_views() + assertpy.assert_that(fvs[0].entity_key_serialization_version).is_equal_to(1) From c6dabd163a00ae0693c3b5c6e6bedc1550edbff7 Mon Sep 17 00:00:00 2001 From: Achal Shah Date: Thu, 14 Jul 2022 23:09:07 -0700 Subject: [PATCH 02/19] fix tests Signed-off-by: Achal Shah --- sdk/python/feast/infra/key_encoding_utils.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/sdk/python/feast/infra/key_encoding_utils.py b/sdk/python/feast/infra/key_encoding_utils.py index 8f5b288b66..ea0df02a69 100644 --- a/sdk/python/feast/infra/key_encoding_utils.py +++ b/sdk/python/feast/infra/key_encoding_utils.py @@ -17,11 +17,11 @@ def _serialize_val( return struct.pack("= 0 - and entity_key_serialization_version <= 1 + entity_key_serialization_version >= 0 + and entity_key_serialization_version <= 1 ): - return struct.pack(" Date: Thu, 14 Jul 2022 23:10:18 -0700 Subject: [PATCH 03/19] Add a test Signed-off-by: Achal Shah --- .../unit/infra/test_key_encoding_utils.py | 28 +++++++++++++++++++ 1 file changed, 28 insertions(+) create mode 100644 sdk/python/tests/unit/infra/test_key_encoding_utils.py diff --git a/sdk/python/tests/unit/infra/test_key_encoding_utils.py b/sdk/python/tests/unit/infra/test_key_encoding_utils.py new file mode 100644 index 0000000000..b82b3e4eb4 --- /dev/null +++ b/sdk/python/tests/unit/infra/test_key_encoding_utils.py @@ -0,0 +1,28 @@ +import pytest + +from feast.infra.key_encoding_utils import serialize_entity_key +from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto +from feast.protos.feast.types.Value_pb2 import Value as ValueProto + + +@pytest.mark.parametrize( + "entity_key,expected_contains", + [ + ( + EntityKeyProto( + join_keys=["customer"], + entity_values=[ValueProto(int64_val=int(2 ** 31))], + ), + b"customer", + ), + ( + EntityKeyProto( + join_keys=["user"], entity_values=[ValueProto(int32_val=int(2 ** 15))] + ), + b"user", + ), + ], +) +def test_serialize_entity_key(entity_key, expected_contains): + output = serialize_entity_key(entity_key) + assert output.find(expected_contains) >= 0 From 0b819752fc8024b0bb501c07498f8d86f900d0ed Mon Sep 17 00:00:00 2001 From: Achal Shah Date: Thu, 14 Jul 2022 23:14:24 -0700 Subject: [PATCH 04/19] Add a test Signed-off-by: Achal Shah --- sdk/python/feast/infra/key_encoding_utils.py | 4 +- .../unit/infra/test_key_encoding_utils.py | 40 ++++++++++--------- 2 files changed, 23 insertions(+), 21 deletions(-) diff --git a/sdk/python/feast/infra/key_encoding_utils.py b/sdk/python/feast/infra/key_encoding_utils.py index ea0df02a69..e52adc4951 100644 --- a/sdk/python/feast/infra/key_encoding_utils.py +++ b/sdk/python/feast/infra/key_encoding_utils.py @@ -17,8 +17,8 @@ def _serialize_val( return struct.pack("= 0 - and entity_key_serialization_version <= 1 + entity_key_serialization_version >= 0 + and entity_key_serialization_version <= 1 ): return struct.pack("= 0 + entity_key_serialization_version=2, + ) + + # Old serilization scheme, should fail. + with pytest.raises(BaseException): + serialize_entity_key( + EntityKeyProto( + join_keys=["user"], entity_values=[ValueProto(int64_val=int(2 ** 31))] + ), + ) From b91bf17bdd298377112bb0ada41abb7c0b09f190 Mon Sep 17 00:00:00 2001 From: Achal Shah Date: Thu, 14 Jul 2022 23:28:33 -0700 Subject: [PATCH 05/19] fix test Signed-off-by: Achal Shah --- sdk/python/.gitignore | 2 ++ sdk/python/tests/unit/test_project/registry.db | Bin 0 -> 1178 bytes 2 files changed, 2 insertions(+) create mode 100644 sdk/python/tests/unit/test_project/registry.db diff --git a/sdk/python/.gitignore b/sdk/python/.gitignore index 9cab5dff20..696eabedbf 100644 --- a/sdk/python/.gitignore +++ b/sdk/python/.gitignore @@ -116,3 +116,5 @@ dmypy.json .vscode/* playground + +!tests/unit/test_project/*.db \ No newline at end of file diff --git a/sdk/python/tests/unit/test_project/registry.db b/sdk/python/tests/unit/test_project/registry.db new file mode 100644 index 0000000000000000000000000000000000000000..f9ab03b57bc7415023d88f22432aae7911171659 GIT binary patch literal 1178 zcmcIj&x_MQ7&UD-&5o6ZS@2LiNZeDQNgG?Wd+^69ie48l9vr5VFK!@h;$&JZUW&LN zvIiF{h$4tAdr(j>9uyJr;=zj`!u|!yo>kb3=#X~T6yj-hF8N*u+5?ZoEi#dGLBETT*JqY?B>1SZldZv03#{rCx8u z*Y&*)NI3zA$aFnl)>0t@^R%oN;_{KQ)2YW5Y;GGoczQ3kX0+qYwO8xel8#huaQERg z70d6h)fIK_zJjiz11U=+bMx<7)PO{mXc152u7LYnphR>G%731>Bv|+e1$lWn^jm~l&uE;$3FkDWd dKD<3N6SANGTusQ1QR~_Nj_1y!_E Date: Fri, 15 Jul 2022 09:59:35 -0700 Subject: [PATCH 06/19] fix test Signed-off-by: Achal Shah --- sdk/python/feast/on_demand_feature_view.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/sdk/python/feast/on_demand_feature_view.py b/sdk/python/feast/on_demand_feature_view.py index 0de796ea7a..4629d6719b 100644 --- a/sdk/python/feast/on_demand_feature_view.py +++ b/sdk/python/feast/on_demand_feature_view.py @@ -533,7 +533,7 @@ def on_demand_feature_view( description: str = "", tags: Optional[Dict[str, str]] = None, owner: str = "", - entity_key_serialization_version=1, + entity_key_serialization_version: Optional[int] = None, ): """ Creates an OnDemandFeatureView object with the given user function as udf. @@ -553,6 +553,7 @@ def on_demand_feature_view( tags (optional): A dictionary of key-value pairs to store arbitrary metadata. owner (optional): The owner of the on demand feature view, typically the email of the primary maintainer. + entity_key_serialization_version: The serialization version for entities. """ positional_attributes = ["features", "inputs"] From 13712bb0ff243fe81e99c115c7e5d457e70fcbe2 Mon Sep 17 00:00:00 2001 From: Achal Shah Date: Fri, 15 Jul 2022 10:39:13 -0700 Subject: [PATCH 07/19] fix test Signed-off-by: Achal Shah --- sdk/python/feast/on_demand_feature_view.py | 2 +- sdk/python/feast/registry.py | 35 +++++++++++++++++----- 2 files changed, 28 insertions(+), 9 deletions(-) diff --git a/sdk/python/feast/on_demand_feature_view.py b/sdk/python/feast/on_demand_feature_view.py index 4629d6719b..d16dced43f 100644 --- a/sdk/python/feast/on_demand_feature_view.py +++ b/sdk/python/feast/on_demand_feature_view.py @@ -88,7 +88,7 @@ def __init__( # noqa: C901 description: str = "", tags: Optional[Dict[str, str]] = None, owner: str = "", - entity_key_serialization_version=1, + entity_key_serialization_version: Optional[int] = None, ): """ Creates an OnDemandFeatureView object. diff --git a/sdk/python/feast/registry.py b/sdk/python/feast/registry.py index 9c18aa0e84..bb637921f4 100644 --- a/sdk/python/feast/registry.py +++ b/sdk/python/feast/registry.py @@ -1162,6 +1162,7 @@ def apply_feature_view( else: raise ValueError(f"Unexpected feature view type: {type(feature_view)}") + does_fv_exist_in_registry: bool = False saved_entity_key_serialization_version: Optional[int] = None for idx, existing_feature_view_proto in enumerate( existing_feature_views_of_same_type @@ -1170,27 +1171,45 @@ def apply_feature_view( existing_feature_view_proto.spec.name == feature_view_proto.spec.name and existing_feature_view_proto.spec.project == project ): + saved_entity_key_serialization_version = ( + existing_feature_view_proto.spec.entity_key_serialization_version + ) + does_fv_exist_in_registry = True if ( feature_view.__class__.from_proto(existing_feature_view_proto) == feature_view ): return else: - saved_entity_key_serialization_version = existing_feature_views_of_same_type[ - idx - ].spec.entity_key_serialization_version del existing_feature_views_of_same_type[idx] break - if ( - not saved_entity_key_serialization_version - or saved_entity_key_serialization_version <= 1 + if not does_fv_exist_in_registry: + # Brand new FV! Lets set the serialization version to 2, AKA the correct version with the + # int64 fix, if the serialization version is "unset" in the FV. + serialization_version = ( + feature_view.entity_key_serialization_version + if feature_view.entity_key_serialization_version + else 2 + ) + feature_view_proto.spec.entity_key_serialization_version = ( + serialization_version + ) + feature_view.set_entity_key_serialization_version(serialization_version) + elif ( + does_fv_exist_in_registry + and saved_entity_key_serialization_version + and saved_entity_key_serialization_version <= 1 ): + # Here, the feature view already exists in the registry, but doesn't have a serialization version set + # (or it has it set to 0 AKA "unset" in proto) + # in this case, we set the value to 1 (which is the current, broken serialization version) feature_view_proto.spec.entity_key_serialization_version = 1 feature_view.set_entity_key_serialization_version(1) else: - feature_view_proto.spec.entity_key_serialization_version = 2 - feature_view.set_entity_key_serialization_version(2) + # The FV exists in the registry, and the serialization version is set explicitly. + # In which case, we probably don't need to do anything? + pass existing_feature_views_of_same_type.append(feature_view_proto) if commit: From de4728667c8fcf3b8805043003a15b1b98a773ea Mon Sep 17 00:00:00 2001 From: Achal Shah Date: Fri, 15 Jul 2022 13:40:30 -0700 Subject: [PATCH 08/19] simplify Signed-off-by: Achal Shah --- sdk/python/feast/infra/key_encoding_utils.py | 5 +---- .../feast/infra/materialization/lambda/lambda_engine.py | 6 +++++- 2 files changed, 6 insertions(+), 5 deletions(-) diff --git a/sdk/python/feast/infra/key_encoding_utils.py b/sdk/python/feast/infra/key_encoding_utils.py index e52adc4951..fcccd5e806 100644 --- a/sdk/python/feast/infra/key_encoding_utils.py +++ b/sdk/python/feast/infra/key_encoding_utils.py @@ -16,10 +16,7 @@ def _serialize_val( elif value_type == "int32_val": return struct.pack("= 0 - and entity_key_serialization_version <= 1 - ): + if 0 <= entity_key_serialization_version <= 1: return struct.pack(" Date: Fri, 15 Jul 2022 21:08:27 -0700 Subject: [PATCH 09/19] simplify Signed-off-by: Achal Shah --- sdk/python/feast/infra/online_stores/sqlite.py | 8 +++++++- .../tests/integration/online_store/test_e2e_local.py | 2 +- sdk/python/tests/utils/cli_utils.py | 10 ++++------ 3 files changed, 12 insertions(+), 8 deletions(-) diff --git a/sdk/python/feast/infra/online_stores/sqlite.py b/sdk/python/feast/infra/online_stores/sqlite.py index b691e43c66..8443783c88 100644 --- a/sdk/python/feast/infra/online_stores/sqlite.py +++ b/sdk/python/feast/infra/online_stores/sqlite.py @@ -156,7 +156,13 @@ def online_read( f"FROM {_table_id(config.project, table)} " f"WHERE entity_key IN ({','.join('?' * len(entity_keys))}) " f"ORDER BY entity_key", - [serialize_entity_key(entity_key) for entity_key in entity_keys], + [ + serialize_entity_key( + entity_key, + entity_key_serialization_version=table.entity_key_serialization_version, + ) + for entity_key in entity_keys + ], ) rows = cur.fetchall() diff --git a/sdk/python/tests/integration/online_store/test_e2e_local.py b/sdk/python/tests/integration/online_store/test_e2e_local.py index c5b66e7ddc..73c566847d 100644 --- a/sdk/python/tests/integration/online_store/test_e2e_local.py +++ b/sdk/python/tests/integration/online_store/test_e2e_local.py @@ -51,7 +51,7 @@ def _assert_online_features( .values[0] .float_val > 0 - ) + ), response.to_dict() result = response.to_dict() assert len(result) == 5 diff --git a/sdk/python/tests/utils/cli_utils.py b/sdk/python/tests/utils/cli_utils.py index ee6ea138fb..a038b85840 100644 --- a/sdk/python/tests/utils/cli_utils.py +++ b/sdk/python/tests/utils/cli_utils.py @@ -84,13 +84,11 @@ def local_repo(self, example_repo_py: str, offline_store: str): repo_example.write_text(example_repo_py) result = self.run(["apply"], cwd=repo_path) - assert ( - result.returncode == 0 - ), f"stdout: {result.stdout}\n stderr: {result.stderr}" + print(f"Apply: stdout: {str(result.stdout)}\n stderr: {str(result.stderr)}") + assert result.returncode == 0 yield FeatureStore(repo_path=str(repo_path), config=None) result = self.run(["teardown"], cwd=repo_path) - assert ( - result.returncode == 0 - ), f"stdout: {result.stdout}\n stderr: {result.stderr}" + print(f"Apply: stdout: {str(result.stdout)}\n stderr: {str(result.stderr)}") + assert result.returncode == 0 From da9b407424f959f1370418853b99acbce5508420 Mon Sep 17 00:00:00 2001 From: Achal Shah Date: Mon, 18 Jul 2022 12:42:37 -0700 Subject: [PATCH 10/19] feature_store.yaml Signed-off-by: Achal Shah --- go/internal/feast/onlinestore/onlinestore.go | 2 +- .../feast/onlinestore/redisonlinestore.go | 36 ++++++++++++------- .../feast/onlinestore/sqliteonlinestore.go | 15 ++++---- go/internal/feast/registry/repoconfig.go | 2 ++ sdk/python/feast/infra/key_encoding_utils.py | 2 +- .../contrib/hbase_online_store/hbase.py | 7 ++-- .../infra/online_stores/contrib/postgres.py | 4 +-- .../feast/infra/online_stores/datastore.py | 9 +++-- .../feast/infra/online_stores/dynamodb.py | 10 ++++-- sdk/python/feast/infra/online_stores/redis.py | 6 ++-- .../feast/infra/online_stores/sqlite.py | 6 ++-- sdk/python/feast/infra/utils/hbase_utils.py | 9 +++-- sdk/python/feast/repo_config.py | 2 ++ .../test_dynamodb_online_store.py | 2 +- sdk/python/tests/utils/online_store_utils.py | 2 +- 15 files changed, 73 insertions(+), 41 deletions(-) diff --git a/go/internal/feast/onlinestore/onlinestore.go b/go/internal/feast/onlinestore/onlinestore.go index 64a05f144c..88cd3dbd9b 100644 --- a/go/internal/feast/onlinestore/onlinestore.go +++ b/go/internal/feast/onlinestore/onlinestore.go @@ -61,7 +61,7 @@ func NewOnlineStore(config *registry.RepoConfig) (OnlineStore, error) { onlineStore, err := NewSqliteOnlineStore(config.Project, config, config.OnlineStore) return onlineStore, err } else if onlineStoreType == "redis" { - onlineStore, err := NewRedisOnlineStore(config.Project, config.OnlineStore) + onlineStore, err := NewRedisOnlineStore(config.Project, config, config.OnlineStore) return onlineStore, err } else { return nil, fmt.Errorf("%s online store type is currently not supported; only redis and sqlite are supported", onlineStoreType) diff --git a/go/internal/feast/onlinestore/redisonlinestore.go b/go/internal/feast/onlinestore/redisonlinestore.go index 26f34cf896..5d40834b4f 100644 --- a/go/internal/feast/onlinestore/redisonlinestore.go +++ b/go/internal/feast/onlinestore/redisonlinestore.go @@ -6,6 +6,7 @@ import ( "encoding/binary" "errors" "fmt" + "github.com/feast-dev/feast/go/internal/feast/registry" "sort" "strconv" "strings" @@ -13,7 +14,7 @@ import ( "github.com/go-redis/redis/v8" "github.com/golang/protobuf/proto" "github.com/spaolacci/murmur3" - timestamppb "google.golang.org/protobuf/types/known/timestamppb" + "google.golang.org/protobuf/types/known/timestamppb" "github.com/feast-dev/feast/go/protos/feast/serving" "github.com/feast-dev/feast/go/protos/feast/types" @@ -37,10 +38,15 @@ type RedisOnlineStore struct { // Redis client connector client *redis.Client + + config *registry.RepoConfig } -func NewRedisOnlineStore(project string, onlineStoreConfig map[string]interface{}) (*RedisOnlineStore, error) { - store := RedisOnlineStore{project: project} +func NewRedisOnlineStore(project string, config *registry.RepoConfig, onlineStoreConfig map[string]interface{}) (*RedisOnlineStore, error) { + store := RedisOnlineStore{ + project: project, + config: config, + } var address []string var password string @@ -161,7 +167,7 @@ func (r *RedisOnlineStore) OnlineRead(ctx context.Context, entityKeys []*types.E redisKeyToEntityIndex := make(map[string]int) for i := 0; i < len(entityKeys); i++ { - var key, err = buildRedisKey(r.project, entityKeys[i]) + var key, err = buildRedisKey(r.project, entityKeys[i], r.config.EntityKeySerializationVersion) if err != nil { return nil, err } @@ -270,8 +276,8 @@ func (r *RedisOnlineStore) Destruct() { } -func buildRedisKey(project string, entityKey *types.EntityKey) (*[]byte, error) { - serKey, err := serializeEntityKey(entityKey) +func buildRedisKey(project string, entityKey *types.EntityKey, entityKeySerializationVersion int64) (*[]byte, error) { + serKey, err := serializeEntityKey(entityKey, entityKeySerializationVersion) if err != nil { return nil, err } @@ -279,7 +285,7 @@ func buildRedisKey(project string, entityKey *types.EntityKey) (*[]byte, error) return &fullKey, nil } -func serializeEntityKey(entityKey *types.EntityKey) (*[]byte, error) { +func serializeEntityKey(entityKey *types.EntityKey, entityKeySerializationVersion int64) (*[]byte, error) { // Serialize entity key to a bytestring so that it can be used as a lookup key in a hash table. // Ensure that we have the right amount of join keys and entity values @@ -316,7 +322,7 @@ func serializeEntityKey(entityKey *types.EntityKey) (*[]byte, error) { offset := (2 * len(keys)) + (i * 3) value := m[keys[i]].GetVal() - valueBytes, valueTypeBytes, err := serializeValue(value) + valueBytes, valueTypeBytes, err := serializeValue(value, entityKeySerializationVersion) if err != nil { return valueBytes, err } @@ -341,7 +347,7 @@ func serializeEntityKey(entityKey *types.EntityKey) (*[]byte, error) { return &entityKeyBuffer, nil } -func serializeValue(value interface{}) (*[]byte, types.ValueType_Enum, error) { +func serializeValue(value interface{}, entityKeySerializationVersion int64) (*[]byte, types.ValueType_Enum, error) { // TODO: Implement support for other types (at least the major types like ints, strings, bytes) switch x := (value).(type) { case *types.Value_StringVal: @@ -355,9 +361,15 @@ func serializeValue(value interface{}) (*[]byte, types.ValueType_Enum, error) { return &valueBuffer, types.ValueType_INT32, nil case *types.Value_Int64Val: // TODO (woop): We unfortunately have to use 32 bit here for backward compatibility :( - valueBuffer := make([]byte, 4) - binary.LittleEndian.PutUint32(valueBuffer, uint32(x.Int64Val)) - return &valueBuffer, types.ValueType_INT64, nil + if entityKeySerializationVersion <= 1 { + valueBuffer := make([]byte, 4) + binary.LittleEndian.PutUint32(valueBuffer, uint32(x.Int64Val)) + return &valueBuffer, types.ValueType_INT64, nil + } else { + valueBuffer := make([]byte, 8) + binary.LittleEndian.PutUint64(valueBuffer, uint64(x.Int64Val)) + return &valueBuffer, types.ValueType_INT64, nil + } case nil: return nil, types.ValueType_INVALID, fmt.Errorf("could not detect type for %v", x) default: diff --git a/go/internal/feast/onlinestore/sqliteonlinestore.go b/go/internal/feast/onlinestore/sqliteonlinestore.go index 94ba0c0d56..1f407ad39c 100644 --- a/go/internal/feast/onlinestore/sqliteonlinestore.go +++ b/go/internal/feast/onlinestore/sqliteonlinestore.go @@ -16,7 +16,7 @@ import ( _ "github.com/mattn/go-sqlite3" "google.golang.org/protobuf/proto" - timestamppb "google.golang.org/protobuf/types/known/timestamppb" + "google.golang.org/protobuf/types/known/timestamppb" "github.com/feast-dev/feast/go/protos/feast/serving" "github.com/feast-dev/feast/go/protos/feast/types" @@ -24,15 +24,16 @@ import ( type SqliteOnlineStore struct { // Feast project name - project string - path string - db *sql.DB - db_mu sync.Mutex + project string + path string + db *sql.DB + db_mu sync.Mutex + repoConfig *registry.RepoConfig } // Creates a new sqlite online store object. onlineStoreConfig should have relative path of database file with respect to repoConfig.repoPath. func NewSqliteOnlineStore(project string, repoConfig *registry.RepoConfig, onlineStoreConfig map[string]interface{}) (*SqliteOnlineStore, error) { - store := SqliteOnlineStore{project: project} + store := SqliteOnlineStore{project: project, repoConfig: repoConfig} if db_path, ok := onlineStoreConfig["path"]; !ok { return nil, fmt.Errorf("cannot find sqlite path %s", db_path) } else { @@ -69,7 +70,7 @@ func (s *SqliteOnlineStore) OnlineRead(ctx context.Context, entityKeys []*types. in_query := make([]string, len(entityKeys)) serialized_entities := make([]interface{}, len(entityKeys)) for i := 0; i < len(entityKeys); i++ { - serKey, err := serializeEntityKey(entityKeys[i]) + serKey, err := serializeEntityKey(entityKeys[i], s.repoConfig.EntityKeySerializationVersion) if err != nil { return nil, err } diff --git a/go/internal/feast/registry/repoconfig.go b/go/internal/feast/registry/repoconfig.go index 59d125b1bf..b034b632dc 100644 --- a/go/internal/feast/registry/repoconfig.go +++ b/go/internal/feast/registry/repoconfig.go @@ -30,6 +30,8 @@ type RepoConfig struct { Flags map[string]interface{} `json:"flags"` // RepoPath RepoPath string `json:"repo_path"` + // EntityKeySerializationVersion + EntityKeySerializationVersion int64 `json:"entity_key_serialization_version"` } type RegistryConfig struct { diff --git a/sdk/python/feast/infra/key_encoding_utils.py b/sdk/python/feast/infra/key_encoding_utils.py index fcccd5e806..62b6b72724 100644 --- a/sdk/python/feast/infra/key_encoding_utils.py +++ b/sdk/python/feast/infra/key_encoding_utils.py @@ -25,7 +25,7 @@ def _serialize_val( def serialize_entity_key_prefix(entity_keys: List[str]) -> bytes: """ - Serialize keys to a bytestring so it can be used to prefix-scan through items stored in the online store + Serialize keys to a bytestring, so it can be used to prefix-scan through items stored in the online store using serialize_entity_key. This encoding is a partial implementation of serialize_entity_key, only operating on the keys of entities, diff --git a/sdk/python/feast/infra/online_stores/contrib/hbase_online_store/hbase.py b/sdk/python/feast/infra/online_stores/contrib/hbase_online_store/hbase.py index d95e83f429..473fb50d9b 100644 --- a/sdk/python/feast/infra/online_stores/contrib/hbase_online_store/hbase.py +++ b/sdk/python/feast/infra/online_stores/contrib/hbase_online_store/hbase.py @@ -108,7 +108,8 @@ def online_write_batch( b = hbase.batch(table_name) for entity_key, values, timestamp, created_ts in data: - row_key = serialize_entity_key(entity_key).hex() + row_key = serialize_entity_key(entity_key, + entity_key_serialization_version=config.entity_key_serialization_version).hex() values_dict = {} for feature_name, val in values.items(): values_dict[ @@ -154,7 +155,9 @@ def online_read( result: List[Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]] = [] row_keys = [ - serialize_entity_key(entity_key).hex() for entity_key in entity_keys + serialize_entity_key(entity_key, + entity_key_serialization_version=config.entity_key_serialization_version).hex() + for entity_key in entity_keys ] rows = hbase.rows(table_name, row_keys=row_keys) diff --git a/sdk/python/feast/infra/online_stores/contrib/postgres.py b/sdk/python/feast/infra/online_stores/contrib/postgres.py index 456e530b62..fb99385ea3 100644 --- a/sdk/python/feast/infra/online_stores/contrib/postgres.py +++ b/sdk/python/feast/infra/online_stores/contrib/postgres.py @@ -51,7 +51,7 @@ def online_write_batch( for entity_key, values, timestamp, created_ts in data: entity_key_bin = serialize_entity_key( entity_key, - entity_key_serialization_version=table.entity_key_serialization_version, + entity_key_serialization_version=config.entity_key_serialization_version, ) timestamp = _to_naive_utc(timestamp) if created_ts is not None: @@ -110,7 +110,7 @@ def online_read( keys.append( serialize_entity_key( entity_key, - entity_key_serialization_version=table.entity_key_serialization_version, + entity_key_serialization_version=config.entity_key_serialization_version, ) ) diff --git a/sdk/python/feast/infra/online_stores/datastore.py b/sdk/python/feast/infra/online_stores/datastore.py index fc3659ea1a..2da2ba186f 100644 --- a/sdk/python/feast/infra/online_stores/datastore.py +++ b/sdk/python/feast/infra/online_stores/datastore.py @@ -162,7 +162,7 @@ def online_write_batch( with ThreadPool(processes=write_concurrency) as pool: pool.map( lambda b: self._write_minibatch( - client, feast_project, table, b, progress + client, feast_project, table, b, progress, config ), self._to_minibatches(data, batch_size=write_batch_size), ) @@ -191,10 +191,12 @@ def _write_minibatch( Tuple[EntityKeyProto, Dict[str, ValueProto], datetime, Optional[datetime]] ], progress: Optional[Callable[[int], Any]], + config: RepoConfig ): entities = [] for entity_key, features, timestamp, created_ts in data: - document_id = compute_entity_id(entity_key) + document_id = compute_entity_id(entity_key, + entity_key_serialization_version=config.entity_key_serialization_version) key = client.key( "Project", project, "Table", table.name, "Row", document_id, @@ -241,7 +243,8 @@ def online_read( keys: List[Key] = [] result: List[Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]] = [] for entity_key in entity_keys: - document_id = compute_entity_id(entity_key) + document_id = compute_entity_id(entity_key, + entity_key_serialization_version=config.entity_key_serialization_version) key = client.key( "Project", feast_project, "Table", table.name, "Row", document_id ) diff --git a/sdk/python/feast/infra/online_stores/dynamodb.py b/sdk/python/feast/infra/online_stores/dynamodb.py index 6919f2cc29..8fa86409d9 100644 --- a/sdk/python/feast/infra/online_stores/dynamodb.py +++ b/sdk/python/feast/infra/online_stores/dynamodb.py @@ -188,7 +188,7 @@ def online_write_batch( table_instance = dynamodb_resource.Table( _get_table_name(online_config, config, table) ) - self._write_batch_non_duplicates(table_instance, data, progress) + self._write_batch_non_duplicates(table_instance, data, progress, config) @log_exceptions_and_usage(online_store="dynamodb") def online_read( @@ -216,7 +216,9 @@ def online_read( ) result: List[Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]] = [] - entity_ids = [compute_entity_id(entity_key) for entity_key in entity_keys] + entity_ids = [compute_entity_id(entity_key, + entity_key_serialization_version=config.entity_key_serialization_version) + for entity_key in entity_keys] batch_size = online_config.batch_size entity_ids_iter = iter(entity_ids) while True: @@ -300,11 +302,13 @@ def _write_batch_non_duplicates( Tuple[EntityKeyProto, Dict[str, ValueProto], datetime, Optional[datetime]] ], progress: Optional[Callable[[int], Any]], + config: RepoConfig, ): """Deduplicate write batch request items on ``entity_id`` primary key.""" with table_instance.batch_writer(overwrite_by_pkeys=["entity_id"]) as batch: for entity_key, features, timestamp, created_ts in data: - entity_id = compute_entity_id(entity_key) + entity_id = compute_entity_id(entity_key, + entity_key_serialization_version=config.entity_key_serialization_version) batch.put_item( Item={ "entity_id": entity_id, # PartitionKey diff --git a/sdk/python/feast/infra/online_stores/redis.py b/sdk/python/feast/infra/online_stores/redis.py index 60fa9265ca..7ee91df44d 100644 --- a/sdk/python/feast/infra/online_stores/redis.py +++ b/sdk/python/feast/infra/online_stores/redis.py @@ -199,7 +199,8 @@ def online_write_batch( # TODO: investigate if check and set is a better approach rather than pulling all entity ts and then setting # it may be significantly slower but avoids potential (rare) race conditions for entity_key, _, _, _ in data: - redis_key_bin = _redis_key(project, entity_key) + redis_key_bin = _redis_key(project, entity_key, + entity_key_serialization_version=config.entity_key_serialization_version) keys.append(redis_key_bin) pipe.hmget(redis_key_bin, ts_key) prev_event_timestamps = pipe.execute() @@ -268,7 +269,8 @@ def online_read( keys = [] for entity_key in entity_keys: - redis_key_bin = _redis_key(project, entity_key) + redis_key_bin = _redis_key(project, entity_key, + entity_key_serialization_version=config.entity_key_serialization_version) keys.append(redis_key_bin) with client.pipeline(transaction=False) as pipe: for redis_key_bin in keys: diff --git a/sdk/python/feast/infra/online_stores/sqlite.py b/sdk/python/feast/infra/online_stores/sqlite.py index 8443783c88..d44f8a742e 100644 --- a/sdk/python/feast/infra/online_stores/sqlite.py +++ b/sdk/python/feast/infra/online_stores/sqlite.py @@ -97,7 +97,7 @@ def online_write_batch( for entity_key, values, timestamp, created_ts in data: entity_key_bin = serialize_entity_key( entity_key, - entity_key_serialization_version=table.entity_key_serialization_version, + entity_key_serialization_version=config.entity_key_serialization_version, ) timestamp = to_naive_utc(timestamp) if created_ts is not None: @@ -159,7 +159,7 @@ def online_read( [ serialize_entity_key( entity_key, - entity_key_serialization_version=table.entity_key_serialization_version, + entity_key_serialization_version=config.entity_key_serialization_version, ) for entity_key in entity_keys ], @@ -172,7 +172,7 @@ def online_read( for entity_key in entity_keys: entity_key_bin = serialize_entity_key( entity_key, - entity_key_serialization_version=table.entity_key_serialization_version, + entity_key_serialization_version=config.entity_key_serialization_version, ) res = {} res_ts = None diff --git a/sdk/python/feast/infra/utils/hbase_utils.py b/sdk/python/feast/infra/utils/hbase_utils.py index 78a39caed8..e8e25d1dbd 100644 --- a/sdk/python/feast/infra/utils/hbase_utils.py +++ b/sdk/python/feast/infra/utils/hbase_utils.py @@ -167,13 +167,16 @@ def main(): table = connection.table("test_hbase_driver_hourly_stats") row_keys = [ serialize_entity_key( - EntityKey(join_keys=["driver_id"], entity_values=[Value(int64_val=1004)]) + EntityKey(join_keys=["driver_id"], entity_values=[Value(int64_val=1004)]), + entity_key_serialization_version=2 ).hex(), serialize_entity_key( - EntityKey(join_keys=["driver_id"], entity_values=[Value(int64_val=1005)]) + EntityKey(join_keys=["driver_id"], entity_values=[Value(int64_val=1005)]), + entity_key_serialization_version=2 ).hex(), serialize_entity_key( - EntityKey(join_keys=["driver_id"], entity_values=[Value(int64_val=1024)]) + EntityKey(join_keys=["driver_id"], entity_values=[Value(int64_val=1024)]), + entity_key_serialization_version=2 ).hex(), ] rows = table.rows(row_keys) diff --git a/sdk/python/feast/repo_config.py b/sdk/python/feast/repo_config.py index f7f564df6f..2069025b8b 100644 --- a/sdk/python/feast/repo_config.py +++ b/sdk/python/feast/repo_config.py @@ -139,6 +139,8 @@ class RepoConfig(FeastBaseModel): go_feature_retrieval: Optional[bool] = False + entity_key_serialization_version: StrictInt = 0 + def __init__(self, **data: Any): super().__init__(**data) diff --git a/sdk/python/tests/unit/infra/online_store/test_dynamodb_online_store.py b/sdk/python/tests/unit/infra/online_store/test_dynamodb_online_store.py index 25eb061930..07e22017b5 100644 --- a/sdk/python/tests/unit/infra/online_store/test_dynamodb_online_store.py +++ b/sdk/python/tests/unit/infra/online_store/test_dynamodb_online_store.py @@ -311,7 +311,7 @@ def test_write_batch_non_duplicates(repo_config, dynamodb_online_store): table_instance = dynamodb_resource.Table(f"{PROJECT}.{dynamodb_tbl}") # Insert duplicate data dynamodb_online_store._write_batch_non_duplicates( - table_instance, data + data_duplicate, progress=None + table_instance, data + data_duplicate, None, repo_config ) # Request more items than inserted response = table_instance.scan(Limit=20) diff --git a/sdk/python/tests/utils/online_store_utils.py b/sdk/python/tests/utils/online_store_utils.py index f72b4d5a2a..9cd7663869 100644 --- a/sdk/python/tests/utils/online_store_utils.py +++ b/sdk/python/tests/utils/online_store_utils.py @@ -45,7 +45,7 @@ def _insert_data_test_table(data, project, tbl_name, region): dynamodb_resource = boto3.resource("dynamodb", region_name=region) table_instance = dynamodb_resource.Table(f"{project}.{tbl_name}") for entity_key, features, timestamp, created_ts in data: - entity_id = compute_entity_id(entity_key) + entity_id = compute_entity_id(entity_key, entity_key_serialization_version=2) with table_instance.batch_writer() as batch: batch.put_item( Item={ From 7ab893cdecd88379753d7870cf347886ccd005c5 Mon Sep 17 00:00:00 2001 From: Achal Shah Date: Mon, 18 Jul 2022 14:26:02 -0700 Subject: [PATCH 11/19] fix tests Signed-off-by: Achal Shah --- .../onlinestore/redisonlinestore_test.go | 25 ++++++++++++++++--- .../contrib/hbase_online_store/hbase.py | 12 ++++++--- .../feast/infra/online_stores/datastore.py | 14 +++++++---- .../feast/infra/online_stores/dynamodb.py | 16 ++++++++---- sdk/python/feast/infra/online_stores/redis.py | 14 ++++++++--- sdk/python/feast/infra/utils/hbase_utils.py | 6 ++--- 6 files changed, 62 insertions(+), 25 deletions(-) diff --git a/go/internal/feast/onlinestore/redisonlinestore_test.go b/go/internal/feast/onlinestore/redisonlinestore_test.go index 43cdbe06a2..ad9ef1e1e4 100644 --- a/go/internal/feast/onlinestore/redisonlinestore_test.go +++ b/go/internal/feast/onlinestore/redisonlinestore_test.go @@ -1,6 +1,7 @@ package onlinestore import ( + "github.com/feast-dev/feast/go/internal/feast/registry" "testing" "github.com/stretchr/testify/assert" @@ -10,7 +11,11 @@ func TestNewRedisOnlineStore(t *testing.T) { var config = map[string]interface{}{ "connection_string": "redis://localhost:6379", } - store, err := NewRedisOnlineStore("test", config) + rc := ®istry.RepoConfig{ + OnlineStore: config, + EntityKeySerializationVersion: 2, + } + store, err := NewRedisOnlineStore("test", rc, config) assert.Nil(t, err) var opts = store.client.Options() assert.Equal(t, opts.Addr, "redis://localhost:6379") @@ -23,7 +28,11 @@ func TestNewRedisOnlineStoreWithPassword(t *testing.T) { var config = map[string]interface{}{ "connection_string": "redis://localhost:6379,password=secret", } - store, err := NewRedisOnlineStore("test", config) + rc := ®istry.RepoConfig{ + OnlineStore: config, + EntityKeySerializationVersion: 2, + } + store, err := NewRedisOnlineStore("test", rc, config) assert.Nil(t, err) var opts = store.client.Options() assert.Equal(t, opts.Addr, "redis://localhost:6379") @@ -34,7 +43,11 @@ func TestNewRedisOnlineStoreWithDB(t *testing.T) { var config = map[string]interface{}{ "connection_string": "redis://localhost:6379,db=1", } - store, err := NewRedisOnlineStore("test", config) + rc := ®istry.RepoConfig{ + OnlineStore: config, + EntityKeySerializationVersion: 2, + } + store, err := NewRedisOnlineStore("test", rc, config) assert.Nil(t, err) var opts = store.client.Options() assert.Equal(t, opts.Addr, "redis://localhost:6379") @@ -45,7 +58,11 @@ func TestNewRedisOnlineStoreWithSsl(t *testing.T) { var config = map[string]interface{}{ "connection_string": "redis://localhost:6379,ssl=true", } - store, err := NewRedisOnlineStore("test", config) + rc := ®istry.RepoConfig{ + OnlineStore: config, + EntityKeySerializationVersion: 2, + } + store, err := NewRedisOnlineStore("test", rc, config) assert.Nil(t, err) var opts = store.client.Options() assert.Equal(t, opts.Addr, "redis://localhost:6379") diff --git a/sdk/python/feast/infra/online_stores/contrib/hbase_online_store/hbase.py b/sdk/python/feast/infra/online_stores/contrib/hbase_online_store/hbase.py index 473fb50d9b..aff0c6c42c 100644 --- a/sdk/python/feast/infra/online_stores/contrib/hbase_online_store/hbase.py +++ b/sdk/python/feast/infra/online_stores/contrib/hbase_online_store/hbase.py @@ -108,8 +108,10 @@ def online_write_batch( b = hbase.batch(table_name) for entity_key, values, timestamp, created_ts in data: - row_key = serialize_entity_key(entity_key, - entity_key_serialization_version=config.entity_key_serialization_version).hex() + row_key = serialize_entity_key( + entity_key, + entity_key_serialization_version=config.entity_key_serialization_version, + ).hex() values_dict = {} for feature_name, val in values.items(): values_dict[ @@ -155,8 +157,10 @@ def online_read( result: List[Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]] = [] row_keys = [ - serialize_entity_key(entity_key, - entity_key_serialization_version=config.entity_key_serialization_version).hex() + serialize_entity_key( + entity_key, + entity_key_serialization_version=config.entity_key_serialization_version, + ).hex() for entity_key in entity_keys ] rows = hbase.rows(table_name, row_keys=row_keys) diff --git a/sdk/python/feast/infra/online_stores/datastore.py b/sdk/python/feast/infra/online_stores/datastore.py index 2da2ba186f..0ce7166355 100644 --- a/sdk/python/feast/infra/online_stores/datastore.py +++ b/sdk/python/feast/infra/online_stores/datastore.py @@ -191,12 +191,14 @@ def _write_minibatch( Tuple[EntityKeyProto, Dict[str, ValueProto], datetime, Optional[datetime]] ], progress: Optional[Callable[[int], Any]], - config: RepoConfig + config: RepoConfig, ): entities = [] for entity_key, features, timestamp, created_ts in data: - document_id = compute_entity_id(entity_key, - entity_key_serialization_version=config.entity_key_serialization_version) + document_id = compute_entity_id( + entity_key, + entity_key_serialization_version=config.entity_key_serialization_version, + ) key = client.key( "Project", project, "Table", table.name, "Row", document_id, @@ -243,8 +245,10 @@ def online_read( keys: List[Key] = [] result: List[Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]] = [] for entity_key in entity_keys: - document_id = compute_entity_id(entity_key, - entity_key_serialization_version=config.entity_key_serialization_version) + document_id = compute_entity_id( + entity_key, + entity_key_serialization_version=config.entity_key_serialization_version, + ) key = client.key( "Project", feast_project, "Table", table.name, "Row", document_id ) diff --git a/sdk/python/feast/infra/online_stores/dynamodb.py b/sdk/python/feast/infra/online_stores/dynamodb.py index 8fa86409d9..02d2b322d1 100644 --- a/sdk/python/feast/infra/online_stores/dynamodb.py +++ b/sdk/python/feast/infra/online_stores/dynamodb.py @@ -216,9 +216,13 @@ def online_read( ) result: List[Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]] = [] - entity_ids = [compute_entity_id(entity_key, - entity_key_serialization_version=config.entity_key_serialization_version) - for entity_key in entity_keys] + entity_ids = [ + compute_entity_id( + entity_key, + entity_key_serialization_version=config.entity_key_serialization_version, + ) + for entity_key in entity_keys + ] batch_size = online_config.batch_size entity_ids_iter = iter(entity_ids) while True: @@ -307,8 +311,10 @@ def _write_batch_non_duplicates( """Deduplicate write batch request items on ``entity_id`` primary key.""" with table_instance.batch_writer(overwrite_by_pkeys=["entity_id"]) as batch: for entity_key, features, timestamp, created_ts in data: - entity_id = compute_entity_id(entity_key, - entity_key_serialization_version=config.entity_key_serialization_version) + entity_id = compute_entity_id( + entity_key, + entity_key_serialization_version=config.entity_key_serialization_version, + ) batch.put_item( Item={ "entity_id": entity_id, # PartitionKey diff --git a/sdk/python/feast/infra/online_stores/redis.py b/sdk/python/feast/infra/online_stores/redis.py index 7ee91df44d..da458a3693 100644 --- a/sdk/python/feast/infra/online_stores/redis.py +++ b/sdk/python/feast/infra/online_stores/redis.py @@ -199,8 +199,11 @@ def online_write_batch( # TODO: investigate if check and set is a better approach rather than pulling all entity ts and then setting # it may be significantly slower but avoids potential (rare) race conditions for entity_key, _, _, _ in data: - redis_key_bin = _redis_key(project, entity_key, - entity_key_serialization_version=config.entity_key_serialization_version) + redis_key_bin = _redis_key( + project, + entity_key, + entity_key_serialization_version=config.entity_key_serialization_version, + ) keys.append(redis_key_bin) pipe.hmget(redis_key_bin, ts_key) prev_event_timestamps = pipe.execute() @@ -269,8 +272,11 @@ def online_read( keys = [] for entity_key in entity_keys: - redis_key_bin = _redis_key(project, entity_key, - entity_key_serialization_version=config.entity_key_serialization_version) + redis_key_bin = _redis_key( + project, + entity_key, + entity_key_serialization_version=config.entity_key_serialization_version, + ) keys.append(redis_key_bin) with client.pipeline(transaction=False) as pipe: for redis_key_bin in keys: diff --git a/sdk/python/feast/infra/utils/hbase_utils.py b/sdk/python/feast/infra/utils/hbase_utils.py index e8e25d1dbd..4816a60087 100644 --- a/sdk/python/feast/infra/utils/hbase_utils.py +++ b/sdk/python/feast/infra/utils/hbase_utils.py @@ -168,15 +168,15 @@ def main(): row_keys = [ serialize_entity_key( EntityKey(join_keys=["driver_id"], entity_values=[Value(int64_val=1004)]), - entity_key_serialization_version=2 + entity_key_serialization_version=2, ).hex(), serialize_entity_key( EntityKey(join_keys=["driver_id"], entity_values=[Value(int64_val=1005)]), - entity_key_serialization_version=2 + entity_key_serialization_version=2, ).hex(), serialize_entity_key( EntityKey(join_keys=["driver_id"], entity_values=[Value(int64_val=1024)]), - entity_key_serialization_version=2 + entity_key_serialization_version=2, ).hex(), ] rows = table.rows(row_keys) From 2e1b123b07d41db164b6c632f86efc521c231038 Mon Sep 17 00:00:00 2001 From: Achal Shah Date: Mon, 18 Jul 2022 14:37:00 -0700 Subject: [PATCH 12/19] remove protos Signed-off-by: Achal Shah --- protos/feast/core/FeatureView.proto | 3 -- protos/feast/core/OnDemandFeatureView.proto | 3 -- protos/feast/core/RequestFeatureView.proto | 3 -- protos/feast/core/StreamFeatureView.proto | 3 -- sdk/python/feast/base_feature_view.py | 13 -------- sdk/python/feast/batch_feature_view.py | 2 -- sdk/python/feast/feature_view.py | 8 ----- sdk/python/feast/on_demand_feature_view.py | 11 ------- sdk/python/feast/registry.py | 33 ------------------- sdk/python/feast/request_feature_view.py | 8 ----- sdk/python/feast/stream_feature_view.py | 8 ----- .../integration/registration/test_registry.py | 4 --- sdk/python/tests/unit/test_feature_views.py | 5 --- .../unit/test_project/feature_store.yaml | 1 + .../tests/unit/test_serialization_version.py | 19 ++++++----- 15 files changed, 11 insertions(+), 113 deletions(-) diff --git a/protos/feast/core/FeatureView.proto b/protos/feast/core/FeatureView.proto index 522a496c35..c9e38bf344 100644 --- a/protos/feast/core/FeatureView.proto +++ b/protos/feast/core/FeatureView.proto @@ -75,9 +75,6 @@ message FeatureViewSpec { // Whether these features should be served online or not bool online = 8; - - // Needed for backwards compatible behaviour when fixing - int32 entity_key_serialization_version = 13; } message FeatureViewMeta { diff --git a/protos/feast/core/OnDemandFeatureView.proto b/protos/feast/core/OnDemandFeatureView.proto index b3bf431f75..33c51f5c4d 100644 --- a/protos/feast/core/OnDemandFeatureView.proto +++ b/protos/feast/core/OnDemandFeatureView.proto @@ -58,9 +58,6 @@ message OnDemandFeatureViewSpec { // Owner of the on demand feature view. string owner = 8; - - // Needed for backwards compatible behaviour when fixing - int32 entity_key_serialization_version = 9; } message OnDemandFeatureViewMeta { diff --git a/protos/feast/core/RequestFeatureView.proto b/protos/feast/core/RequestFeatureView.proto index 5a3a55bb88..4049053c2b 100644 --- a/protos/feast/core/RequestFeatureView.proto +++ b/protos/feast/core/RequestFeatureView.proto @@ -48,7 +48,4 @@ message RequestFeatureViewSpec { // Owner of the request feature view. string owner = 6; - - // Needed for backwards compatible behaviour when fixing - int32 entity_key_serialization_version = 7; } diff --git a/protos/feast/core/StreamFeatureView.proto b/protos/feast/core/StreamFeatureView.proto index 963577d8aa..06e9ee0612 100644 --- a/protos/feast/core/StreamFeatureView.proto +++ b/protos/feast/core/StreamFeatureView.proto @@ -88,8 +88,5 @@ message StreamFeatureViewSpec { // Timestamp field for aggregation string timestamp_field = 16; - - // Needed for backwards compatible behaviour when fixing - int32 entity_key_serialization_version = 17; } diff --git a/sdk/python/feast/base_feature_view.py b/sdk/python/feast/base_feature_view.py index 9e975a1610..5feb1d7d89 100644 --- a/sdk/python/feast/base_feature_view.py +++ b/sdk/python/feast/base_feature_view.py @@ -49,8 +49,6 @@ class BaseFeatureView(ABC): created_timestamp: Optional[datetime] last_updated_timestamp: Optional[datetime] - _entity_key_serialization_version: Optional[int] - @abstractmethod def __init__( self, @@ -60,7 +58,6 @@ def __init__( description: str = "", tags: Optional[Dict[str, str]] = None, owner: str = "", - entity_key_serialization_version: Optional[int] = None, ): """ Creates a BaseFeatureView object. @@ -85,16 +82,6 @@ def __init__( self.projection = FeatureViewProjection.from_definition(self) self.created_timestamp = None self.last_updated_timestamp = None - self._entity_key_serialization_version = entity_key_serialization_version - - @property - def entity_key_serialization_version(self) -> int: - if self._entity_key_serialization_version: - return self._entity_key_serialization_version - return 2 # The default entity key serialization version. - - def set_entity_key_serialization_version(self, v: int): - self._entity_key_serialization_version = v @property @abstractmethod diff --git a/sdk/python/feast/batch_feature_view.py b/sdk/python/feast/batch_feature_view.py index 4b609a32e3..2f9fb080db 100644 --- a/sdk/python/feast/batch_feature_view.py +++ b/sdk/python/feast/batch_feature_view.py @@ -30,7 +30,6 @@ def __init__( owner: str = "", schema: Optional[List[Field]] = None, source: Optional[DataSource] = None, - entity_key_serialization_version: Optional[int] = None, ): if source is None: @@ -56,5 +55,4 @@ def __init__( owner=owner, schema=schema, source=source, - entity_key_serialization_version=entity_key_serialization_version, ) diff --git a/sdk/python/feast/feature_view.py b/sdk/python/feast/feature_view.py index bca524b413..dd8cb4f0a6 100644 --- a/sdk/python/feast/feature_view.py +++ b/sdk/python/feast/feature_view.py @@ -110,7 +110,6 @@ def __init__( owner: str = "", schema: Optional[List[Field]] = None, source: Optional[DataSource] = None, - entity_key_serialization_version: Optional[int] = None, ): """ Creates a FeatureView object. @@ -261,7 +260,6 @@ def __init__( description=description, tags=tags, owner=owner, - entity_key_serialization_version=entity_key_serialization_version, ) self.online = online self.materialization_intervals = [] @@ -432,7 +430,6 @@ def to_proto(self) -> FeatureViewProto: online=self.online, batch_source=batch_source_proto, stream_source=stream_source_proto, - entity_key_serialization_version=self.entity_key_serialization_version, ) return FeatureViewProto(spec=spec, meta=meta) @@ -524,11 +521,6 @@ def from_proto(cls, feature_view_proto: FeatureViewProto): ) ) - if feature_view_proto.spec.entity_key_serialization_version <= 1: - feature_view.set_entity_key_serialization_version(1) - else: - feature_view.set_entity_key_serialization_version(2) - return feature_view @property diff --git a/sdk/python/feast/on_demand_feature_view.py b/sdk/python/feast/on_demand_feature_view.py index d16dced43f..9a73f46f7f 100644 --- a/sdk/python/feast/on_demand_feature_view.py +++ b/sdk/python/feast/on_demand_feature_view.py @@ -88,7 +88,6 @@ def __init__( # noqa: C901 description: str = "", tags: Optional[Dict[str, str]] = None, owner: str = "", - entity_key_serialization_version: Optional[int] = None, ): """ Creates an OnDemandFeatureView object. @@ -220,7 +219,6 @@ def __init__( # noqa: C901 description=description, tags=tags, owner=owner, - entity_key_serialization_version=entity_key_serialization_version, ) assert _sources is not None self.source_feature_view_projections: Dict[str, FeatureViewProjection] = {} @@ -312,7 +310,6 @@ def to_proto(self) -> OnDemandFeatureViewProto: description=self.description, tags=self.tags, owner=self.owner, - entity_key_serialization_version=self.entity_key_serialization_version, ) return OnDemandFeatureViewProto(spec=spec, meta=meta) @@ -363,11 +360,6 @@ def from_proto(cls, on_demand_feature_view_proto: OnDemandFeatureViewProto): owner=on_demand_feature_view_proto.spec.owner, ) - if on_demand_feature_view_proto.spec.entity_key_serialization_version <= 1: - on_demand_feature_view_obj.set_entity_key_serialization_version(1) - else: - on_demand_feature_view_obj.set_entity_key_serialization_version(2) - # FeatureViewProjections are not saved in the OnDemandFeatureView proto. # Create the default projection. on_demand_feature_view_obj.projection = FeatureViewProjection.from_definition( @@ -533,7 +525,6 @@ def on_demand_feature_view( description: str = "", tags: Optional[Dict[str, str]] = None, owner: str = "", - entity_key_serialization_version: Optional[int] = None, ): """ Creates an OnDemandFeatureView object with the given user function as udf. @@ -553,7 +544,6 @@ def on_demand_feature_view( tags (optional): A dictionary of key-value pairs to store arbitrary metadata. owner (optional): The owner of the on demand feature view, typically the email of the primary maintainer. - entity_key_serialization_version: The serialization version for entities. """ positional_attributes = ["features", "inputs"] @@ -661,7 +651,6 @@ def decorator(user_function): description=description, tags=tags, owner=owner, - entity_key_serialization_version=entity_key_serialization_version, ) functools.update_wrapper( wrapper=on_demand_feature_view_obj, wrapped=user_function diff --git a/sdk/python/feast/registry.py b/sdk/python/feast/registry.py index bb637921f4..71fff49f2d 100644 --- a/sdk/python/feast/registry.py +++ b/sdk/python/feast/registry.py @@ -1162,8 +1162,6 @@ def apply_feature_view( else: raise ValueError(f"Unexpected feature view type: {type(feature_view)}") - does_fv_exist_in_registry: bool = False - saved_entity_key_serialization_version: Optional[int] = None for idx, existing_feature_view_proto in enumerate( existing_feature_views_of_same_type ): @@ -1171,10 +1169,6 @@ def apply_feature_view( existing_feature_view_proto.spec.name == feature_view_proto.spec.name and existing_feature_view_proto.spec.project == project ): - saved_entity_key_serialization_version = ( - existing_feature_view_proto.spec.entity_key_serialization_version - ) - does_fv_exist_in_registry = True if ( feature_view.__class__.from_proto(existing_feature_view_proto) == feature_view @@ -1184,33 +1178,6 @@ def apply_feature_view( del existing_feature_views_of_same_type[idx] break - if not does_fv_exist_in_registry: - # Brand new FV! Lets set the serialization version to 2, AKA the correct version with the - # int64 fix, if the serialization version is "unset" in the FV. - serialization_version = ( - feature_view.entity_key_serialization_version - if feature_view.entity_key_serialization_version - else 2 - ) - feature_view_proto.spec.entity_key_serialization_version = ( - serialization_version - ) - feature_view.set_entity_key_serialization_version(serialization_version) - elif ( - does_fv_exist_in_registry - and saved_entity_key_serialization_version - and saved_entity_key_serialization_version <= 1 - ): - # Here, the feature view already exists in the registry, but doesn't have a serialization version set - # (or it has it set to 0 AKA "unset" in proto) - # in this case, we set the value to 1 (which is the current, broken serialization version) - feature_view_proto.spec.entity_key_serialization_version = 1 - feature_view.set_entity_key_serialization_version(1) - else: - # The FV exists in the registry, and the serialization version is set explicitly. - # In which case, we probably don't need to do anything? - pass - existing_feature_views_of_same_type.append(feature_view_proto) if commit: self.commit() diff --git a/sdk/python/feast/request_feature_view.py b/sdk/python/feast/request_feature_view.py index 6d32eed245..7248ffe989 100644 --- a/sdk/python/feast/request_feature_view.py +++ b/sdk/python/feast/request_feature_view.py @@ -44,7 +44,6 @@ def __init__( description: str = "", tags: Optional[Dict[str, str]] = None, owner: str = "", - entity_key_serialization_version=1, ): """ Creates a RequestFeatureView object. @@ -78,7 +77,6 @@ def __init__( description=description, tags=tags, owner=owner, - entity_key_serialization_version=entity_key_serialization_version, ) self.request_source = request_data_source @@ -99,7 +97,6 @@ def to_proto(self) -> RequestFeatureViewProto: description=self.description, tags=self.tags, owner=self.owner, - entity_key_serialization_version=self.entity_key_serialization_version, ) return RequestFeatureViewProto(spec=spec) @@ -126,11 +123,6 @@ def from_proto(cls, request_feature_view_proto: RequestFeatureViewProto): owner=request_feature_view_proto.spec.owner, ) - if request_feature_view_proto.spec.entity_key_serialization_version <= 1: - request_feature_view_obj.set_entity_key_serialization_version(1) - else: - request_feature_view_obj.set_entity_key_serialization_version(2) - # FeatureViewProjections are not saved in the RequestFeatureView proto. # Create the default projection. request_feature_view_obj.projection = FeatureViewProjection.from_definition( diff --git a/sdk/python/feast/stream_feature_view.py b/sdk/python/feast/stream_feature_view.py index 814cd41193..f19b1fcff7 100644 --- a/sdk/python/feast/stream_feature_view.py +++ b/sdk/python/feast/stream_feature_view.py @@ -135,8 +135,6 @@ def __init__( owner=owner, schema=schema, source=source, - # Since stream feature views are so new, lets make sure they always use the latest serialization version. - entity_key_serialization_version=2, ) def __eq__(self, other): @@ -199,7 +197,6 @@ def to_proto(self): timestamp_field=self.timestamp_field, aggregations=[agg.to_proto() for agg in self.aggregations], mode=self.mode, - entity_key_serialization_version=self.entity_key_serialization_version, ) return StreamFeatureViewProto(spec=spec, meta=meta) @@ -278,11 +275,6 @@ def from_proto(cls, sfv_proto): ) ) - if sfv_proto.spec.entity_key_serialization_version <= 1: - stream_feature_view.set_entity_key_serialization_version(1) - else: - stream_feature_view.set_entity_key_serialization_version(2) - return stream_feature_view def __copy__(self): diff --git a/sdk/python/tests/integration/registration/test_registry.py b/sdk/python/tests/integration/registration/test_registry.py index 54795c3982..27bbbbd2bb 100644 --- a/sdk/python/tests/integration/registration/test_registry.py +++ b/sdk/python/tests/integration/registration/test_registry.py @@ -17,7 +17,6 @@ import pandas as pd import pytest -from assertpy import assertpy from pytest_lazyfixture import lazy_fixture from feast import FileSource @@ -417,9 +416,6 @@ def odfv1(feature_df: pd.DataFrame) -> pd.DataFrame: test_registry.apply_feature_view(odfv1, project) test_registry.apply_feature_view(fv1, project) - assertpy.assert_that(odfv1.entity_key_serialization_version).is_equal_to(2) - assertpy.assert_that(fv1.entity_key_serialization_version).is_equal_to(2) - # Modify odfv by changing a single feature dtype @on_demand_feature_view( features=[ diff --git a/sdk/python/tests/unit/test_feature_views.py b/sdk/python/tests/unit/test_feature_views.py index e91efd01df..067e1afe11 100644 --- a/sdk/python/tests/unit/test_feature_views.py +++ b/sdk/python/tests/unit/test_feature_views.py @@ -24,7 +24,6 @@ def test_create_batch_feature_view(): ttl=timedelta(days=30), source=batch_source, ) - assertpy.assert_that(bfv.entity_key_serialization_version).is_equal_to(2) with pytest.raises(ValueError): BatchFeatureView( @@ -129,15 +128,11 @@ def test_stream_feature_view_serialization(): tags={}, ) - assertpy.assert_that(sfv.entity_key_serialization_version).is_equal_to(2) - sfv_proto = sfv.to_proto() new_sfv = StreamFeatureView.from_proto(sfv_proto=sfv_proto) assert new_sfv == sfv - assertpy.assert_that(new_sfv.entity_key_serialization_version).is_equal_to(2) - def test_stream_feature_view_udfs(): entity = Entity(name="driver_entity", join_keys=["test_key"]) diff --git a/sdk/python/tests/unit/test_project/feature_store.yaml b/sdk/python/tests/unit/test_project/feature_store.yaml index eeec58e2ff..88bf510d6e 100644 --- a/sdk/python/tests/unit/test_project/feature_store.yaml +++ b/sdk/python/tests/unit/test_project/feature_store.yaml @@ -6,3 +6,4 @@ registry: registry.db flags: alpha_features: true on_demand_transforms: true +entity_key_serialization_version: 2 \ No newline at end of file diff --git a/sdk/python/tests/unit/test_serialization_version.py b/sdk/python/tests/unit/test_serialization_version.py index 085983ff8c..fcde087341 100644 --- a/sdk/python/tests/unit/test_serialization_version.py +++ b/sdk/python/tests/unit/test_serialization_version.py @@ -1,4 +1,5 @@ import os +import tempfile from assertpy import assertpy @@ -6,12 +7,12 @@ def test_registry_entity_serialization_version(): - r = RepoConfig( - project="prompt_dory", - provider="local", - online_store="redis", - registry=f"{os.path.dirname(__file__)}/test_project/registry.db", - ) - fs: FeatureStore = FeatureStore(config=r) - fvs = fs.list_feature_views() - assertpy.assert_that(fvs[0].entity_key_serialization_version).is_equal_to(1) + with tempfile.TemporaryDirectory() as tmpdir: + r = RepoConfig( + project="prompt_dory", + provider="local", + online_store="redis", + registry=f"{tmpdir}/registry.db", + entity_key_serialization_version=2, + ) + assertpy.assert_that(r.entity_key_serialization_version).is_equal_to(2) \ No newline at end of file From fd1e8ebf36cf0aead7c30988978deca81fb703f5 Mon Sep 17 00:00:00 2001 From: Achal Shah Date: Mon, 18 Jul 2022 14:54:43 -0700 Subject: [PATCH 13/19] fix tests Signed-off-by: Achal Shah --- sdk/python/.gitignore | 2 -- sdk/python/tests/unit/test_feature_views.py | 3 +-- .../tests/unit/test_project/feature_store.yaml | 9 --------- sdk/python/tests/unit/test_project/registry.db | Bin 1178 -> 0 bytes .../tests/unit/test_serialization_version.py | 5 ++--- 5 files changed, 3 insertions(+), 16 deletions(-) delete mode 100644 sdk/python/tests/unit/test_project/feature_store.yaml delete mode 100644 sdk/python/tests/unit/test_project/registry.db diff --git a/sdk/python/.gitignore b/sdk/python/.gitignore index 696eabedbf..9cab5dff20 100644 --- a/sdk/python/.gitignore +++ b/sdk/python/.gitignore @@ -116,5 +116,3 @@ dmypy.json .vscode/* playground - -!tests/unit/test_project/*.db \ No newline at end of file diff --git a/sdk/python/tests/unit/test_feature_views.py b/sdk/python/tests/unit/test_feature_views.py index 067e1afe11..d6be8e0341 100644 --- a/sdk/python/tests/unit/test_feature_views.py +++ b/sdk/python/tests/unit/test_feature_views.py @@ -1,7 +1,6 @@ from datetime import timedelta import pytest -from assertpy import assertpy from feast.aggregation import Aggregation from feast.batch_feature_view import BatchFeatureView @@ -18,7 +17,7 @@ def test_create_batch_feature_view(): batch_source = FileSource(path="some path") - bfv = BatchFeatureView( + BatchFeatureView( name="test batch feature view", entities=[], ttl=timedelta(days=30), diff --git a/sdk/python/tests/unit/test_project/feature_store.yaml b/sdk/python/tests/unit/test_project/feature_store.yaml deleted file mode 100644 index 88bf510d6e..0000000000 --- a/sdk/python/tests/unit/test_project/feature_store.yaml +++ /dev/null @@ -1,9 +0,0 @@ -project: prompt_dory -provider: local -online_store: redis -offline_store: {} -registry: registry.db -flags: - alpha_features: true - on_demand_transforms: true -entity_key_serialization_version: 2 \ No newline at end of file diff --git a/sdk/python/tests/unit/test_project/registry.db b/sdk/python/tests/unit/test_project/registry.db deleted file mode 100644 index f9ab03b57bc7415023d88f22432aae7911171659..0000000000000000000000000000000000000000 GIT binary patch literal 0 HcmV?d00001 literal 1178 zcmcIj&x_MQ7&UD-&5o6ZS@2LiNZeDQNgG?Wd+^69ie48l9vr5VFK!@h;$&JZUW&LN zvIiF{h$4tAdr(j>9uyJr;=zj`!u|!yo>kb3=#X~T6yj-hF8N*u+5?ZoEi#dGLBETT*JqY?B>1SZldZv03#{rCx8u z*Y&*)NI3zA$aFnl)>0t@^R%oN;_{KQ)2YW5Y;GGoczQ3kX0+qYwO8xel8#huaQERg z70d6h)fIK_zJjiz11U=+bMx<7)PO{mXc152u7LYnphR>G%731>Bv|+e1$lWn^jm~l&uE;$3FkDWd dKD<3N6SANGTusQ1QR~_Nj_1y!_E Date: Mon, 18 Jul 2022 15:15:53 -0700 Subject: [PATCH 14/19] update feature_store.yaml templates Signed-off-by: Achal Shah --- sdk/python/feast/templates/aws/feature_store.yaml | 1 + sdk/python/feast/templates/gcp/feature_store.yaml | 3 ++- sdk/python/feast/templates/hbase/feature_store.yaml | 1 + sdk/python/feast/templates/local/feature_store.yaml | 3 ++- sdk/python/feast/templates/minimal/feature_store.yaml | 3 ++- sdk/python/feast/templates/postgres/feature_store.yaml | 1 + sdk/python/feast/templates/snowflake/feature_store.yaml | 1 + sdk/python/feast/templates/spark/feature_store.yaml | 1 + 8 files changed, 11 insertions(+), 3 deletions(-) diff --git a/sdk/python/feast/templates/aws/feature_store.yaml b/sdk/python/feast/templates/aws/feature_store.yaml index 27d1c6879f..3745a75347 100644 --- a/sdk/python/feast/templates/aws/feature_store.yaml +++ b/sdk/python/feast/templates/aws/feature_store.yaml @@ -12,3 +12,4 @@ offline_store: user: %REDSHIFT_USER% s3_staging_location: %REDSHIFT_S3_STAGING_LOCATION% iam_role: %REDSHIFT_IAM_ROLE% +entity_key_serialization_version: 2 diff --git a/sdk/python/feast/templates/gcp/feature_store.yaml b/sdk/python/feast/templates/gcp/feature_store.yaml index 14c8d5a94f..74ee729090 100644 --- a/sdk/python/feast/templates/gcp/feature_store.yaml +++ b/sdk/python/feast/templates/gcp/feature_store.yaml @@ -1,3 +1,4 @@ project: my_project registry: data/registry.db -provider: gcp \ No newline at end of file +provider: gcp +entity_key_serialization_version: 2 diff --git a/sdk/python/feast/templates/hbase/feature_store.yaml b/sdk/python/feast/templates/hbase/feature_store.yaml index 83ce237b71..f99e858f7c 100644 --- a/sdk/python/feast/templates/hbase/feature_store.yaml +++ b/sdk/python/feast/templates/hbase/feature_store.yaml @@ -5,3 +5,4 @@ online_store: type: hbase host: 127.0.0.1 port: 9090 +entity_key_serialization_version: 2 diff --git a/sdk/python/feast/templates/local/feature_store.yaml b/sdk/python/feast/templates/local/feature_store.yaml index dcbe32d943..fddde04f90 100644 --- a/sdk/python/feast/templates/local/feature_store.yaml +++ b/sdk/python/feast/templates/local/feature_store.yaml @@ -2,4 +2,5 @@ project: my_project registry: data/registry.db provider: local online_store: - path: data/online_store.db \ No newline at end of file + path: data/online_store.db +entity_key_serialization_version: 2 diff --git a/sdk/python/feast/templates/minimal/feature_store.yaml b/sdk/python/feast/templates/minimal/feature_store.yaml index 2083288ad7..9808690005 100644 --- a/sdk/python/feast/templates/minimal/feature_store.yaml +++ b/sdk/python/feast/templates/minimal/feature_store.yaml @@ -2,4 +2,5 @@ project: my_project registry: /path/to/registry.db provider: local online_store: - path: /path/to/online_store.db \ No newline at end of file + path: /path/to/online_store.db +entity_key_serialization_version: 2 diff --git a/sdk/python/feast/templates/postgres/feature_store.yaml b/sdk/python/feast/templates/postgres/feature_store.yaml index 53b86b7064..0ccd4a6d49 100644 --- a/sdk/python/feast/templates/postgres/feature_store.yaml +++ b/sdk/python/feast/templates/postgres/feature_store.yaml @@ -25,3 +25,4 @@ offline_store: db_schema: DB_SCHEMA user: DB_USERNAME password: DB_PASSWORD +entity_key_serialization_version: 2 diff --git a/sdk/python/feast/templates/snowflake/feature_store.yaml b/sdk/python/feast/templates/snowflake/feature_store.yaml index 9757ea2ead..948869897b 100644 --- a/sdk/python/feast/templates/snowflake/feature_store.yaml +++ b/sdk/python/feast/templates/snowflake/feature_store.yaml @@ -9,3 +9,4 @@ offline_store: role: SNOWFLAKE_ROLE warehouse: SNOWFLAKE_WAREHOUSE database: SNOWFLAKE_DATABASE +entity_key_serialization_version: 2 diff --git a/sdk/python/feast/templates/spark/feature_store.yaml b/sdk/python/feast/templates/spark/feature_store.yaml index 2ea0ddfcc9..91e3ecf472 100644 --- a/sdk/python/feast/templates/spark/feature_store.yaml +++ b/sdk/python/feast/templates/spark/feature_store.yaml @@ -12,3 +12,4 @@ offline_store: spark.sql.session.timeZone: "UTC" online_store: path: data/online_store.db +entity_key_serialization_version: 2 From 37e32e488c22ef52755dda1e6e40a23b210a31a2 Mon Sep 17 00:00:00 2001 From: Achal Shah Date: Mon, 18 Jul 2022 15:52:19 -0700 Subject: [PATCH 15/19] fix java Signed-off-by: Achal Shah --- .../serving/config/ApplicationProperties.java | 177 +++++++++--------- .../config/ServingServiceConfigV2.java | 6 +- .../retriever/EntityKeySerializerV2.java | 23 ++- 3 files changed, 113 insertions(+), 93 deletions(-) diff --git a/java/serving/src/main/java/feast/serving/config/ApplicationProperties.java b/java/serving/src/main/java/feast/serving/config/ApplicationProperties.java index 268592d20a..5850eb6483 100644 --- a/java/serving/src/main/java/feast/serving/config/ApplicationProperties.java +++ b/java/serving/src/main/java/feast/serving/config/ApplicationProperties.java @@ -38,22 +38,84 @@ /** Feast Serving properties. */ public class ApplicationProperties { private static final Logger log = org.slf4j.LoggerFactory.getLogger(ApplicationProperties.class); + private FeastProperties feast; + private GrpcServer grpc; + private RestServer rest; - public static class FeastProperties { - /* Feast Serving build version */ - @NotBlank private String version = "unknown"; + public FeastProperties getFeast() { + return feast; + } - public void setRegistry(String registry) { - this.registry = registry; + public void setFeast(FeastProperties feast) { + this.feast = feast; + } + + public GrpcServer getGrpc() { + return grpc; + } + + public void setGrpc(GrpcServer grpc) { + this.grpc = grpc; + } + + public RestServer getRest() { + return rest; + } + + public void setRest(RestServer rest) { + this.rest = rest; + } + + /** + * Validates all FeastProperties. This method runs after properties have been initialized and + * individually and conditionally validates each class. + */ + @PostConstruct + public void validate() { + ValidatorFactory factory = Validation.buildDefaultValidatorFactory(); + Validator validator = factory.getValidator(); + + // Validate root fields in FeastProperties + Set> violations = validator.validate(this); + if (!violations.isEmpty()) { + throw new ConstraintViolationException(violations); } + } + public enum StoreType { + REDIS, + REDIS_CLUSTER; + } + + public static class FeastProperties { + /* Feast Serving build version */ + @NotBlank private String version = "unknown"; @NotBlank private String registry; + @NotBlank private String project; + private int registryRefreshInterval; + private int entityKeySerializationVersion; + /** Name of the active store configuration (only one store can be active at a time). */ + @NotBlank private String activeStore; + /** + * Collection of store configurations. The active store is selected by the "activeStore" field. + */ + @JsonMerge(OptBoolean.FALSE) + private List stores = new ArrayList<>(); + /* Metric tracing properties. */ + private TracingProperties tracing; + /* Feast Audit Logging properties */ + @NotNull private LoggingProperties logging; + private String gcpProject; + private String awsRegion; + private String transformationServiceEndpoint; public String getRegistry() { return registry; } - @NotBlank private String project; + public void setRegistry(String registry) { + this.registry = registry; + } public String getProject() { return project; @@ -63,8 +125,6 @@ public void setProject(final String project) { this.project = project; } - private int registryRefreshInterval; - public int getRegistryRefreshInterval() { return registryRefreshInterval; } @@ -73,6 +133,14 @@ public void setRegistryRefreshInterval(int registryRefreshInterval) { this.registryRefreshInterval = registryRefreshInterval; } + public int getEntityKeySerializationVersion() { + return entityKeySerializationVersion; + } + + public void setEntityKeySerializationVersion(int entityKeySerializationVersion) { + this.entityKeySerializationVersion = entityKeySerializationVersion; + } + /** * Finds and returns the active store * @@ -92,25 +160,6 @@ public void setActiveStore(String activeStore) { this.activeStore = activeStore; } - /** Name of the active store configuration (only one store can be active at a time). */ - @NotBlank private String activeStore; - - /** - * Collection of store configurations. The active store is selected by the "activeStore" field. - */ - @JsonMerge(OptBoolean.FALSE) - private List stores = new ArrayList<>(); - - /* Metric tracing properties. */ - private TracingProperties tracing; - - /* Feast Audit Logging properties */ - @NotNull private LoggingProperties logging; - - public void setStores(List stores) { - this.stores = stores; - } - /** * Gets Serving store configuration as a list of {@link Store}. * @@ -120,6 +169,10 @@ public List getStores() { return stores; } + public void setStores(List stores) { + this.stores = stores; + } + /** * Gets Feast Serving build version. * @@ -129,10 +182,6 @@ public String getVersion() { return version; } - public void setTracing(TracingProperties tracing) { - this.tracing = tracing; - } - /** * Gets tracing properties * @@ -142,6 +191,10 @@ public TracingProperties getTracing() { return tracing; } + public void setTracing(TracingProperties tracing) { + this.tracing = tracing; + } + /** * Gets logging properties * @@ -151,8 +204,6 @@ public LoggingProperties getLogging() { return logging; } - private String gcpProject; - public String getGcpProject() { return gcpProject; } @@ -161,17 +212,13 @@ public void setGcpProject(String gcpProject) { this.gcpProject = gcpProject; } - public void setAwsRegion(String awsRegion) { - this.awsRegion = awsRegion; - } - - private String awsRegion; - public String getAwsRegion() { return awsRegion; } - private String transformationServiceEndpoint; + public void setAwsRegion(String awsRegion) { + this.awsRegion = awsRegion; + } public String getTransformationServiceEndpoint() { return transformationServiceEndpoint; @@ -182,16 +229,6 @@ public void setTransformationServiceEndpoint(String transformationServiceEndpoin } } - private FeastProperties feast; - - public void setFeast(FeastProperties feast) { - this.feast = feast; - } - - public FeastProperties getFeast() { - return feast; - } - /** Store configuration class for database that this Feast Serving uses. */ public static class Store { @@ -327,30 +364,6 @@ public void setServer(Server server) { } } - private GrpcServer grpc; - private RestServer rest; - - public GrpcServer getGrpc() { - return grpc; - } - - public void setGrpc(GrpcServer grpc) { - this.grpc = grpc; - } - - public RestServer getRest() { - return rest; - } - - public void setRest(RestServer rest) { - this.rest = rest; - } - - public enum StoreType { - REDIS, - REDIS_CLUSTER; - } - /** Trace metric collection properties */ public static class TracingProperties { @@ -417,20 +430,4 @@ public void setServiceName(String serviceName) { this.serviceName = serviceName; } } - - /** - * Validates all FeastProperties. This method runs after properties have been initialized and - * individually and conditionally validates each class. - */ - @PostConstruct - public void validate() { - ValidatorFactory factory = Validation.buildDefaultValidatorFactory(); - Validator validator = factory.getValidator(); - - // Validate root fields in FeastProperties - Set> violations = validator.validate(this); - if (!violations.isEmpty()) { - throw new ConstraintViolationException(violations); - } - } } diff --git a/java/serving/src/main/java/feast/serving/config/ServingServiceConfigV2.java b/java/serving/src/main/java/feast/serving/config/ServingServiceConfigV2.java index 4ea0692ccd..868e3b83d1 100644 --- a/java/serving/src/main/java/feast/serving/config/ServingServiceConfigV2.java +++ b/java/serving/src/main/java/feast/serving/config/ServingServiceConfigV2.java @@ -48,7 +48,8 @@ public ServingServiceV2 registryBasedServingServiceV2( new OnlineRetriever( applicationProperties.getFeast().getProject(), redisClusterClient, - new EntityKeySerializerV2()); + new EntityKeySerializerV2( + applicationProperties.getFeast().getEntityKeySerializationVersion())); break; case REDIS: RedisClientAdapter redisClient = RedisClient.create(store.getRedisConfig()); @@ -57,7 +58,8 @@ public ServingServiceV2 registryBasedServingServiceV2( new OnlineRetriever( applicationProperties.getFeast().getProject(), redisClient, - new EntityKeySerializerV2()); + new EntityKeySerializerV2( + applicationProperties.getFeast().getEntityKeySerializationVersion())); break; default: throw new RuntimeException( diff --git a/java/storage/connectors/redis/src/main/java/feast/storage/connectors/redis/retriever/EntityKeySerializerV2.java b/java/storage/connectors/redis/src/main/java/feast/storage/connectors/redis/retriever/EntityKeySerializerV2.java index 3e9ab7e8ab..96d1141dd7 100644 --- a/java/storage/connectors/redis/src/main/java/feast/storage/connectors/redis/retriever/EntityKeySerializerV2.java +++ b/java/storage/connectors/redis/src/main/java/feast/storage/connectors/redis/retriever/EntityKeySerializerV2.java @@ -30,6 +30,15 @@ // https://github.com/feast-dev/feast/blob/b1ccf8dd1535f721aee8bea937ee38feff80bec5/sdk/python/feast/infra/key_encoding_utils.py#L22 // and must be kept up to date with any changes in that logic. public class EntityKeySerializerV2 implements EntityKeySerializer { + private final int entityKeySerializationVersion; + + public EntityKeySerializerV2() { + this(1); + } + + public EntityKeySerializerV2(int entityKeySerializationVersion) { + this.entityKeySerializationVersion = entityKeySerializationVersion; + } @Override public byte[] serialize(RedisProto.RedisKeyV2 entityKey) { @@ -83,7 +92,11 @@ public byte[] serialize(RedisProto.RedisKeyV2 entityKey) { we use `struct.pack(" encodeInteger(Integer value) { return Arrays.asList(ArrayUtils.toObject(buffer.array())); } + private List encodeLong(Long value) { + ByteBuffer buffer = ByteBuffer.allocate(Long.BYTES); + buffer.order(ByteOrder.LITTLE_ENDIAN); + buffer.putInt(value); + + return Arrays.asList(ArrayUtils.toObject(buffer.array())); + } + private List encodeString(String value) { byte[] stringBytes = value.getBytes(StandardCharsets.UTF_8); return encodeBytes(stringBytes); From e1a47fcc2f2942a13b55c972dbe08b2ba33d1ea5 Mon Sep 17 00:00:00 2001 From: Achal Shah Date: Mon, 18 Jul 2022 16:02:21 -0700 Subject: [PATCH 16/19] fix java test Signed-off-by: Achal Shah --- .../connectors/redis/retriever/EntityKeySerializerV2.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/java/storage/connectors/redis/src/main/java/feast/storage/connectors/redis/retriever/EntityKeySerializerV2.java b/java/storage/connectors/redis/src/main/java/feast/storage/connectors/redis/retriever/EntityKeySerializerV2.java index 96d1141dd7..f99e5cbdb1 100644 --- a/java/storage/connectors/redis/src/main/java/feast/storage/connectors/redis/retriever/EntityKeySerializerV2.java +++ b/java/storage/connectors/redis/src/main/java/feast/storage/connectors/redis/retriever/EntityKeySerializerV2.java @@ -129,7 +129,7 @@ private List encodeInteger(Integer value) { private List encodeLong(Long value) { ByteBuffer buffer = ByteBuffer.allocate(Long.BYTES); buffer.order(ByteOrder.LITTLE_ENDIAN); - buffer.putInt(value); + buffer.putLong(value); return Arrays.asList(ArrayUtils.toObject(buffer.array())); } From bbceb68ba2862e7328bd656c9a86ff75386ac1c5 Mon Sep 17 00:00:00 2001 From: Achal Shah Date: Tue, 19 Jul 2022 14:31:19 -0700 Subject: [PATCH 17/19] docs Signed-off-by: Achal Shah --- .../feast/onlinestore/redisonlinestore.go | 2 +- sdk/python/feast/repo_config.py | 25 +++++++++++++++++-- 2 files changed, 24 insertions(+), 3 deletions(-) diff --git a/go/internal/feast/onlinestore/redisonlinestore.go b/go/internal/feast/onlinestore/redisonlinestore.go index 5d40834b4f..8fb85085d4 100644 --- a/go/internal/feast/onlinestore/redisonlinestore.go +++ b/go/internal/feast/onlinestore/redisonlinestore.go @@ -360,8 +360,8 @@ func serializeValue(value interface{}, entityKeySerializationVersion int64) (*[] binary.LittleEndian.PutUint32(valueBuffer, uint32(x.Int32Val)) return &valueBuffer, types.ValueType_INT32, nil case *types.Value_Int64Val: - // TODO (woop): We unfortunately have to use 32 bit here for backward compatibility :( if entityKeySerializationVersion <= 1 { + // We unfortunately have to use 32 bit here for backward compatibility :( valueBuffer := make([]byte, 4) binary.LittleEndian.PutUint32(valueBuffer, uint32(x.Int64Val)) return &valueBuffer, types.ValueType_INT64, nil diff --git a/sdk/python/feast/repo_config.py b/sdk/python/feast/repo_config.py index 2069025b8b..eb836b31d0 100644 --- a/sdk/python/feast/repo_config.py +++ b/sdk/python/feast/repo_config.py @@ -1,5 +1,6 @@ import logging import os +import warnings from pathlib import Path from typing import Any @@ -25,6 +26,8 @@ from feast.importer import import_class from feast.usage import log_exceptions +warnings.simplefilter("once", RuntimeWarning) + _logger = logging.getLogger(__name__) # These dict exists so that: @@ -35,7 +38,6 @@ "lambda": "feast.infra.materialization.lambda.lambda_engine.LambdaMaterializationEngine", } - ONLINE_STORE_CLASS_FOR_TYPE = { "sqlite": "feast.infra.online_stores.sqlite.SqliteOnlineStore", "datastore": "feast.infra.online_stores.datastore.DatastoreOnlineStore", @@ -140,6 +142,15 @@ class RepoConfig(FeastBaseModel): go_feature_retrieval: Optional[bool] = False entity_key_serialization_version: StrictInt = 0 + """ Entity key serialization version: This version is used to control what serialization scheme is + used when writing data to the online store. + A value <= 1 uses the serialization scheme used by feast up to Feast 0.22. + A value of 2 uses a newer serialization scheme, supported as of Feast 0.23. + The main difference between the two scheme is that the serialization scheme v1 stored `long` values as `int`s, + which would result in errors trying to serialize a range of values. + v2 fixes this error, but v1 is kept around to ensure backwards compatibility - specifically the ability to read + feature values for entities that have already been written into the online store. + """ def __init__(self, **data: Any): super().__init__(**data) @@ -180,6 +191,16 @@ def __init__(self, **data: Any): self.feature_server["type"] )(**self.feature_server) + if self.entity_key_serialization_version <= 1: + warnings.warn( + "`entity_key_serialization_version` is either not specified in the feature_store.yaml, " + "or is specified to a value <= 1." + "This serialization version may cause errors when trying to write fields with the `Long` data type" + " into the online store. Specifying `entity_key_serialization_version` to 2 is recommended for" + " new projects. ", + RuntimeWarning, + ) + def get_registry_config(self): if isinstance(self.registry, str): return RegistryConfig(path=self.registry) @@ -379,7 +400,7 @@ def write_to_path(self, repo_path: Path): config_path = repo_path / "feature_store.yaml" with open(config_path, mode="w") as f: yaml.dump( - yaml.safe_load(self.json(exclude={"repo_path"}, exclude_unset=True,)), + yaml.safe_load(self.json(exclude={"repo_path"}, exclude_unset=True, )), f, sort_keys=False, ) From 4aa2ba71dee40ebbe1783b24f80f8d390e196d4e Mon Sep 17 00:00:00 2001 From: Achal Shah Date: Tue, 19 Jul 2022 14:33:34 -0700 Subject: [PATCH 18/19] docs Signed-off-by: Achal Shah --- sdk/python/feast/repo_config.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/sdk/python/feast/repo_config.py b/sdk/python/feast/repo_config.py index eb836b31d0..fbcc6c8787 100644 --- a/sdk/python/feast/repo_config.py +++ b/sdk/python/feast/repo_config.py @@ -142,14 +142,14 @@ class RepoConfig(FeastBaseModel): go_feature_retrieval: Optional[bool] = False entity_key_serialization_version: StrictInt = 0 - """ Entity key serialization version: This version is used to control what serialization scheme is - used when writing data to the online store. - A value <= 1 uses the serialization scheme used by feast up to Feast 0.22. + """ Entity key serialization version: This version is used to control what serialization scheme is + used when writing data to the online store. + A value <= 1 uses the serialization scheme used by feast up to Feast 0.22. A value of 2 uses a newer serialization scheme, supported as of Feast 0.23. - The main difference between the two scheme is that the serialization scheme v1 stored `long` values as `int`s, + The main difference between the two scheme is that the serialization scheme v1 stored `long` values as `int`s, which would result in errors trying to serialize a range of values. v2 fixes this error, but v1 is kept around to ensure backwards compatibility - specifically the ability to read - feature values for entities that have already been written into the online store. + feature values for entities that have already been written into the online store. """ def __init__(self, **data: Any): @@ -400,7 +400,7 @@ def write_to_path(self, repo_path: Path): config_path = repo_path / "feature_store.yaml" with open(config_path, mode="w") as f: yaml.dump( - yaml.safe_load(self.json(exclude={"repo_path"}, exclude_unset=True, )), + yaml.safe_load(self.json(exclude={"repo_path"}, exclude_unset=True,)), f, sort_keys=False, ) From 82259d40d2d200b0701db251162e40c4fccd3a57 Mon Sep 17 00:00:00 2001 From: Achal Shah Date: Tue, 19 Jul 2022 15:06:57 -0700 Subject: [PATCH 19/19] docs Signed-off-by: Achal Shah --- sdk/python/feast/repo_config.py | 2 +- sdk/python/tests/unit/infra/test_key_encoding_utils.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/sdk/python/feast/repo_config.py b/sdk/python/feast/repo_config.py index fbcc6c8787..0823b93ba6 100644 --- a/sdk/python/feast/repo_config.py +++ b/sdk/python/feast/repo_config.py @@ -141,7 +141,7 @@ class RepoConfig(FeastBaseModel): go_feature_retrieval: Optional[bool] = False - entity_key_serialization_version: StrictInt = 0 + entity_key_serialization_version: StrictInt = 1 """ Entity key serialization version: This version is used to control what serialization scheme is used when writing data to the online store. A value <= 1 uses the serialization scheme used by feast up to Feast 0.22. diff --git a/sdk/python/tests/unit/infra/test_key_encoding_utils.py b/sdk/python/tests/unit/infra/test_key_encoding_utils.py index eb1cb67c6d..449d6819a1 100644 --- a/sdk/python/tests/unit/infra/test_key_encoding_utils.py +++ b/sdk/python/tests/unit/infra/test_key_encoding_utils.py @@ -21,7 +21,7 @@ def test_serialize_entity_key(): entity_key_serialization_version=2, ) - # Old serilization scheme, should fail. + # Old serialization scheme, should fail. with pytest.raises(BaseException): serialize_entity_key( EntityKeyProto(