Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make online store nullable #2224

Merged
21 changes: 12 additions & 9 deletions sdk/python/feast/infra/aws.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,14 +62,16 @@ def update_infra(
entities_to_keep: Sequence[Entity],
partial: bool,
):
self.online_store.update(
config=self.repo_config,
tables_to_delete=tables_to_delete,
tables_to_keep=tables_to_keep,
entities_to_keep=entities_to_keep,
entities_to_delete=entities_to_delete,
partial=partial,
)
# Call update only if there is an online store
if self.online_store:
self.online_store.update(
config=self.repo_config,
tables_to_delete=tables_to_delete,
tables_to_keep=tables_to_keep,
entities_to_keep=entities_to_keep,
entities_to_delete=entities_to_delete,
partial=partial,
)

if self.repo_config.feature_server and self.repo_config.feature_server.enabled:
if not enable_aws_lambda_feature_server(self.repo_config):
Expand Down Expand Up @@ -194,7 +196,8 @@ def _deploy_feature_server(self, project: str, image_uri: str):
def teardown_infra(
self, project: str, tables: Sequence[FeatureView], entities: Sequence[Entity],
) -> None:
self.online_store.teardown(self.repo_config, tables, entities)
if self.online_store:
self.online_store.teardown(self.repo_config, tables, entities)

if (
self.repo_config.feature_server is not None
Expand Down
9 changes: 5 additions & 4 deletions sdk/python/feast/infra/local.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,12 @@ class LocalProvider(PassthroughProvider):
def plan_infra(
self, config: RepoConfig, desired_registry_proto: RegistryProto
) -> Infra:
infra_objects: List[InfraObject] = self.online_store.plan(
config, desired_registry_proto
)
infra = Infra()
infra.infra_objects += infra_objects
if self.online_store:
infra_objects: List[InfraObject] = self.online_store.plan(
config, desired_registry_proto
)
infra.infra_objects += infra_objects
return infra


Expand Down
42 changes: 26 additions & 16 deletions sdk/python/feast/infra/passthrough_provider.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,11 @@ def __init__(self, config: RepoConfig):

self.repo_config = config
self.offline_store = get_offline_store_from_config(config.offline_store)
self.online_store = get_online_store_from_config(config.online_store)
self.online_store = (
get_online_store_from_config(config.online_store)
if config.online_store
else None
)

def update_infra(
self,
Expand All @@ -47,20 +51,24 @@ def update_infra(
partial: bool,
):
set_usage_attribute("provider", self.__class__.__name__)
self.online_store.update(
config=self.repo_config,
tables_to_delete=tables_to_delete,
tables_to_keep=tables_to_keep,
entities_to_keep=entities_to_keep,
entities_to_delete=entities_to_delete,
partial=partial,
)

# Call update only if there is an online store
if self.online_store:
self.online_store.update(
config=self.repo_config,
tables_to_delete=tables_to_delete,
tables_to_keep=tables_to_keep,
entities_to_keep=entities_to_keep,
entities_to_delete=entities_to_delete,
partial=partial,
)

def teardown_infra(
self, project: str, tables: Sequence[FeatureView], entities: Sequence[Entity],
mirayyuce marked this conversation as resolved.
Show resolved Hide resolved
) -> None:
set_usage_attribute("provider", self.__class__.__name__)
self.online_store.teardown(self.repo_config, tables, entities)
if self.online_store:
self.online_store.teardown(self.repo_config, tables, entities)

def online_write_batch(
self,
Expand All @@ -72,7 +80,8 @@ def online_write_batch(
progress: Optional[Callable[[int], Any]],
) -> None:
set_usage_attribute("provider", self.__class__.__name__)
self.online_store.online_write_batch(config, table, data, progress)
if self.online_store:
self.online_store.online_write_batch(config, table, data, progress)

@log_exceptions_and_usage(sampler=RatioSampler(ratio=0.001))
def online_read(
Expand All @@ -81,12 +90,13 @@ def online_read(
table: FeatureView,
entity_keys: List[EntityKeyProto],
requested_features: List[str] = None,
) -> List[Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]]:
) -> List:
set_usage_attribute("provider", self.__class__.__name__)
result = self.online_store.online_read(
config, table, entity_keys, requested_features
)

result = []
if self.online_store:
result = self.online_store.online_read(
config, table, entity_keys, requested_features
)
return result

def ingest_df(
Expand Down
6 changes: 5 additions & 1 deletion sdk/python/feast/repo_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -152,8 +152,12 @@ def _validate_online_store_config(cls, values):
if "online_store" not in values:
values["online_store"] = dict()

# Skip if we aren't creating the configuration from a dict
# Skip if we aren't creating the configuration from a dict or online store is null or it is a string like "None" or "null"
if not isinstance(values["online_store"], Dict):
if isinstance(values["online_store"], str) and values[
"online_store"
].lower() in {"none", "null"}:
values["online_store"] = None
return values

# Make sure that the provider configuration is set. We need it to set the defaults
Expand Down
65 changes: 64 additions & 1 deletion sdk/python/tests/integration/registration/test_cli.py
Original file line number Diff line number Diff line change
@@ -1,18 +1,32 @@
import os
import tempfile
import uuid
from contextlib import contextmanager
from pathlib import Path, PosixPath
from textwrap import dedent
from typing import List

import pytest
import yaml
from assertpy import assertpy

from feast import FeatureStore, RepoConfig
from tests.integration.feature_repos.integration_test_repo_config import (
IntegrationTestRepoConfig,
)
from tests.integration.feature_repos.repo_configuration import FULL_REPO_CONFIGS
from tests.integration.feature_repos.universal.data_source_creator import (
DataSourceCreator,
)
from tests.integration.feature_repos.universal.data_sources.bigquery import (
BigQueryDataSourceCreator,
)
from tests.integration.feature_repos.universal.data_sources.file import (
FileDataSourceCreator,
)
from tests.integration.feature_repos.universal.data_sources.redshift import (
RedshiftDataSourceCreator,
)
from tests.utils.cli_utils import CliRunner, get_example_repo
from tests.utils.online_read_write_test import basic_rw_test

Expand All @@ -21,7 +35,6 @@
@pytest.mark.parametrize("test_repo_config", FULL_REPO_CONFIGS)
def test_universal_cli(test_repo_config) -> None:
project = f"test_universal_cli_{str(uuid.uuid4()).replace('-', '')[:8]}"

runner = CliRunner()

with tempfile.TemporaryDirectory() as repo_dir_name:
Expand Down Expand Up @@ -128,6 +141,56 @@ def make_feature_store_yaml(project, test_repo_config, repo_dir_name: PosixPath)
return yaml.safe_dump(config_dict)


NULLABLE_ONLINE_STORE_CONFIGS: List[IntegrationTestRepoConfig] = [
mirayyuce marked this conversation as resolved.
Show resolved Hide resolved
IntegrationTestRepoConfig(
provider="local",
offline_store_creator=FileDataSourceCreator,
online_store=None,
),
]

if os.getenv("FEAST_IS_LOCAL_TEST", "False") == "True":
mirayyuce marked this conversation as resolved.
Show resolved Hide resolved
NULLABLE_ONLINE_STORE_CONFIGS.extend(
[
IntegrationTestRepoConfig(
provider="gcp",
offline_store_creator=BigQueryDataSourceCreator,
online_store=None,
),
IntegrationTestRepoConfig(
provider="aws",
offline_store_creator=RedshiftDataSourceCreator,
online_store=None,
),
]
)


@pytest.mark.integration
@pytest.mark.parametrize("test_nullable_online_store", NULLABLE_ONLINE_STORE_CONFIGS)
def test_nullable_online_store(test_nullable_online_store) -> None:
project = f"test_nullable_online_store{str(uuid.uuid4()).replace('-', '')[:8]}"
runner = CliRunner()

with tempfile.TemporaryDirectory() as repo_dir_name:
try:
feature_store_yaml = make_feature_store_yaml(
project, test_nullable_online_store, repo_dir_name
)
repo_path = Path(repo_dir_name)

repo_config = repo_path / "feature_store.yaml"

repo_config.write_text(dedent(feature_store_yaml))

repo_example = repo_path / "example.py"
repo_example.write_text(get_example_repo("example_feature_repo_1.py"))
result = runner.run(["apply"], cwd=repo_path)
assertpy.assert_that(result.returncode).is_equal_to(0)
finally:
runner.run(["teardown"], cwd=repo_path)


@contextmanager
def setup_third_party_provider_repo(provider_name: str):
with tempfile.TemporaryDirectory() as repo_dir_name:
Expand Down
43 changes: 43 additions & 0 deletions sdk/python/tests/integration/scaffolding/test_repo_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,49 @@ def _test_config(config_text, expect_error: Optional[str]):
return rc


def test_nullable_online_store_aws():
_test_config(
dedent(
"""
project: foo
registry: "registry.db"
provider: aws
online_store: null
"""
),
expect_error="__root__ -> offline_store -> cluster_id\n"
" field required (type=value_error.missing)",
)


def test_nullable_online_store_gcp():
_test_config(
dedent(
"""
project: foo
registry: "registry.db"
provider: gcp
online_store: null
"""
),
expect_error=None,
)


def test_nullable_online_store_local():
_test_config(
dedent(
"""
project: foo
registry: "registry.db"
provider: local
online_store: null
"""
),
expect_error=None,
)


def test_local_config():
_test_config(
dedent(
Expand Down