From 14a9e103f8f809d11a4c8c8d0bb32ac3f41380dd Mon Sep 17 00:00:00 2001 From: tokoko Date: Wed, 7 Feb 2024 22:38:50 +0000 Subject: [PATCH 1/3] add remote registry Signed-off-by: tokoko --- sdk/python/feast/feature_store.py | 4 + .../feast/infra/registry/base_registry.py | 2 +- sdk/python/feast/infra/registry/registry.py | 4 + sdk/python/feast/infra/registry/remote.py | 375 ++++++++++++++++++ sdk/python/feast/repo_config.py | 1 + .../tests/unit/infra/registry/test_remote.py | 60 +++ 6 files changed, 445 insertions(+), 1 deletion(-) create mode 100644 sdk/python/feast/infra/registry/remote.py create mode 100644 sdk/python/tests/unit/infra/registry/test_remote.py diff --git a/sdk/python/feast/feature_store.py b/sdk/python/feast/feature_store.py index 4a53672b2e..e38120c33d 100644 --- a/sdk/python/feast/feature_store.py +++ b/sdk/python/feast/feature_store.py @@ -164,6 +164,10 @@ def __init__( self._registry = SnowflakeRegistry( registry_config, self.config.project, None ) + elif registry_config and registry_config.registry_type == "remote": + from feast.infra.registry.remote import RemoteRegistry + + self._registry = RemoteRegistry(registry_config, self.config.project, None) else: r = Registry(self.config.project, registry_config, repo_path=self.repo_path) r._initialize_registry(self.config.project) diff --git a/sdk/python/feast/infra/registry/base_registry.py b/sdk/python/feast/infra/registry/base_registry.py index 8928a5800d..e68fdd0eb8 100644 --- a/sdk/python/feast/infra/registry/base_registry.py +++ b/sdk/python/feast/infra/registry/base_registry.py @@ -232,7 +232,7 @@ def delete_feature_view(self, name: str, project: str, commit: bool = True): @abstractmethod def get_stream_feature_view( self, name: str, project: str, allow_cache: bool = False - ): + ) -> StreamFeatureView: """ Retrieves a stream feature view. diff --git a/sdk/python/feast/infra/registry/registry.py b/sdk/python/feast/infra/registry/registry.py index fc7be75e0d..a9d6c44f38 100644 --- a/sdk/python/feast/infra/registry/registry.py +++ b/sdk/python/feast/infra/registry/registry.py @@ -178,6 +178,10 @@ def __new__( from feast.infra.registry.snowflake import SnowflakeRegistry return SnowflakeRegistry(registry_config, project, repo_path) + elif registry_config and registry_config.registry_type == "remote": + from feast.infra.registry.remote import RemoteRegistry + + return RemoteRegistry(registry_config, project, repo_path) else: return super(Registry, cls).__new__(cls) diff --git a/sdk/python/feast/infra/registry/remote.py b/sdk/python/feast/infra/registry/remote.py new file mode 100644 index 0000000000..2f463065c7 --- /dev/null +++ b/sdk/python/feast/infra/registry/remote.py @@ -0,0 +1,375 @@ +import logging +import grpc +from datetime import datetime +from pathlib import Path +from typing import List, Optional, Union +from google.protobuf.timestamp_pb2 import Timestamp +from google.protobuf.empty_pb2 import Empty +from pydantic import StrictStr +from feast.base_feature_view import BaseFeatureView +from feast.data_source import DataSource +from feast.entity import Entity +from feast.feature_service import FeatureService +from feast.feature_view import FeatureView +from feast.request_feature_view import RequestFeatureView +from feast.saved_dataset import SavedDataset, ValidationReference +from feast.stream_feature_view import StreamFeatureView +from feast.on_demand_feature_view import OnDemandFeatureView +from feast.infra.infra_object import Infra +from feast.project_metadata import ProjectMetadata +from feast.infra.registry.base_registry import BaseRegistry +from feast.repo_config import RegistryConfig +from feast.protos.feast.registry import RegistryServer_pb2_grpc, RegistryServer_pb2 +from feast.protos.feast.core.Registry_pb2 import Registry as RegistryProto + +class RemoteRegistryConfig(RegistryConfig): + registry_type: StrictStr = "remote" + """ str: Provider name or a class name that implements Registry.""" + + path: StrictStr = "" + """ str: Path to metadata store. + If registry_type is 'remote', then this is a URL for registry server """ + +class RemoteRegistry(BaseRegistry): + def __init__( + self, + registry_config: Optional[Union[RegistryConfig, RemoteRegistryConfig]], + project: str, + repo_path: Optional[Path], + ): + self.channel = grpc.insecure_channel(registry_config.path) + self.stub = RegistryServer_pb2_grpc.RegistryServerStub(self.channel) + + def apply_entity(self, entity: Entity, project: str, commit: bool = True): + raise Exception('Unimplemented: remote registry is read-only') + + def delete_entity(self, name: str, project: str, commit: bool = True): + raise Exception('Unimplemented: remote registry is read-only') + + def get_entity(self, name: str, project: str, allow_cache: bool = False) -> Entity: + request = RegistryServer_pb2.GetEntityRequest( + name=name, + project=project, + allow_cache=allow_cache + ) + + response = self.stub.GetEntity(request) + + return Entity.from_proto(response) + + def list_entities(self, project: str, allow_cache: bool = False) -> List[Entity]: + request = RegistryServer_pb2.ListEntitiesRequest( + project=project, + allow_cache=allow_cache + ) + + response = self.stub.ListEntities(request) + + return [Entity.from_proto(entity) for entity in response.entities] + + def apply_data_source( + self, data_source: DataSource, project: str, commit: bool = True + ): + raise Exception('Unimplemented: remote registry is read-only') + + def delete_data_source(self, name: str, project: str, commit: bool = True): + raise Exception('Unimplemented: remote registry is read-only') + + def get_data_source( + self, name: str, project: str, allow_cache: bool = False + ) -> DataSource: + request = RegistryServer_pb2.GetDataSourceRequest( + name=name, + project=project, + allow_cache=allow_cache + ) + + response = self.stub.GetDataSource(request) + + return DataSource.from_proto(response) + + def list_data_sources( + self, project: str, allow_cache: bool = False + ) -> List[DataSource]: + request = RegistryServer_pb2.ListDataSourcesRequest( + project=project, + allow_cache=allow_cache + ) + + response = self.stub.ListDataSources(request) + + return [DataSource.from_proto(data_source) for data_source in response.data_sources] + + def apply_feature_service( + self, feature_service: FeatureService, project: str, commit: bool = True + ): + raise Exception('Unimplemented: remote registry is read-only') + + def delete_feature_service(self, name: str, project: str, commit: bool = True): + raise Exception('Unimplemented: remote registry is read-only') + + def get_feature_service( + self, name: str, project: str, allow_cache: bool = False + ) -> FeatureService: + request = RegistryServer_pb2.GetFeatureServiceRequest( + name=name, + project=project, + allow_cache=allow_cache + ) + + response = self.stub.GetFeatureService(request) + + return FeatureService.from_proto(response) + + def list_feature_services( + self, project: str, allow_cache: bool = False + ) -> List[FeatureService]: + request = RegistryServer_pb2.ListFeatureServicesRequest( + project=project, + allow_cache=allow_cache + ) + + response = self.stub.ListFeatureServices(request) + + return [FeatureService.from_proto(feature_service) for feature_service in response.feature_services] + + def apply_feature_view( + self, feature_view: BaseFeatureView, project: str, commit: bool = True + ): + raise Exception('Unimplemented: remote registry is read-only') + + def delete_feature_view(self, name: str, project: str, commit: bool = True): + raise Exception('Unimplemented: remote registry is read-only') + + def get_stream_feature_view( + self, name: str, project: str, allow_cache: bool = False + ) -> StreamFeatureView: + request = RegistryServer_pb2.GetStreamFeatureViewRequest( + name=name, + project=project, + allow_cache=allow_cache + ) + + response = self.stub.GetStreamFeatureView(request) + + return StreamFeatureView.from_proto(response) + + def list_stream_feature_views( + self, project: str, allow_cache: bool = False + ) -> List[StreamFeatureView]: + request = RegistryServer_pb2.ListStreamFeatureViewsRequest( + project=project, + allow_cache=allow_cache + ) + + response = self.stub.ListStreamFeatureViews(request) + + return [StreamFeatureView.from_proto(stream_feature_view) for stream_feature_view in response.stream_feature_views] + + def get_on_demand_feature_view( + self, name: str, project: str, allow_cache: bool = False + ) -> OnDemandFeatureView: + request = RegistryServer_pb2.GetOnDemandFeatureViewRequest( + name=name, + project=project, + allow_cache=allow_cache + ) + + response = self.stub.GetOnDemandFeatureView(request) + + return OnDemandFeatureView.from_proto(response) + + def list_on_demand_feature_views( + self, project: str, allow_cache: bool = False + ) -> List[OnDemandFeatureView]: + request = RegistryServer_pb2.ListOnDemandFeatureViewsRequest( + project=project, + allow_cache=allow_cache + ) + + response = self.stub.ListOnDemandFeatureViews(request) + + return [OnDemandFeatureView.from_proto(on_demand_feature_view) for on_demand_feature_view in response.on_demand_feature_views] + + def get_feature_view( + self, name: str, project: str, allow_cache: bool = False + ) -> FeatureView: + request = RegistryServer_pb2.GetFeatureViewRequest( + name=name, + project=project, + allow_cache=allow_cache + ) + + response = self.stub.GetFeatureView(request) + + return FeatureView.from_proto(response) + + def list_feature_views( + self, project: str, allow_cache: bool = False + ) -> List[FeatureView]: + request = RegistryServer_pb2.ListFeatureViewsRequest( + project=project, + allow_cache=allow_cache + ) + + response = self.stub.ListFeatureViews(request) + + return [FeatureView.from_proto(feature_view) for feature_view in response.feature_views] + + def get_request_feature_view( + self, name: str, project: str, allow_cache: bool = False + ) -> RequestFeatureView: + request = RegistryServer_pb2.GetRequestFeatureViewRequest( + name=name, + project=project, + allow_cache=allow_cache + ) + + response = self.stub.GetRequestFeatureView(request) + + return RequestFeatureView.from_proto(response) + + def list_request_feature_views( + self, project: str, allow_cache: bool = False + ) -> List[RequestFeatureView]: + request = RegistryServer_pb2.ListRequestFeatureViewsRequest( + project=project, + allow_cache=allow_cache + ) + + response = self.stub.ListRequestFeatureViews(request) + + return [RequestFeatureView.from_proto(request_feature_view) for request_feature_view in response.request_feature_views] + + def apply_materialization( + self, + feature_view: FeatureView, + project: str, + start_date: datetime, + end_date: datetime, + commit: bool = True, + ): + raise Exception('Unimplemented: remote registry is read-only') + + def apply_saved_dataset( + self, + saved_dataset: SavedDataset, + project: str, + commit: bool = True, + ): + raise Exception('Unimplemented: remote registry is read-only') + + def delete_saved_dataset(self, name: str, project: str, allow_cache: bool = False): + raise Exception('Unimplemented: remote registry is read-only') + + def get_saved_dataset( + self, name: str, project: str, allow_cache: bool = False + ) -> SavedDataset: + request = RegistryServer_pb2.GetSavedDatasetRequest( + name=name, + project=project, + allow_cache=allow_cache + ) + + response = self.stub.GetSavedDataset(request) + + return SavedDataset.from_proto(response) + + def list_saved_datasets( + self, project: str, allow_cache: bool = False + ) -> List[SavedDataset]: + request = RegistryServer_pb2.ListSavedDatasetsResponse( + project=project, + allow_cache=allow_cache + ) + + response = self.stub.ListSavedDatasets(request) + + return [SavedDataset.from_proto(saved_dataset) for saved_dataset in response.saved_datasets] + + def apply_validation_reference( + self, + validation_reference: ValidationReference, + project: str, + commit: bool = True, + ): + raise Exception('Unimplemented: remote registry is read-only') + + def delete_validation_reference(self, name: str, project: str, commit: bool = True): + raise Exception('Unimplemented: remote registry is read-only') + + def get_validation_reference( + self, name: str, project: str, allow_cache: bool = False + ) -> ValidationReference: + request = RegistryServer_pb2.GetValidationReferenceRequest( + name=name, + project=project, + allow_cache=allow_cache + ) + + response = self.stub.GetValidationReference(request) + + return ValidationReference.from_proto(response) + + def list_validation_references( + self, project: str, allow_cache: bool = False + ) -> List[ValidationReference]: + request = RegistryServer_pb2.ListValidationReferencesRequest( + project=project, + allow_cache=allow_cache + ) + + response = self.stub.ListValidationReferences(request) + + return [ValidationReference.from_proto(validation_reference) for validation_reference in response.validation_references] + + def list_project_metadata( + self, project: str, allow_cache: bool = False + ) -> List[ProjectMetadata]: + request = RegistryServer_pb2.ListProjectMetadataRequest( + project=project, + allow_cache=allow_cache + ) + + response = self.stub.ListProjectMetadata(request) + + return [ProjectMetadata.from_proto(pm) for pm in response.project_metadata] + + def update_infra(self, infra: Infra, project: str, commit: bool = True): + raise Exception('Unimplemented: remote registry is read-only') + + def get_infra(self, project: str, allow_cache: bool = False) -> Infra: + request = RegistryServer_pb2.GetInfraRequest( + project=project, + allow_cache=allow_cache + ) + + response = self.stub.GetInfra(request) + + return Infra.from_proto(response) + + def apply_user_metadata( + self, + project: str, + feature_view: BaseFeatureView, + metadata_bytes: Optional[bytes], + ): + pass + + def get_user_metadata( + self, project: str, feature_view: BaseFeatureView + ) -> Optional[bytes]: + pass + + def proto(self) -> RegistryProto: + return self.stub.Proto(Empty()) + + def commit(self): + raise Exception('Unimplemented: remote registry is read-only') + + def refresh(self, project: Optional[str] = None): + request = RegistryServer_pb2.RefreshRequest( + project=project + ) + + self.stub.Refresh(request) \ No newline at end of file diff --git a/sdk/python/feast/repo_config.py b/sdk/python/feast/repo_config.py index 3461ae058b..eafc57228a 100644 --- a/sdk/python/feast/repo_config.py +++ b/sdk/python/feast/repo_config.py @@ -40,6 +40,7 @@ "file": "feast.infra.registry.registry.Registry", "sql": "feast.infra.registry.sql.SqlRegistry", "snowflake.registry": "feast.infra.registry.snowflake.SnowflakeRegistry", + "remote": "feast.infra.registry.remote.RemoteRegistry", } BATCH_ENGINE_CLASS_FOR_TYPE = { diff --git a/sdk/python/tests/unit/infra/registry/test_remote.py b/sdk/python/tests/unit/infra/registry/test_remote.py new file mode 100644 index 0000000000..93d7658c66 --- /dev/null +++ b/sdk/python/tests/unit/infra/registry/test_remote.py @@ -0,0 +1,60 @@ +import assertpy +import grpc_testing +import pytest +from google.protobuf.empty_pb2 import Empty + +from feast import Entity, FeatureStore +from feast.protos.feast.registry import RegistryServer_pb2, RegistryServer_pb2_grpc +from feast.registry_server import RegistryServer +from feast.infra.registry.remote import RemoteRegistry, RemoteRegistryConfig + + +class GrpcMockChannel: + def __init__(self, service, servicer): + self.service = service + self.test_server = grpc_testing.server_from_dictionary( + {service: servicer}, + grpc_testing.strict_real_time(), + ) + + def unary_unary(self, method: str, request_serializer=None, response_deserializer=None): + method_name = method.split('/')[-1] + method_descriptor = self.service.methods_by_name[method_name] + + def handler(request): + rpc = self.test_server.invoke_unary_unary( + method_descriptor, (), request, None + ) + + response, trailing_metadata, code, details = rpc.termination() + return response + + return handler + +@pytest.fixture +def mock_remote_registry(environment): + store: FeatureStore = environment.feature_store + registry = RemoteRegistry(registry_config=RemoteRegistryConfig(path=''), project=None, repo_path=None) + mock_channel = GrpcMockChannel(RegistryServer_pb2.DESCRIPTOR.services_by_name["RegistryServer"], RegistryServer(store=store)) + registry.stub = RegistryServer_pb2_grpc.RegistryServerStub(mock_channel) + return registry + +def test_registry_server_get_entity(environment, mock_remote_registry): + store: FeatureStore = environment.feature_store + entity = Entity(name="driver", join_keys=["driver_id"]) + store.apply(entity) + + expected = store.get_entity(entity.name) + response_entity = mock_remote_registry.get_entity(entity.name, store.project) + + assertpy.assert_that(response_entity).is_equal_to(expected) + +def test_registry_server_proto(environment, mock_remote_registry): + store: FeatureStore = environment.feature_store + entity = Entity(name="driver", join_keys=["driver_id"]) + store.apply(entity) + + expected = store.registry.proto() + response = mock_remote_registry.proto() + + assertpy.assert_that(response).is_equal_to(expected) \ No newline at end of file From 9aa0370a5cfe85bb1afefe656dc2a157aa9cdb42 Mon Sep 17 00:00:00 2001 From: tokoko Date: Wed, 7 Feb 2024 23:11:17 +0000 Subject: [PATCH 2/3] format and lint remote registry code Signed-off-by: tokoko --- sdk/python/feast/infra/registry/remote.py | 172 +++++++++--------- .../tests/unit/infra/registry/test_remote.py | 25 ++- 2 files changed, 100 insertions(+), 97 deletions(-) diff --git a/sdk/python/feast/infra/registry/remote.py b/sdk/python/feast/infra/registry/remote.py index 2f463065c7..231bdab5da 100644 --- a/sdk/python/feast/infra/registry/remote.py +++ b/sdk/python/feast/infra/registry/remote.py @@ -1,26 +1,27 @@ -import logging -import grpc from datetime import datetime from pathlib import Path from typing import List, Optional, Union -from google.protobuf.timestamp_pb2 import Timestamp + +import grpc from google.protobuf.empty_pb2 import Empty from pydantic import StrictStr + from feast.base_feature_view import BaseFeatureView from feast.data_source import DataSource from feast.entity import Entity from feast.feature_service import FeatureService from feast.feature_view import FeatureView -from feast.request_feature_view import RequestFeatureView -from feast.saved_dataset import SavedDataset, ValidationReference -from feast.stream_feature_view import StreamFeatureView -from feast.on_demand_feature_view import OnDemandFeatureView from feast.infra.infra_object import Infra -from feast.project_metadata import ProjectMetadata from feast.infra.registry.base_registry import BaseRegistry -from feast.repo_config import RegistryConfig -from feast.protos.feast.registry import RegistryServer_pb2_grpc, RegistryServer_pb2 +from feast.on_demand_feature_view import OnDemandFeatureView +from feast.project_metadata import ProjectMetadata from feast.protos.feast.core.Registry_pb2 import Registry as RegistryProto +from feast.protos.feast.registry import RegistryServer_pb2, RegistryServer_pb2_grpc +from feast.repo_config import RegistryConfig +from feast.request_feature_view import RequestFeatureView +from feast.saved_dataset import SavedDataset, ValidationReference +from feast.stream_feature_view import StreamFeatureView + class RemoteRegistryConfig(RegistryConfig): registry_type: StrictStr = "remote" @@ -30,10 +31,11 @@ class RemoteRegistryConfig(RegistryConfig): """ str: Path to metadata store. If registry_type is 'remote', then this is a URL for registry server """ + class RemoteRegistry(BaseRegistry): def __init__( self, - registry_config: Optional[Union[RegistryConfig, RemoteRegistryConfig]], + registry_config: Union[RegistryConfig, RemoteRegistryConfig], project: str, repo_path: Optional[Path], ): @@ -41,16 +43,14 @@ def __init__( self.stub = RegistryServer_pb2_grpc.RegistryServerStub(self.channel) def apply_entity(self, entity: Entity, project: str, commit: bool = True): - raise Exception('Unimplemented: remote registry is read-only') + raise Exception("Unimplemented: remote registry is read-only") def delete_entity(self, name: str, project: str, commit: bool = True): - raise Exception('Unimplemented: remote registry is read-only') + raise Exception("Unimplemented: remote registry is read-only") def get_entity(self, name: str, project: str, allow_cache: bool = False) -> Entity: request = RegistryServer_pb2.GetEntityRequest( - name=name, - project=project, - allow_cache=allow_cache + name=name, project=project, allow_cache=allow_cache ) response = self.stub.GetEntity(request) @@ -59,8 +59,7 @@ def get_entity(self, name: str, project: str, allow_cache: bool = False) -> Enti def list_entities(self, project: str, allow_cache: bool = False) -> List[Entity]: request = RegistryServer_pb2.ListEntitiesRequest( - project=project, - allow_cache=allow_cache + project=project, allow_cache=allow_cache ) response = self.stub.ListEntities(request) @@ -70,18 +69,16 @@ def list_entities(self, project: str, allow_cache: bool = False) -> List[Entity] def apply_data_source( self, data_source: DataSource, project: str, commit: bool = True ): - raise Exception('Unimplemented: remote registry is read-only') + raise Exception("Unimplemented: remote registry is read-only") def delete_data_source(self, name: str, project: str, commit: bool = True): - raise Exception('Unimplemented: remote registry is read-only') + raise Exception("Unimplemented: remote registry is read-only") def get_data_source( self, name: str, project: str, allow_cache: bool = False ) -> DataSource: request = RegistryServer_pb2.GetDataSourceRequest( - name=name, - project=project, - allow_cache=allow_cache + name=name, project=project, allow_cache=allow_cache ) response = self.stub.GetDataSource(request) @@ -92,29 +89,28 @@ def list_data_sources( self, project: str, allow_cache: bool = False ) -> List[DataSource]: request = RegistryServer_pb2.ListDataSourcesRequest( - project=project, - allow_cache=allow_cache + project=project, allow_cache=allow_cache ) response = self.stub.ListDataSources(request) - return [DataSource.from_proto(data_source) for data_source in response.data_sources] + return [ + DataSource.from_proto(data_source) for data_source in response.data_sources + ] def apply_feature_service( self, feature_service: FeatureService, project: str, commit: bool = True ): - raise Exception('Unimplemented: remote registry is read-only') + raise Exception("Unimplemented: remote registry is read-only") def delete_feature_service(self, name: str, project: str, commit: bool = True): - raise Exception('Unimplemented: remote registry is read-only') + raise Exception("Unimplemented: remote registry is read-only") def get_feature_service( self, name: str, project: str, allow_cache: bool = False ) -> FeatureService: request = RegistryServer_pb2.GetFeatureServiceRequest( - name=name, - project=project, - allow_cache=allow_cache + name=name, project=project, allow_cache=allow_cache ) response = self.stub.GetFeatureService(request) @@ -125,29 +121,29 @@ def list_feature_services( self, project: str, allow_cache: bool = False ) -> List[FeatureService]: request = RegistryServer_pb2.ListFeatureServicesRequest( - project=project, - allow_cache=allow_cache + project=project, allow_cache=allow_cache ) response = self.stub.ListFeatureServices(request) - return [FeatureService.from_proto(feature_service) for feature_service in response.feature_services] + return [ + FeatureService.from_proto(feature_service) + for feature_service in response.feature_services + ] def apply_feature_view( self, feature_view: BaseFeatureView, project: str, commit: bool = True ): - raise Exception('Unimplemented: remote registry is read-only') + raise Exception("Unimplemented: remote registry is read-only") def delete_feature_view(self, name: str, project: str, commit: bool = True): - raise Exception('Unimplemented: remote registry is read-only') + raise Exception("Unimplemented: remote registry is read-only") def get_stream_feature_view( self, name: str, project: str, allow_cache: bool = False ) -> StreamFeatureView: request = RegistryServer_pb2.GetStreamFeatureViewRequest( - name=name, - project=project, - allow_cache=allow_cache + name=name, project=project, allow_cache=allow_cache ) response = self.stub.GetStreamFeatureView(request) @@ -158,21 +154,21 @@ def list_stream_feature_views( self, project: str, allow_cache: bool = False ) -> List[StreamFeatureView]: request = RegistryServer_pb2.ListStreamFeatureViewsRequest( - project=project, - allow_cache=allow_cache + project=project, allow_cache=allow_cache ) response = self.stub.ListStreamFeatureViews(request) - return [StreamFeatureView.from_proto(stream_feature_view) for stream_feature_view in response.stream_feature_views] + return [ + StreamFeatureView.from_proto(stream_feature_view) + for stream_feature_view in response.stream_feature_views + ] def get_on_demand_feature_view( self, name: str, project: str, allow_cache: bool = False ) -> OnDemandFeatureView: request = RegistryServer_pb2.GetOnDemandFeatureViewRequest( - name=name, - project=project, - allow_cache=allow_cache + name=name, project=project, allow_cache=allow_cache ) response = self.stub.GetOnDemandFeatureView(request) @@ -183,21 +179,21 @@ def list_on_demand_feature_views( self, project: str, allow_cache: bool = False ) -> List[OnDemandFeatureView]: request = RegistryServer_pb2.ListOnDemandFeatureViewsRequest( - project=project, - allow_cache=allow_cache + project=project, allow_cache=allow_cache ) response = self.stub.ListOnDemandFeatureViews(request) - return [OnDemandFeatureView.from_proto(on_demand_feature_view) for on_demand_feature_view in response.on_demand_feature_views] - + return [ + OnDemandFeatureView.from_proto(on_demand_feature_view) + for on_demand_feature_view in response.on_demand_feature_views + ] + def get_feature_view( self, name: str, project: str, allow_cache: bool = False ) -> FeatureView: request = RegistryServer_pb2.GetFeatureViewRequest( - name=name, - project=project, - allow_cache=allow_cache + name=name, project=project, allow_cache=allow_cache ) response = self.stub.GetFeatureView(request) @@ -208,21 +204,21 @@ def list_feature_views( self, project: str, allow_cache: bool = False ) -> List[FeatureView]: request = RegistryServer_pb2.ListFeatureViewsRequest( - project=project, - allow_cache=allow_cache + project=project, allow_cache=allow_cache ) response = self.stub.ListFeatureViews(request) - return [FeatureView.from_proto(feature_view) for feature_view in response.feature_views] + return [ + FeatureView.from_proto(feature_view) + for feature_view in response.feature_views + ] def get_request_feature_view( self, name: str, project: str, allow_cache: bool = False ) -> RequestFeatureView: request = RegistryServer_pb2.GetRequestFeatureViewRequest( - name=name, - project=project, - allow_cache=allow_cache + name=name, project=project, allow_cache=allow_cache ) response = self.stub.GetRequestFeatureView(request) @@ -233,13 +229,15 @@ def list_request_feature_views( self, project: str, allow_cache: bool = False ) -> List[RequestFeatureView]: request = RegistryServer_pb2.ListRequestFeatureViewsRequest( - project=project, - allow_cache=allow_cache + project=project, allow_cache=allow_cache ) response = self.stub.ListRequestFeatureViews(request) - return [RequestFeatureView.from_proto(request_feature_view) for request_feature_view in response.request_feature_views] + return [ + RequestFeatureView.from_proto(request_feature_view) + for request_feature_view in response.request_feature_views + ] def apply_materialization( self, @@ -249,7 +247,7 @@ def apply_materialization( end_date: datetime, commit: bool = True, ): - raise Exception('Unimplemented: remote registry is read-only') + raise Exception("Unimplemented: remote registry is read-only") def apply_saved_dataset( self, @@ -257,18 +255,16 @@ def apply_saved_dataset( project: str, commit: bool = True, ): - raise Exception('Unimplemented: remote registry is read-only') + raise Exception("Unimplemented: remote registry is read-only") def delete_saved_dataset(self, name: str, project: str, allow_cache: bool = False): - raise Exception('Unimplemented: remote registry is read-only') + raise Exception("Unimplemented: remote registry is read-only") def get_saved_dataset( self, name: str, project: str, allow_cache: bool = False ) -> SavedDataset: request = RegistryServer_pb2.GetSavedDatasetRequest( - name=name, - project=project, - allow_cache=allow_cache + name=name, project=project, allow_cache=allow_cache ) response = self.stub.GetSavedDataset(request) @@ -278,14 +274,16 @@ def get_saved_dataset( def list_saved_datasets( self, project: str, allow_cache: bool = False ) -> List[SavedDataset]: - request = RegistryServer_pb2.ListSavedDatasetsResponse( - project=project, - allow_cache=allow_cache + request = RegistryServer_pb2.ListSavedDatasetsRequest( + project=project, allow_cache=allow_cache ) response = self.stub.ListSavedDatasets(request) - return [SavedDataset.from_proto(saved_dataset) for saved_dataset in response.saved_datasets] + return [ + SavedDataset.from_proto(saved_dataset) + for saved_dataset in response.saved_datasets + ] def apply_validation_reference( self, @@ -293,18 +291,16 @@ def apply_validation_reference( project: str, commit: bool = True, ): - raise Exception('Unimplemented: remote registry is read-only') + raise Exception("Unimplemented: remote registry is read-only") def delete_validation_reference(self, name: str, project: str, commit: bool = True): - raise Exception('Unimplemented: remote registry is read-only') + raise Exception("Unimplemented: remote registry is read-only") def get_validation_reference( self, name: str, project: str, allow_cache: bool = False ) -> ValidationReference: request = RegistryServer_pb2.GetValidationReferenceRequest( - name=name, - project=project, - allow_cache=allow_cache + name=name, project=project, allow_cache=allow_cache ) response = self.stub.GetValidationReference(request) @@ -315,20 +311,21 @@ def list_validation_references( self, project: str, allow_cache: bool = False ) -> List[ValidationReference]: request = RegistryServer_pb2.ListValidationReferencesRequest( - project=project, - allow_cache=allow_cache + project=project, allow_cache=allow_cache ) response = self.stub.ListValidationReferences(request) - return [ValidationReference.from_proto(validation_reference) for validation_reference in response.validation_references] + return [ + ValidationReference.from_proto(validation_reference) + for validation_reference in response.validation_references + ] def list_project_metadata( self, project: str, allow_cache: bool = False ) -> List[ProjectMetadata]: request = RegistryServer_pb2.ListProjectMetadataRequest( - project=project, - allow_cache=allow_cache + project=project, allow_cache=allow_cache ) response = self.stub.ListProjectMetadata(request) @@ -336,12 +333,11 @@ def list_project_metadata( return [ProjectMetadata.from_proto(pm) for pm in response.project_metadata] def update_infra(self, infra: Infra, project: str, commit: bool = True): - raise Exception('Unimplemented: remote registry is read-only') + raise Exception("Unimplemented: remote registry is read-only") def get_infra(self, project: str, allow_cache: bool = False) -> Infra: request = RegistryServer_pb2.GetInfraRequest( - project=project, - allow_cache=allow_cache + project=project, allow_cache=allow_cache ) response = self.stub.GetInfra(request) @@ -365,11 +361,9 @@ def proto(self) -> RegistryProto: return self.stub.Proto(Empty()) def commit(self): - raise Exception('Unimplemented: remote registry is read-only') + raise Exception("Unimplemented: remote registry is read-only") def refresh(self, project: Optional[str] = None): - request = RegistryServer_pb2.RefreshRequest( - project=project - ) + request = RegistryServer_pb2.RefreshRequest(project=str(project)) - self.stub.Refresh(request) \ No newline at end of file + self.stub.Refresh(request) diff --git a/sdk/python/tests/unit/infra/registry/test_remote.py b/sdk/python/tests/unit/infra/registry/test_remote.py index 93d7658c66..16c6f0abfb 100644 --- a/sdk/python/tests/unit/infra/registry/test_remote.py +++ b/sdk/python/tests/unit/infra/registry/test_remote.py @@ -1,12 +1,11 @@ import assertpy import grpc_testing import pytest -from google.protobuf.empty_pb2 import Empty from feast import Entity, FeatureStore +from feast.infra.registry.remote import RemoteRegistry, RemoteRegistryConfig from feast.protos.feast.registry import RegistryServer_pb2, RegistryServer_pb2_grpc from feast.registry_server import RegistryServer -from feast.infra.registry.remote import RemoteRegistry, RemoteRegistryConfig class GrpcMockChannel: @@ -17,10 +16,12 @@ def __init__(self, service, servicer): grpc_testing.strict_real_time(), ) - def unary_unary(self, method: str, request_serializer=None, response_deserializer=None): - method_name = method.split('/')[-1] + def unary_unary( + self, method: str, request_serializer=None, response_deserializer=None + ): + method_name = method.split("/")[-1] method_descriptor = self.service.methods_by_name[method_name] - + def handler(request): rpc = self.test_server.invoke_unary_unary( method_descriptor, (), request, None @@ -31,14 +32,21 @@ def handler(request): return handler + @pytest.fixture def mock_remote_registry(environment): store: FeatureStore = environment.feature_store - registry = RemoteRegistry(registry_config=RemoteRegistryConfig(path=''), project=None, repo_path=None) - mock_channel = GrpcMockChannel(RegistryServer_pb2.DESCRIPTOR.services_by_name["RegistryServer"], RegistryServer(store=store)) + registry = RemoteRegistry( + registry_config=RemoteRegistryConfig(path=""), project=None, repo_path=None + ) + mock_channel = GrpcMockChannel( + RegistryServer_pb2.DESCRIPTOR.services_by_name["RegistryServer"], + RegistryServer(store=store), + ) registry.stub = RegistryServer_pb2_grpc.RegistryServerStub(mock_channel) return registry + def test_registry_server_get_entity(environment, mock_remote_registry): store: FeatureStore = environment.feature_store entity = Entity(name="driver", join_keys=["driver_id"]) @@ -49,6 +57,7 @@ def test_registry_server_get_entity(environment, mock_remote_registry): assertpy.assert_that(response_entity).is_equal_to(expected) + def test_registry_server_proto(environment, mock_remote_registry): store: FeatureStore = environment.feature_store entity = Entity(name="driver", join_keys=["driver_id"]) @@ -57,4 +66,4 @@ def test_registry_server_proto(environment, mock_remote_registry): expected = store.registry.proto() response = mock_remote_registry.proto() - assertpy.assert_that(response).is_equal_to(expected) \ No newline at end of file + assertpy.assert_that(response).is_equal_to(expected) From 5cbf440ac8d3443d333471a31c2eab3d6fac40cf Mon Sep 17 00:00:00 2001 From: tokoko Date: Sun, 18 Feb 2024 12:58:26 +0000 Subject: [PATCH 3/3] add read-only registry exception Signed-off-by: tokoko --- sdk/python/feast/errors.py | 5 ++++ sdk/python/feast/infra/registry/remote.py | 31 ++++++++++++----------- 2 files changed, 21 insertions(+), 15 deletions(-) diff --git a/sdk/python/feast/errors.py b/sdk/python/feast/errors.py index 9097e40c94..b7151ff0c8 100644 --- a/sdk/python/feast/errors.py +++ b/sdk/python/feast/errors.py @@ -415,3 +415,8 @@ def __init__(self): class PushSourceNotFoundException(Exception): def __init__(self, push_source_name: str): super().__init__(f"Unable to find push source '{push_source_name}'.") + + +class ReadOnlyRegistryException(Exception): + def __init__(self): + super().__init__("Registry implementation is read-only.") diff --git a/sdk/python/feast/infra/registry/remote.py b/sdk/python/feast/infra/registry/remote.py index 231bdab5da..67d61ffec7 100644 --- a/sdk/python/feast/infra/registry/remote.py +++ b/sdk/python/feast/infra/registry/remote.py @@ -9,6 +9,7 @@ from feast.base_feature_view import BaseFeatureView from feast.data_source import DataSource from feast.entity import Entity +from feast.errors import ReadOnlyRegistryException from feast.feature_service import FeatureService from feast.feature_view import FeatureView from feast.infra.infra_object import Infra @@ -43,10 +44,10 @@ def __init__( self.stub = RegistryServer_pb2_grpc.RegistryServerStub(self.channel) def apply_entity(self, entity: Entity, project: str, commit: bool = True): - raise Exception("Unimplemented: remote registry is read-only") + raise ReadOnlyRegistryException() def delete_entity(self, name: str, project: str, commit: bool = True): - raise Exception("Unimplemented: remote registry is read-only") + raise ReadOnlyRegistryException() def get_entity(self, name: str, project: str, allow_cache: bool = False) -> Entity: request = RegistryServer_pb2.GetEntityRequest( @@ -69,10 +70,10 @@ def list_entities(self, project: str, allow_cache: bool = False) -> List[Entity] def apply_data_source( self, data_source: DataSource, project: str, commit: bool = True ): - raise Exception("Unimplemented: remote registry is read-only") + raise ReadOnlyRegistryException() def delete_data_source(self, name: str, project: str, commit: bool = True): - raise Exception("Unimplemented: remote registry is read-only") + raise ReadOnlyRegistryException() def get_data_source( self, name: str, project: str, allow_cache: bool = False @@ -101,10 +102,10 @@ def list_data_sources( def apply_feature_service( self, feature_service: FeatureService, project: str, commit: bool = True ): - raise Exception("Unimplemented: remote registry is read-only") + raise ReadOnlyRegistryException() def delete_feature_service(self, name: str, project: str, commit: bool = True): - raise Exception("Unimplemented: remote registry is read-only") + raise ReadOnlyRegistryException() def get_feature_service( self, name: str, project: str, allow_cache: bool = False @@ -134,10 +135,10 @@ def list_feature_services( def apply_feature_view( self, feature_view: BaseFeatureView, project: str, commit: bool = True ): - raise Exception("Unimplemented: remote registry is read-only") + raise ReadOnlyRegistryException() def delete_feature_view(self, name: str, project: str, commit: bool = True): - raise Exception("Unimplemented: remote registry is read-only") + raise ReadOnlyRegistryException() def get_stream_feature_view( self, name: str, project: str, allow_cache: bool = False @@ -247,7 +248,7 @@ def apply_materialization( end_date: datetime, commit: bool = True, ): - raise Exception("Unimplemented: remote registry is read-only") + raise ReadOnlyRegistryException() def apply_saved_dataset( self, @@ -255,10 +256,10 @@ def apply_saved_dataset( project: str, commit: bool = True, ): - raise Exception("Unimplemented: remote registry is read-only") + raise ReadOnlyRegistryException() def delete_saved_dataset(self, name: str, project: str, allow_cache: bool = False): - raise Exception("Unimplemented: remote registry is read-only") + raise ReadOnlyRegistryException() def get_saved_dataset( self, name: str, project: str, allow_cache: bool = False @@ -291,10 +292,10 @@ def apply_validation_reference( project: str, commit: bool = True, ): - raise Exception("Unimplemented: remote registry is read-only") + raise ReadOnlyRegistryException() def delete_validation_reference(self, name: str, project: str, commit: bool = True): - raise Exception("Unimplemented: remote registry is read-only") + raise ReadOnlyRegistryException() def get_validation_reference( self, name: str, project: str, allow_cache: bool = False @@ -333,7 +334,7 @@ def list_project_metadata( return [ProjectMetadata.from_proto(pm) for pm in response.project_metadata] def update_infra(self, infra: Infra, project: str, commit: bool = True): - raise Exception("Unimplemented: remote registry is read-only") + raise ReadOnlyRegistryException() def get_infra(self, project: str, allow_cache: bool = False) -> Infra: request = RegistryServer_pb2.GetInfraRequest( @@ -361,7 +362,7 @@ def proto(self) -> RegistryProto: return self.stub.Proto(Empty()) def commit(self): - raise Exception("Unimplemented: remote registry is read-only") + raise ReadOnlyRegistryException() def refresh(self, project: Optional[str] = None): request = RegistryServer_pb2.RefreshRequest(project=str(project))