diff --git a/sdk/python/feast/infra/aws.py b/sdk/python/feast/infra/aws.py index 735b2f62e7..104e20388a 100644 --- a/sdk/python/feast/infra/aws.py +++ b/sdk/python/feast/infra/aws.py @@ -62,14 +62,16 @@ def update_infra( entities_to_keep: Sequence[Entity], partial: bool, ): - self.online_store.update( - config=self.repo_config, - tables_to_delete=tables_to_delete, - tables_to_keep=tables_to_keep, - entities_to_keep=entities_to_keep, - entities_to_delete=entities_to_delete, - partial=partial, - ) + # Call update only if there is an online store + if self.online_store: + self.online_store.update( + config=self.repo_config, + tables_to_delete=tables_to_delete, + tables_to_keep=tables_to_keep, + entities_to_keep=entities_to_keep, + entities_to_delete=entities_to_delete, + partial=partial, + ) if self.repo_config.feature_server and self.repo_config.feature_server.enabled: if not enable_aws_lambda_feature_server(self.repo_config): @@ -194,7 +196,8 @@ def _deploy_feature_server(self, project: str, image_uri: str): def teardown_infra( self, project: str, tables: Sequence[FeatureView], entities: Sequence[Entity], ) -> None: - self.online_store.teardown(self.repo_config, tables, entities) + if self.online_store: + self.online_store.teardown(self.repo_config, tables, entities) if ( self.repo_config.feature_server is not None diff --git a/sdk/python/feast/infra/local.py b/sdk/python/feast/infra/local.py index 060ac64d53..c5a15c8a91 100644 --- a/sdk/python/feast/infra/local.py +++ b/sdk/python/feast/infra/local.py @@ -19,11 +19,12 @@ class LocalProvider(PassthroughProvider): def plan_infra( self, config: RepoConfig, desired_registry_proto: RegistryProto ) -> Infra: - infra_objects: List[InfraObject] = self.online_store.plan( - config, desired_registry_proto - ) infra = Infra() - infra.infra_objects += infra_objects + if self.online_store: + infra_objects: List[InfraObject] = self.online_store.plan( + config, desired_registry_proto + ) + infra.infra_objects += infra_objects return infra diff --git a/sdk/python/feast/infra/passthrough_provider.py b/sdk/python/feast/infra/passthrough_provider.py index de2aca7cc1..3468b9dc92 100644 --- a/sdk/python/feast/infra/passthrough_provider.py +++ b/sdk/python/feast/infra/passthrough_provider.py @@ -37,7 +37,11 @@ def __init__(self, config: RepoConfig): self.repo_config = config self.offline_store = get_offline_store_from_config(config.offline_store) - self.online_store = get_online_store_from_config(config.online_store) + self.online_store = ( + get_online_store_from_config(config.online_store) + if config.online_store + else None + ) def update_infra( self, @@ -49,20 +53,24 @@ def update_infra( partial: bool, ): set_usage_attribute("provider", self.__class__.__name__) - self.online_store.update( - config=self.repo_config, - tables_to_delete=tables_to_delete, - tables_to_keep=tables_to_keep, - entities_to_keep=entities_to_keep, - entities_to_delete=entities_to_delete, - partial=partial, - ) + + # Call update only if there is an online store + if self.online_store: + self.online_store.update( + config=self.repo_config, + tables_to_delete=tables_to_delete, + tables_to_keep=tables_to_keep, + entities_to_keep=entities_to_keep, + entities_to_delete=entities_to_delete, + partial=partial, + ) def teardown_infra( self, project: str, tables: Sequence[FeatureView], entities: Sequence[Entity], ) -> None: set_usage_attribute("provider", self.__class__.__name__) - self.online_store.teardown(self.repo_config, tables, entities) + if self.online_store: + self.online_store.teardown(self.repo_config, tables, entities) def online_write_batch( self, @@ -74,7 +82,8 @@ def online_write_batch( progress: Optional[Callable[[int], Any]], ) -> None: set_usage_attribute("provider", self.__class__.__name__) - self.online_store.online_write_batch(config, table, data, progress) + if self.online_store: + self.online_store.online_write_batch(config, table, data, progress) @log_exceptions_and_usage(sampler=RatioSampler(ratio=0.001)) def online_read( @@ -83,12 +92,13 @@ def online_read( table: FeatureView, entity_keys: List[EntityKeyProto], requested_features: List[str] = None, - ) -> List[Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]]: + ) -> List: set_usage_attribute("provider", self.__class__.__name__) - result = self.online_store.online_read( - config, table, entity_keys, requested_features - ) - + result = [] + if self.online_store: + result = self.online_store.online_read( + config, table, entity_keys, requested_features + ) return result def ingest_df( diff --git a/sdk/python/feast/repo_config.py b/sdk/python/feast/repo_config.py index 26309fe9d7..e8ba180568 100644 --- a/sdk/python/feast/repo_config.py +++ b/sdk/python/feast/repo_config.py @@ -152,8 +152,12 @@ def _validate_online_store_config(cls, values): if "online_store" not in values: values["online_store"] = dict() - # Skip if we aren't creating the configuration from a dict + # Skip if we aren't creating the configuration from a dict or online store is null or it is a string like "None" or "null" if not isinstance(values["online_store"], Dict): + if isinstance(values["online_store"], str) and values[ + "online_store" + ].lower() in {"none", "null"}: + values["online_store"] = None return values # Make sure that the provider configuration is set. We need it to set the defaults diff --git a/sdk/python/tests/integration/registration/test_cli.py b/sdk/python/tests/integration/registration/test_cli.py index 5dc3772265..2cf5ccd672 100644 --- a/sdk/python/tests/integration/registration/test_cli.py +++ b/sdk/python/tests/integration/registration/test_cli.py @@ -1,18 +1,32 @@ +import os import tempfile import uuid from contextlib import contextmanager from pathlib import Path from textwrap import dedent +from typing import List import pytest import yaml from assertpy import assertpy from feast import FeatureStore, RepoConfig +from tests.integration.feature_repos.integration_test_repo_config import ( + IntegrationTestRepoConfig, +) from tests.integration.feature_repos.repo_configuration import FULL_REPO_CONFIGS from tests.integration.feature_repos.universal.data_source_creator import ( DataSourceCreator, ) +from tests.integration.feature_repos.universal.data_sources.bigquery import ( + BigQueryDataSourceCreator, +) +from tests.integration.feature_repos.universal.data_sources.file import ( + FileDataSourceCreator, +) +from tests.integration.feature_repos.universal.data_sources.redshift import ( + RedshiftDataSourceCreator, +) from tests.utils.cli_utils import CliRunner, get_example_repo from tests.utils.online_read_write_test import basic_rw_test @@ -21,7 +35,6 @@ @pytest.mark.parametrize("test_repo_config", FULL_REPO_CONFIGS) def test_universal_cli(test_repo_config) -> None: project = f"test_universal_cli_{str(uuid.uuid4()).replace('-', '')[:8]}" - runner = CliRunner() with tempfile.TemporaryDirectory() as repo_dir_name: @@ -128,6 +141,56 @@ def make_feature_store_yaml(project, test_repo_config, repo_dir_name: Path): return yaml.safe_dump(config_dict) +NULLABLE_ONLINE_STORE_CONFIGS: List[IntegrationTestRepoConfig] = [ + IntegrationTestRepoConfig( + provider="local", + offline_store_creator=FileDataSourceCreator, + online_store=None, + ), +] + +if os.getenv("FEAST_IS_LOCAL_TEST", "False") == "True": + NULLABLE_ONLINE_STORE_CONFIGS.extend( + [ + IntegrationTestRepoConfig( + provider="gcp", + offline_store_creator=BigQueryDataSourceCreator, + online_store=None, + ), + IntegrationTestRepoConfig( + provider="aws", + offline_store_creator=RedshiftDataSourceCreator, + online_store=None, + ), + ] + ) + + +@pytest.mark.integration +@pytest.mark.parametrize("test_nullable_online_store", NULLABLE_ONLINE_STORE_CONFIGS) +def test_nullable_online_store(test_nullable_online_store) -> None: + project = f"test_nullable_online_store{str(uuid.uuid4()).replace('-', '')[:8]}" + runner = CliRunner() + + with tempfile.TemporaryDirectory() as repo_dir_name: + try: + feature_store_yaml = make_feature_store_yaml( + project, test_nullable_online_store, repo_dir_name + ) + repo_path = Path(repo_dir_name) + + repo_config = repo_path / "feature_store.yaml" + + repo_config.write_text(dedent(feature_store_yaml)) + + repo_example = repo_path / "example.py" + repo_example.write_text(get_example_repo("example_feature_repo_1.py")) + result = runner.run(["apply"], cwd=repo_path) + assertpy.assert_that(result.returncode).is_equal_to(0) + finally: + runner.run(["teardown"], cwd=repo_path) + + @contextmanager def setup_third_party_provider_repo(provider_name: str): with tempfile.TemporaryDirectory() as repo_dir_name: diff --git a/sdk/python/tests/integration/scaffolding/test_repo_config.py b/sdk/python/tests/integration/scaffolding/test_repo_config.py index dfa80cb618..3ec91c0044 100644 --- a/sdk/python/tests/integration/scaffolding/test_repo_config.py +++ b/sdk/python/tests/integration/scaffolding/test_repo_config.py @@ -34,6 +34,49 @@ def _test_config(config_text, expect_error: Optional[str]): return rc +def test_nullable_online_store_aws(): + _test_config( + dedent( + """ + project: foo + registry: "registry.db" + provider: aws + online_store: null + """ + ), + expect_error="__root__ -> offline_store -> cluster_id\n" + " field required (type=value_error.missing)", + ) + + +def test_nullable_online_store_gcp(): + _test_config( + dedent( + """ + project: foo + registry: "registry.db" + provider: gcp + online_store: null + """ + ), + expect_error=None, + ) + + +def test_nullable_online_store_local(): + _test_config( + dedent( + """ + project: foo + registry: "registry.db" + provider: local + online_store: null + """ + ), + expect_error=None, + ) + + def test_local_config(): _test_config( dedent(