diff --git a/sdk/python/feast/feature_store.py b/sdk/python/feast/feature_store.py index 2ddd0c73bd..abc1e22a37 100644 --- a/sdk/python/feast/feature_store.py +++ b/sdk/python/feast/feature_store.py @@ -82,6 +82,59 @@ def _get_provider(self) -> Provider: def _get_registry(self) -> Registry: return Registry(self.config.metadata_store) + def list_entities(self) -> List[Entity]: + """ + Retrieve a list of entities from the registry + + Returns: + List of entities + """ + return self._get_registry().list_entities(self.project) + + def list_feature_views(self) -> List[FeatureView]: + """ + Retrieve a list of feature views from the registry + + Returns: + List of feature views + """ + return self._get_registry().list_feature_views(self.project) + + def get_entity(self, name: str) -> Entity: + """ + Retrieves an entity. + + Args: + name: Name of entity + + Returns: + Returns either the specified entity, or raises an exception if + none is found + """ + return self._get_registry().get_entity(name, self.project) + + def get_feature_view(self, name: str) -> FeatureView: + """ + Retrieves a feature view. + + Args: + name: Name of feature view + + Returns: + Returns either the specified feature view, or raises an exception if + none is found + """ + return self._get_registry().get_feature_view(name, self.project) + + def delete_feature_view(self, name: str): + """ + Deletes a feature view or raises an exception if not found. + + Args: + name: Name of feature view + """ + return self._get_registry().delete_feature_view(name, self.project) + def apply(self, objects: List[Union[FeatureView, Entity]]): """Register objects to metadata store and update related infrastructure. diff --git a/sdk/python/feast/registry.py b/sdk/python/feast/registry.py index dd224bcde0..84ec9becb3 100644 --- a/sdk/python/feast/registry.py +++ b/sdk/python/feast/registry.py @@ -200,7 +200,7 @@ def list_feature_views(self, project: str) -> List[FeatureView]: project: Filter feature tables based on project name Returns: - List of feature tables + List of feature views """ registry_proto = self._registry_store.get_registry() feature_views = [] diff --git a/sdk/python/tests/test_client.py b/sdk/python/tests/test_client.py index fcdbdd26ee..cf09fa236c 100644 --- a/sdk/python/tests/test_client.py +++ b/sdk/python/tests/test_client.py @@ -16,7 +16,6 @@ import socket from concurrent import futures from datetime import datetime, timedelta -from tempfile import mkstemp from typing import Tuple from unittest import mock @@ -135,24 +134,6 @@ def secure_mock_client_with_auth(self): client._serving_url = SERVING_URL return client - @pytest.fixture - def client_with_object_registry(self): - fd, path = mkstemp() - return Client(registry_path=path,) - - @pytest.fixture - def client_with_object_registry_gcs(self): - # clear any old registry left there - from google.cloud import storage - - storage_client = storage.Client() - bucket = storage_client.bucket("feast-registry-test") - blob = bucket.blob("test") - if blob.exists(): - blob.delete() - - return Client(registry_path="gs://feast-registry-test/test",) - @pytest.fixture def server_credentials(self): private_key = pkgutil.get_data(__name__, _PRIVATE_KEY_RESOURCE_PATH) @@ -389,12 +370,7 @@ def test_get_historical_features(self, mocked_client, mocker): assert 1 == 1 @pytest.mark.parametrize( - "test_client", - [ - lazy_fixture("client"), - lazy_fixture("secure_client"), - lazy_fixture("client_with_object_registry"), - ], + "test_client", [lazy_fixture("client"), lazy_fixture("secure_client")], ) def test_apply_entity_success(self, test_client): @@ -420,50 +396,8 @@ def test_apply_entity_success(self, test_client): and entity.labels["team"] == "matchmaking" ) - @pytest.mark.integration - @pytest.mark.parametrize( - "test_client", [lazy_fixture("client_with_object_registry_gcs")], - ) - def test_apply_entity_integration(self, test_client): - - entity = Entity( - name="driver_car_id", - description="Car driver id", - value_type=ValueType.STRING, - labels={"team": "matchmaking"}, - ) - - # Register Entity with Core - test_client.apply(entity) - - entities = test_client.list_entities() - - entity = entities[0] - assert ( - len(entities) == 1 - and entity.name == "driver_car_id" - and entity.value_type == ValueType(ValueProto.ValueType.STRING) - and entity.description == "Car driver id" - and "team" in entity.labels - and entity.labels["team"] == "matchmaking" - ) - - entity = test_client.get_entity("driver_car_id") - assert ( - entity.name == "driver_car_id" - and entity.value_type == ValueType(ValueProto.ValueType.STRING) - and entity.description == "Car driver id" - and "team" in entity.labels - and entity.labels["team"] == "matchmaking" - ) - @pytest.mark.parametrize( - "test_client", - [ - lazy_fixture("client"), - lazy_fixture("secure_client"), - lazy_fixture("client_with_object_registry"), - ], + "test_client", [lazy_fixture("client"), lazy_fixture("secure_client")], ) def test_apply_feature_table_success(self, test_client): @@ -517,80 +451,6 @@ def test_apply_feature_table_success(self, test_client): and feature_tables[0].entities[0] == "fs1-my-entity-1" ) - @pytest.mark.integration - @pytest.mark.parametrize( - "test_client", [lazy_fixture("client_with_object_registry_gcs")], - ) - def test_apply_feature_table_integration(self, test_client): - - # Create Feature Tables - batch_source = FileSource( - file_format=ParquetFormat(), - file_url="file://feast/*", - event_timestamp_column="ts_col", - created_timestamp_column="timestamp", - date_partition_column="date_partition_col", - ) - - stream_source = KafkaSource( - bootstrap_servers="localhost:9094", - message_format=ProtoFormat("class.path"), - topic="test_topic", - event_timestamp_column="ts_col", - ) - - ft1 = FeatureTable( - name="my-feature-table-1", - features=[ - Feature(name="fs1-my-feature-1", dtype=ValueType.INT64), - Feature(name="fs1-my-feature-2", dtype=ValueType.STRING), - Feature(name="fs1-my-feature-3", dtype=ValueType.STRING_LIST), - Feature(name="fs1-my-feature-4", dtype=ValueType.BYTES_LIST), - ], - entities=["fs1-my-entity-1"], - labels={"team": "matchmaking"}, - batch_source=batch_source, - stream_source=stream_source, - ) - - # Register Feature Table with Core - test_client.apply(ft1) - - feature_tables = test_client.list_feature_tables() - - # List Feature Tables - assert ( - len(feature_tables) == 1 - and feature_tables[0].name == "my-feature-table-1" - and feature_tables[0].features[0].name == "fs1-my-feature-1" - and feature_tables[0].features[0].dtype == ValueType.INT64 - and feature_tables[0].features[1].name == "fs1-my-feature-2" - and feature_tables[0].features[1].dtype == ValueType.STRING - and feature_tables[0].features[2].name == "fs1-my-feature-3" - and feature_tables[0].features[2].dtype == ValueType.STRING_LIST - and feature_tables[0].features[3].name == "fs1-my-feature-4" - and feature_tables[0].features[3].dtype == ValueType.BYTES_LIST - and feature_tables[0].entities[0] == "fs1-my-entity-1" - ) - - feature_table = test_client.get_feature_table("my-feature-table-1") - assert ( - feature_table.name == "my-feature-table-1" - and feature_table.features[0].name == "fs1-my-feature-1" - and feature_table.features[0].dtype == ValueType.INT64 - and feature_table.features[1].name == "fs1-my-feature-2" - and feature_table.features[1].dtype == ValueType.STRING - and feature_table.features[2].name == "fs1-my-feature-3" - and feature_table.features[2].dtype == ValueType.STRING_LIST - and feature_table.features[3].name == "fs1-my-feature-4" - and feature_table.features[3].dtype == ValueType.BYTES_LIST - and feature_table.entities[0] == "fs1-my-entity-1" - ) - - test_client.delete_feature_table("my-feature-table-1") - feature_tables = test_client.list_feature_tables() - assert len(feature_tables) == 0 - @pytest.mark.parametrize( "test_client", [lazy_fixture("client"), lazy_fixture("secure_client")] ) diff --git a/sdk/python/tests/test_feature_store.py b/sdk/python/tests/test_feature_store.py new file mode 100644 index 0000000000..7627cdf85f --- /dev/null +++ b/sdk/python/tests/test_feature_store.py @@ -0,0 +1,247 @@ +# Copyright 2019 The Feast Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +import time +from datetime import timedelta +from tempfile import mkstemp + +import pytest +from pytest_lazyfixture import lazy_fixture + +from feast.data_format import ParquetFormat +from feast.data_source import FileSource +from feast.entity import Entity +from feast.feature import Feature +from feast.feature_store import FeatureStore +from feast.feature_view import FeatureView +from feast.protos.feast.types import Value_pb2 as ValueProto +from feast.repo_config import LocalOnlineStoreConfig, OnlineStoreConfig, RepoConfig +from feast.value_type import ValueType + + +class TestFeatureStore: + @pytest.fixture + def feature_store_with_local_registry(self): + fd, registry_path = mkstemp() + fd, online_store_path = mkstemp() + return FeatureStore( + config=RepoConfig( + metadata_store=registry_path, + project="default", + provider="local", + online_store=OnlineStoreConfig( + local=LocalOnlineStoreConfig(path=online_store_path) + ), + ) + ) + + @pytest.fixture + def feature_store_with_gcs_registry(self): + from google.cloud import storage + + storage_client = storage.Client() + bucket_name = f"feast-registry-test-{int(time.time())}" + bucket = storage_client.bucket(bucket_name) + bucket = storage_client.create_bucket(bucket) + bucket.add_lifecycle_delete_rule( + age=14 + ) # delete buckets automatically after 14 days + bucket.patch() + bucket.blob("metadata.db") + + return FeatureStore( + config=RepoConfig( + metadata_store=f"gs://{bucket_name}/metadata.db", + project="default", + provider="gcp", + ) + ) + + @pytest.mark.parametrize( + "test_feature_store", [lazy_fixture("feature_store_with_local_registry")], + ) + def test_apply_entity_success(self, test_feature_store): + + entity = Entity( + name="driver_car_id", + description="Car driver id", + value_type=ValueType.STRING, + labels={"team": "matchmaking"}, + ) + + # Register Entity with Core + test_feature_store.apply([entity]) + + entities = test_feature_store.list_entities() + + entity = entities[0] + assert ( + len(entities) == 1 + and entity.name == "driver_car_id" + and entity.value_type == ValueType(ValueProto.ValueType.STRING) + and entity.description == "Car driver id" + and "team" in entity.labels + and entity.labels["team"] == "matchmaking" + ) + + @pytest.mark.integration + @pytest.mark.parametrize( + "test_feature_store", [lazy_fixture("feature_store_with_gcs_registry")], + ) + def test_apply_entity_integration(self, test_feature_store): + + entity = Entity( + name="driver_car_id", + description="Car driver id", + value_type=ValueType.STRING, + labels={"team": "matchmaking"}, + ) + + # Register Entity with Core + test_feature_store.apply([entity]) + + entities = test_feature_store.list_entities() + + entity = entities[0] + assert ( + len(entities) == 1 + and entity.name == "driver_car_id" + and entity.value_type == ValueType(ValueProto.ValueType.STRING) + and entity.description == "Car driver id" + and "team" in entity.labels + and entity.labels["team"] == "matchmaking" + ) + + entity = test_feature_store.get_entity("driver_car_id") + assert ( + entity.name == "driver_car_id" + and entity.value_type == ValueType(ValueProto.ValueType.STRING) + and entity.description == "Car driver id" + and "team" in entity.labels + and entity.labels["team"] == "matchmaking" + ) + + @pytest.mark.parametrize( + "test_feature_store", [lazy_fixture("feature_store_with_local_registry")], + ) + def test_apply_feature_view_success(self, test_feature_store): + + # Create Feature Views + batch_source = FileSource( + file_format=ParquetFormat(), + file_url="file://feast/*", + event_timestamp_column="ts_col", + created_timestamp_column="timestamp", + date_partition_column="date_partition_col", + ) + + fv1 = FeatureView( + name="my_feature_view_1", + features=[ + Feature(name="fs1_my_feature_1", dtype=ValueType.INT64), + Feature(name="fs1_my_feature_2", dtype=ValueType.STRING), + Feature(name="fs1_my_feature_3", dtype=ValueType.STRING_LIST), + Feature(name="fs1_my_feature_4", dtype=ValueType.BYTES_LIST), + ], + entities=["fs1_my_entity_1"], + tags={"team": "matchmaking"}, + input=batch_source, + ttl=timedelta(minutes=5), + ) + + # Register Feature View + test_feature_store.apply([fv1]) + + feature_views = test_feature_store.list_feature_views() + + # List Feature Views + assert ( + len(feature_views) == 1 + and feature_views[0].name == "my_feature_view_1" + and feature_views[0].features[0].name == "fs1_my_feature_1" + and feature_views[0].features[0].dtype == ValueType.INT64 + and feature_views[0].features[1].name == "fs1_my_feature_2" + and feature_views[0].features[1].dtype == ValueType.STRING + and feature_views[0].features[2].name == "fs1_my_feature_3" + and feature_views[0].features[2].dtype == ValueType.STRING_LIST + and feature_views[0].features[3].name == "fs1_my_feature_4" + and feature_views[0].features[3].dtype == ValueType.BYTES_LIST + and feature_views[0].entities[0] == "fs1_my_entity_1" + ) + + @pytest.mark.integration + @pytest.mark.parametrize( + "test_feature_store", [lazy_fixture("feature_store_with_gcs_registry")], + ) + def test_apply_feature_view_integration(self, test_feature_store): + + # Create Feature Views + batch_source = FileSource( + file_format=ParquetFormat(), + file_url="file://feast/*", + event_timestamp_column="ts_col", + created_timestamp_column="timestamp", + date_partition_column="date_partition_col", + ) + + fv1 = FeatureView( + name="my_feature_view_1", + features=[ + Feature(name="fs1_my_feature_1", dtype=ValueType.INT64), + Feature(name="fs1_my_feature_2", dtype=ValueType.STRING), + Feature(name="fs1_my_feature_3", dtype=ValueType.STRING_LIST), + Feature(name="fs1_my_feature_4", dtype=ValueType.BYTES_LIST), + ], + entities=["fs1_my_entity_1"], + tags={"team": "matchmaking"}, + input=batch_source, + ttl=timedelta(minutes=5), + ) + + # Register Feature View + test_feature_store.apply([fv1]) + + feature_views = test_feature_store.list_feature_views() + + # List Feature Views + assert ( + len(feature_views) == 1 + and feature_views[0].name == "my_feature_view_1" + and feature_views[0].features[0].name == "fs1_my_feature_1" + and feature_views[0].features[0].dtype == ValueType.INT64 + and feature_views[0].features[1].name == "fs1_my_feature_2" + and feature_views[0].features[1].dtype == ValueType.STRING + and feature_views[0].features[2].name == "fs1_my_feature_3" + and feature_views[0].features[2].dtype == ValueType.STRING_LIST + and feature_views[0].features[3].name == "fs1_my_feature_4" + and feature_views[0].features[3].dtype == ValueType.BYTES_LIST + and feature_views[0].entities[0] == "fs1_my_entity_1" + ) + + feature_view = test_feature_store.get_feature_view("my_feature_view_1") + assert ( + feature_view.name == "my_feature_view_1" + and feature_view.features[0].name == "fs1_my_feature_1" + and feature_view.features[0].dtype == ValueType.INT64 + and feature_view.features[1].name == "fs1_my_feature_2" + and feature_view.features[1].dtype == ValueType.STRING + and feature_view.features[2].name == "fs1_my_feature_3" + and feature_view.features[2].dtype == ValueType.STRING_LIST + and feature_view.features[3].name == "fs1_my_feature_4" + and feature_view.features[3].dtype == ValueType.BYTES_LIST + and feature_view.entities[0] == "fs1_my_entity_1" + ) + + test_feature_store.delete_feature_view("my_feature_view_1") + feature_views = test_feature_store.list_feature_views() + assert len(feature_views) == 0