Skip to content

Commit

Permalink
Registry teardown (#1718)
Browse files Browse the repository at this point in the history
* Teardown Registry during FeatureStore teardown

Signed-off-by: Felix Wang <wangfelix98@gmail.com>

* Modify CLI teardown command to delegate to FeatureStore teardown method

Signed-off-by: Felix Wang <wangfelix98@gmail.com>

* Ensure tests teardown FeatureStores

Signed-off-by: Felix Wang <wangfelix98@gmail.com>

* Ensure tests teardown Registries

Signed-off-by: Felix Wang <wangfelix98@gmail.com>

* Remove unnecessary return statements

Signed-off-by: Felix Wang <wangfelix98@gmail.com>

* Move import

Signed-off-by: Felix Wang <wangfelix98@gmail.com>

* Clarify teardown comment

Signed-off-by: Felix Wang <wangfelix98@gmail.com>
  • Loading branch information
felixwang9817 authored Jul 22, 2021
1 parent 13645c1 commit 4e6a3f7
Show file tree
Hide file tree
Showing 7 changed files with 87 additions and 29 deletions.
5 changes: 1 addition & 4 deletions sdk/python/feast/feature_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -267,10 +267,7 @@ def teardown(self):
entities = self.list_entities()

self._get_provider().teardown_infra(self.project, tables, entities)
for feature_view in feature_views:
self.delete_feature_view(feature_view.name)
for feature_table in feature_tables:
self._registry.delete_feature_table(feature_table.name, self.project)
self._registry.teardown()

@log_exceptions_and_usage
def get_historical_features(
Expand Down
40 changes: 32 additions & 8 deletions sdk/python/feast/registry.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,6 @@ def __init__(self, registry_path: str, repo_path: Path, cache_ttl: timedelta):
f"Registry path {registry_path} has unsupported scheme {uri.scheme}. Supported schemes are file and gs."
)
self.cached_registry_proto_ttl = cache_ttl
return

def _initialize_registry(self):
"""Explicitly initializes the registry with an empty proto."""
Expand Down Expand Up @@ -109,7 +108,6 @@ def apply_entity(self, entity: Entity, project: str, commit: bool = True):
self.cached_registry_proto.entities.append(entity_proto)
if commit:
self.commit()
return

def list_entities(self, project: str, allow_cache: bool = False) -> List[Entity]:
"""
Expand Down Expand Up @@ -396,6 +394,10 @@ def refresh(self):
"""Refreshes the state of the registry cache by fetching the registry state from the remote registry store."""
self._get_registry_proto(allow_cache=False)

def teardown(self):
"""Tears down (removes) the registry."""
self._registry_store.teardown()

def _prepare_registry_for_changes(self):
"""Prepares the Registry for changes by refreshing the cache if necessary."""
try:
Expand Down Expand Up @@ -469,6 +471,13 @@ def update_registry_proto(self, registry_proto: RegistryProto):
"""
pass

@abstractmethod
def teardown(self):
"""
Tear down all resources.
"""
pass


class LocalRegistryStore(RegistryStore):
def __init__(self, repo_path: Path, registry_path_string: str):
Expand All @@ -489,15 +498,21 @@ def get_registry_proto(self):

def update_registry_proto(self, registry_proto: RegistryProto):
self._write_registry(registry_proto)
return

def teardown(self):
try:
self._filepath.unlink()
except FileNotFoundError:
# If the file deletion fails with FileNotFoundError, the file has already
# been deleted.
pass

def _write_registry(self, registry_proto: RegistryProto):
registry_proto.version_id = str(uuid.uuid4())
registry_proto.last_updated.FromDatetime(datetime.utcnow())
file_dir = self._filepath.parent
file_dir.mkdir(exist_ok=True)
self._filepath.write_bytes(registry_proto.SerializeToString())
return


class GCSRegistryStore(RegistryStore):
Expand All @@ -513,7 +528,6 @@ def __init__(self, uri: str):
self._uri = urlparse(uri)
self._bucket = self._uri.hostname
self._blob = self._uri.path.lstrip("/")
return

def get_registry_proto(self):
from google.cloud import storage
Expand All @@ -540,7 +554,16 @@ def get_registry_proto(self):

def update_registry_proto(self, registry_proto: RegistryProto):
self._write_registry(registry_proto)
return

def teardown(self):
from google.cloud.exceptions import NotFound

gs_bucket = self.gcs_client.get_bucket(self._bucket)
try:
gs_bucket.delete_blob(self._blob)
except NotFound:
# If the blob deletion fails with NotFound, it has already been deleted.
pass

def _write_registry(self, registry_proto: RegistryProto):
registry_proto.version_id = str(uuid.uuid4())
Expand All @@ -552,7 +575,6 @@ def _write_registry(self, registry_proto: RegistryProto):
file_obj.write(registry_proto.SerializeToString())
file_obj.seek(0)
blob.upload_from_file(file_obj)
return


class S3RegistryStore(RegistryStore):
Expand Down Expand Up @@ -605,7 +627,9 @@ def get_registry_proto(self):

def update_registry_proto(self, registry_proto: RegistryProto):
self._write_registry(registry_proto)
return

def teardown(self):
self.s3_client.Object(self._bucket, self._key).delete()

def _write_registry(self, registry_proto: RegistryProto):
registry_proto.version_id = str(uuid.uuid4())
Expand Down
21 changes: 4 additions & 17 deletions sdk/python/feast/repo_operations.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
from click.exceptions import BadParameter

from feast import Entity, FeatureTable
from feast.feature_store import FeatureStore
from feast.feature_view import FeatureView
from feast.inference import (
update_data_sources_with_inferred_event_timestamp_col,
Expand Down Expand Up @@ -242,23 +243,9 @@ def apply_total(repo_config: RepoConfig, repo_path: Path, skip_source_validation

@log_exceptions_and_usage
def teardown(repo_config: RepoConfig, repo_path: Path):
registry_config = repo_config.get_registry_config()
registry = Registry(
registry_path=registry_config.path,
repo_path=repo_path,
cache_ttl=timedelta(seconds=registry_config.cache_ttl_seconds),
)
project = repo_config.project
registry_tables: List[Union[FeatureTable, FeatureView]] = []
registry_tables.extend(registry.list_feature_tables(project=project))
registry_tables.extend(registry.list_feature_views(project=project))

registry_entities: List[Entity] = registry.list_entities(project=project)

infra_provider = get_provider(repo_config, repo_path)
infra_provider.teardown_infra(
project, tables=registry_tables, entities=registry_entities
)
# Cannot pass in both repo_path and repo_config to FeatureStore.
feature_store = FeatureStore(repo_path=repo_path, config=None)
feature_store.teardown()


@log_exceptions_and_usage
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,8 @@ def prep_redshift_fs_and_fv(

yield fs, fv

fs.teardown()

# Clean up the uploaded Redshift table
aws_utils.execute_redshift_statement(
client,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -344,6 +344,8 @@ def test_historical_features_from_parquet_sources(
).reset_index(drop=True),
)

store.teardown()


@pytest.mark.integration
@pytest.mark.parametrize(
Expand Down Expand Up @@ -596,6 +598,8 @@ def test_historical_features_from_bigquery_sources(
actual_df_from_df_entities, table_from_df_entities.to_pandas()
)

store.teardown()


@pytest.mark.integration
def test_timestamp_bound_inference_from_entity_df_using_bigquery():
Expand Down
14 changes: 14 additions & 0 deletions sdk/python/tests/integration/registration/test_feature_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,8 @@ def test_apply_entity_success(test_feature_store):
and entity.labels["team"] == "matchmaking"
)

test_feature_store.teardown()


@pytest.mark.integration
@pytest.mark.parametrize(
Expand Down Expand Up @@ -154,6 +156,8 @@ def test_apply_entity_integration(test_feature_store):
and entity.labels["team"] == "matchmaking"
)

test_feature_store.teardown()


@pytest.mark.parametrize(
"test_feature_store", [lazy_fixture("feature_store_with_local_registry")],
Expand Down Expand Up @@ -202,6 +206,8 @@ def test_apply_feature_view_success(test_feature_store):
and feature_views[0].entities[0] == "fs1_my_entity_1"
)

test_feature_store.teardown()


@pytest.mark.integration
@pytest.mark.parametrize(
Expand Down Expand Up @@ -266,6 +272,8 @@ def test_feature_view_inference_success(test_feature_store, dataframe_source):
== actual_bq_using_query_arg_source
)

test_feature_store.teardown()


@pytest.mark.integration
@pytest.mark.parametrize(
Expand Down Expand Up @@ -337,6 +345,8 @@ def test_apply_feature_view_integration(test_feature_store):
feature_views = test_feature_store.list_feature_views()
assert len(feature_views) == 0

test_feature_store.teardown()


@pytest.mark.parametrize(
"test_feature_store", [lazy_fixture("feature_store_with_local_registry")],
Expand Down Expand Up @@ -398,6 +408,8 @@ def test_apply_object_and_read(test_feature_store):
assert fv2 != fv1_actual
assert e2 != e1_actual

test_feature_store.teardown()


def test_apply_remote_repo():
fd, registry_path = mkstemp()
Expand Down Expand Up @@ -466,3 +478,5 @@ def test_reapply_feature_view_success(test_feature_store, dataframe_source):
# Check Feature View
fv_stored = test_feature_store.get_feature_view(fv1.name)
assert len(fv_stored.materialization_intervals) == 0

test_feature_store.teardown()
30 changes: 30 additions & 0 deletions sdk/python/tests/integration/registration/test_registry.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,12 @@ def test_apply_entity_success(test_registry):
and entity.labels["team"] == "matchmaking"
)

test_registry.teardown()

# Will try to reload registry, which will fail because the file has been deleted
with pytest.raises(FileNotFoundError):
test_registry._get_registry_proto()


@pytest.mark.integration
@pytest.mark.parametrize(
Expand Down Expand Up @@ -135,6 +141,12 @@ def test_apply_entity_integration(test_registry):
and entity.labels["team"] == "matchmaking"
)

test_registry.teardown()

# Will try to reload registry, which will fail because the file has been deleted
with pytest.raises(FileNotFoundError):
test_registry._get_registry_proto()


@pytest.mark.parametrize(
"test_registry", [lazy_fixture("local_registry")],
Expand Down Expand Up @@ -203,6 +215,12 @@ def test_apply_feature_view_success(test_registry):
feature_views = test_registry.list_feature_views(project)
assert len(feature_views) == 0

test_registry.teardown()

# Will try to reload registry, which will fail because the file has been deleted
with pytest.raises(FileNotFoundError):
test_registry._get_registry_proto()


@pytest.mark.integration
@pytest.mark.parametrize(
Expand Down Expand Up @@ -272,6 +290,12 @@ def test_apply_feature_view_integration(test_registry):
feature_views = test_registry.list_feature_views(project)
assert len(feature_views) == 0

test_registry.teardown()

# Will try to reload registry, which will fail because the file has been deleted
with pytest.raises(FileNotFoundError):
test_registry._get_registry_proto()


def test_commit():
fd, registry_path = mkstemp()
Expand Down Expand Up @@ -345,3 +369,9 @@ def test_commit():
and "team" in entity.labels
and entity.labels["team"] == "matchmaking"
)

test_registry.teardown()

# Will try to reload registry, which will fail because the file has been deleted
with pytest.raises(FileNotFoundError):
test_registry._get_registry_proto()

0 comments on commit 4e6a3f7

Please sign in to comment.