Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Enable registry caching in SQL Registry #3395

Merged
merged 5 commits into from
Dec 15, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 18 additions & 2 deletions docs/getting-started/concepts/registry.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@ a remote file registry, you need to create a GCS / S3 bucket that Feast can unde
```yaml
project: feast_demo_aws
provider: aws
registry: s3://[YOUR BUCKET YOU CREATED]/registry.pb
registry:
path: s3://[YOUR BUCKET YOU CREATED]/registry.pb
cache_ttl_seconds: 60
online_store: null
offline_store:
type: file
Expand All @@ -27,7 +29,9 @@ offline_store:
```yaml
project: feast_demo_gcp
provider: gcp
registry: gs://[YOUR BUCKET YOU CREATED]/registry.pb
registry:
path: gs://[YOUR BUCKET YOU CREATED]/registry.pb
cache_ttl_seconds: 60
online_store: null
offline_store:
type: file
Expand All @@ -43,6 +47,18 @@ multiple feature views or time ranges concurrently).
#### SQL Registry
Alternatively, a [SQL Registry](../../tutorials/using-scalable-registry.md) can be used for a more scalable registry.

The configuration roughly looks like:
```yaml
project: <your project name>
provider: <provider name>
online_store: redis
offline_store: file
registry:
registry_type: sql
path: postgresql://postgres:mysecretpassword@127.0.0.1:55001/feast
cache_ttl_seconds: 60
```

This supports any SQLAlchemy compatible database as a backend. The exact schema can be seen in [sql.py](https://github.com/feast-dev/feast/blob/master/sdk/python/feast/infra/registry/sql.py)

### Updating the registry
Expand Down
1 change: 1 addition & 0 deletions docs/tutorials/using-scalable-registry.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ offline_store: file
registry:
registry_type: sql
path: postgresql://postgres:mysecretpassword@127.0.0.1:55001/feast
cache_ttl_seconds: 60
```

Specifically, the registry_type needs to be set to sql in the registry config block. On doing so, the path should refer to the [Database URL](https://docs.sqlalchemy.org/en/14/core/engines.html#database-urls) for the database to be used, as expected by SQLAlchemy. No other additional commands are currently needed to configure this registry.
Expand Down
208 changes: 208 additions & 0 deletions sdk/python/feast/infra/registry/proto_registry_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,208 @@
from typing import List

from feast.data_source import DataSource
from feast.entity import Entity
from feast.errors import (
DataSourceObjectNotFoundException,
EntityNotFoundException,
FeatureServiceNotFoundException,
FeatureViewNotFoundException,
OnDemandFeatureViewNotFoundException,
SavedDatasetNotFound,
ValidationReferenceNotFound,
)
from feast.feature_service import FeatureService
from feast.feature_view import FeatureView
from feast.on_demand_feature_view import OnDemandFeatureView
from feast.project_metadata import ProjectMetadata
from feast.protos.feast.core.Registry_pb2 import Registry as RegistryProto
from feast.request_feature_view import RequestFeatureView
from feast.saved_dataset import SavedDataset, ValidationReference
from feast.stream_feature_view import StreamFeatureView


def get_feature_service(
registry_proto: RegistryProto, name: str, project: str
) -> FeatureService:
for feature_service_proto in registry_proto.feature_services:
if (
feature_service_proto.spec.project == project
and feature_service_proto.spec.name == name
):
return FeatureService.from_proto(feature_service_proto)
raise FeatureServiceNotFoundException(name, project=project)


def get_feature_view(
registry_proto: RegistryProto, name: str, project: str
) -> FeatureView:
for feature_view_proto in registry_proto.feature_views:
if (
feature_view_proto.spec.name == name
and feature_view_proto.spec.project == project
):
return FeatureView.from_proto(feature_view_proto)
raise FeatureViewNotFoundException(name, project)


def get_stream_feature_view(
registry_proto: RegistryProto, name: str, project: str
) -> StreamFeatureView:
for feature_view_proto in registry_proto.stream_feature_views:
if (
feature_view_proto.spec.name == name
and feature_view_proto.spec.project == project
):
return StreamFeatureView.from_proto(feature_view_proto)
raise FeatureViewNotFoundException(name, project)


def get_request_feature_view(registry_proto: RegistryProto, name: str, project: str):
for feature_view_proto in registry_proto.feature_views:
if (
feature_view_proto.spec.name == name
and feature_view_proto.spec.project == project
):
return RequestFeatureView.from_proto(feature_view_proto)
raise FeatureViewNotFoundException(name, project)


def get_on_demand_feature_view(
registry_proto: RegistryProto, name: str, project: str
) -> OnDemandFeatureView:
for on_demand_feature_view in registry_proto.on_demand_feature_views:
if (
on_demand_feature_view.spec.project == project
and on_demand_feature_view.spec.name == name
):
return OnDemandFeatureView.from_proto(on_demand_feature_view)
raise OnDemandFeatureViewNotFoundException(name, project=project)


def get_data_source(
registry_proto: RegistryProto, name: str, project: str
) -> DataSource:
for data_source in registry_proto.data_sources:
if data_source.project == project and data_source.name == name:
return DataSource.from_proto(data_source)
raise DataSourceObjectNotFoundException(name, project=project)


def get_entity(registry_proto: RegistryProto, name: str, project: str) -> Entity:
for entity_proto in registry_proto.entities:
if entity_proto.spec.name == name and entity_proto.spec.project == project:
return Entity.from_proto(entity_proto)
raise EntityNotFoundException(name, project=project)


def get_saved_dataset(
registry_proto: RegistryProto, name: str, project: str
) -> SavedDataset:
for saved_dataset in registry_proto.saved_datasets:
if saved_dataset.spec.name == name and saved_dataset.spec.project == project:
return SavedDataset.from_proto(saved_dataset)
raise SavedDatasetNotFound(name, project=project)


def get_validation_reference(
registry_proto: RegistryProto, name: str, project: str
) -> ValidationReference:
for validation_reference in registry_proto.validation_references:
if (
validation_reference.name == name
and validation_reference.project == project
):
return ValidationReference.from_proto(validation_reference)
raise ValidationReferenceNotFound(name, project=project)


def list_feature_services(
registry_proto: RegistryProto, project: str, allow_cache: bool = False
) -> List[FeatureService]:
feature_services = []
for feature_service_proto in registry_proto.feature_services:
if feature_service_proto.spec.project == project:
feature_services.append(FeatureService.from_proto(feature_service_proto))
return feature_services


def list_feature_views(
registry_proto: RegistryProto, project: str
) -> List[FeatureView]:
feature_views: List[FeatureView] = []
for feature_view_proto in registry_proto.feature_views:
if feature_view_proto.spec.project == project:
feature_views.append(FeatureView.from_proto(feature_view_proto))
return feature_views


def list_request_feature_views(
registry_proto: RegistryProto, project: str
) -> List[RequestFeatureView]:
feature_views: List[RequestFeatureView] = []
for request_feature_view_proto in registry_proto.request_feature_views:
if request_feature_view_proto.spec.project == project:
feature_views.append(
RequestFeatureView.from_proto(request_feature_view_proto)
)
return feature_views


def list_stream_feature_views(
registry_proto: RegistryProto, project: str
) -> List[StreamFeatureView]:
stream_feature_views = []
for stream_feature_view in registry_proto.stream_feature_views:
if stream_feature_view.spec.project == project:
stream_feature_views.append(
StreamFeatureView.from_proto(stream_feature_view)
)
return stream_feature_views


def list_on_demand_feature_views(
registry_proto: RegistryProto, project: str
) -> List[OnDemandFeatureView]:
on_demand_feature_views = []
for on_demand_feature_view in registry_proto.on_demand_feature_views:
if on_demand_feature_view.spec.project == project:
on_demand_feature_views.append(
OnDemandFeatureView.from_proto(on_demand_feature_view)
)
return on_demand_feature_views


def list_entities(registry_proto: RegistryProto, project: str) -> List[Entity]:
entities = []
for entity_proto in registry_proto.entities:
if entity_proto.spec.project == project:
entities.append(Entity.from_proto(entity_proto))
return entities


def list_data_sources(registry_proto: RegistryProto, project: str) -> List[DataSource]:
data_sources = []
for data_source_proto in registry_proto.data_sources:
if data_source_proto.project == project:
data_sources.append(DataSource.from_proto(data_source_proto))
return data_sources


def list_saved_datasets(
registry_proto: RegistryProto, project: str, allow_cache: bool = False
) -> List[SavedDataset]:
return [
SavedDataset.from_proto(saved_dataset)
for saved_dataset in registry_proto.saved_datasets
if saved_dataset.spec.project == project
]


def list_project_metadata(
registry_proto: RegistryProto, project: str
) -> List[ProjectMetadata]:
return [
ProjectMetadata.from_proto(project_metadata)
for project_metadata in registry_proto.project_metadata
if project_metadata.project == project
]
Loading