Skip to content

Commit

Permalink
Make online store nullable (#2224)
Browse files Browse the repository at this point in the history
* make online_store optional

Signed-off-by: Miray Yuce <myuce@twitter.com>

* make online store optional

Signed-off-by: Miray Yuce <myuce@twitter.com>

* make default online store sqlite

Signed-off-by: Miray Yuce <myuce@twitter.com>

* remove unsused import

Signed-off-by: Miray Yuce <myuce@twitter.com>

* remove unsused condition

Signed-off-by: Miray Yuce <myuce@twitter.com>

* make online store nullable

Signed-off-by: Miray Yuce <myuce@twitter.com>

* delete dummy values

Signed-off-by: Miray Yuce <myuce@twitter.com>

* adding testing, addressing comments

Signed-off-by: Miray Yuce <myuce@twitter.com>

* removed return type in helpers

Signed-off-by: Miray Yuce <myuce@twitter.com>

* cleaned repo_configuration, changed test_cli

Signed-off-by: Miray Yuce <myuce@twitter.com>

* updates after review

Signed-off-by: Miray Yuce <myuce@twitter.com>

* fixing broken integration test

Signed-off-by: Miray Yuce <myuce@twitter.com>

* fix integration test

Signed-off-by: Miray Yuce <myuce@twitter.com>

* updates after review

Signed-off-by: Miray Yuce <myuce@twitter.com>

* reformat imports

Signed-off-by: Miray Yuce <myuce@twitter.com>

Co-authored-by: Miray Yuce <myuce@twitter.com>
  • Loading branch information
mirayyuce and Miray Yuce authored Jan 26, 2022
1 parent ef1884f commit 88fac8b
Show file tree
Hide file tree
Showing 6 changed files with 155 additions and 31 deletions.
21 changes: 12 additions & 9 deletions sdk/python/feast/infra/aws.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,14 +62,16 @@ def update_infra(
entities_to_keep: Sequence[Entity],
partial: bool,
):
self.online_store.update(
config=self.repo_config,
tables_to_delete=tables_to_delete,
tables_to_keep=tables_to_keep,
entities_to_keep=entities_to_keep,
entities_to_delete=entities_to_delete,
partial=partial,
)
# Call update only if there is an online store
if self.online_store:
self.online_store.update(
config=self.repo_config,
tables_to_delete=tables_to_delete,
tables_to_keep=tables_to_keep,
entities_to_keep=entities_to_keep,
entities_to_delete=entities_to_delete,
partial=partial,
)

if self.repo_config.feature_server and self.repo_config.feature_server.enabled:
if not enable_aws_lambda_feature_server(self.repo_config):
Expand Down Expand Up @@ -194,7 +196,8 @@ def _deploy_feature_server(self, project: str, image_uri: str):
def teardown_infra(
self, project: str, tables: Sequence[FeatureView], entities: Sequence[Entity],
) -> None:
self.online_store.teardown(self.repo_config, tables, entities)
if self.online_store:
self.online_store.teardown(self.repo_config, tables, entities)

if (
self.repo_config.feature_server is not None
Expand Down
9 changes: 5 additions & 4 deletions sdk/python/feast/infra/local.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,12 @@ class LocalProvider(PassthroughProvider):
def plan_infra(
self, config: RepoConfig, desired_registry_proto: RegistryProto
) -> Infra:
infra_objects: List[InfraObject] = self.online_store.plan(
config, desired_registry_proto
)
infra = Infra()
infra.infra_objects += infra_objects
if self.online_store:
infra_objects: List[InfraObject] = self.online_store.plan(
config, desired_registry_proto
)
infra.infra_objects += infra_objects
return infra


Expand Down
42 changes: 26 additions & 16 deletions sdk/python/feast/infra/passthrough_provider.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,11 @@ def __init__(self, config: RepoConfig):

self.repo_config = config
self.offline_store = get_offline_store_from_config(config.offline_store)
self.online_store = get_online_store_from_config(config.online_store)
self.online_store = (
get_online_store_from_config(config.online_store)
if config.online_store
else None
)

def update_infra(
self,
Expand All @@ -49,20 +53,24 @@ def update_infra(
partial: bool,
):
set_usage_attribute("provider", self.__class__.__name__)
self.online_store.update(
config=self.repo_config,
tables_to_delete=tables_to_delete,
tables_to_keep=tables_to_keep,
entities_to_keep=entities_to_keep,
entities_to_delete=entities_to_delete,
partial=partial,
)

# Call update only if there is an online store
if self.online_store:
self.online_store.update(
config=self.repo_config,
tables_to_delete=tables_to_delete,
tables_to_keep=tables_to_keep,
entities_to_keep=entities_to_keep,
entities_to_delete=entities_to_delete,
partial=partial,
)

def teardown_infra(
self, project: str, tables: Sequence[FeatureView], entities: Sequence[Entity],
) -> None:
set_usage_attribute("provider", self.__class__.__name__)
self.online_store.teardown(self.repo_config, tables, entities)
if self.online_store:
self.online_store.teardown(self.repo_config, tables, entities)

def online_write_batch(
self,
Expand All @@ -74,7 +82,8 @@ def online_write_batch(
progress: Optional[Callable[[int], Any]],
) -> None:
set_usage_attribute("provider", self.__class__.__name__)
self.online_store.online_write_batch(config, table, data, progress)
if self.online_store:
self.online_store.online_write_batch(config, table, data, progress)

@log_exceptions_and_usage(sampler=RatioSampler(ratio=0.001))
def online_read(
Expand All @@ -83,12 +92,13 @@ def online_read(
table: FeatureView,
entity_keys: List[EntityKeyProto],
requested_features: List[str] = None,
) -> List[Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]]:
) -> List:
set_usage_attribute("provider", self.__class__.__name__)
result = self.online_store.online_read(
config, table, entity_keys, requested_features
)

result = []
if self.online_store:
result = self.online_store.online_read(
config, table, entity_keys, requested_features
)
return result

def ingest_df(
Expand Down
6 changes: 5 additions & 1 deletion sdk/python/feast/repo_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -152,8 +152,12 @@ def _validate_online_store_config(cls, values):
if "online_store" not in values:
values["online_store"] = dict()

# Skip if we aren't creating the configuration from a dict
# Skip if we aren't creating the configuration from a dict or online store is null or it is a string like "None" or "null"
if not isinstance(values["online_store"], Dict):
if isinstance(values["online_store"], str) and values[
"online_store"
].lower() in {"none", "null"}:
values["online_store"] = None
return values

# Make sure that the provider configuration is set. We need it to set the defaults
Expand Down
65 changes: 64 additions & 1 deletion sdk/python/tests/integration/registration/test_cli.py
Original file line number Diff line number Diff line change
@@ -1,18 +1,32 @@
import os
import tempfile
import uuid
from contextlib import contextmanager
from pathlib import Path
from textwrap import dedent
from typing import List

import pytest
import yaml
from assertpy import assertpy

from feast import FeatureStore, RepoConfig
from tests.integration.feature_repos.integration_test_repo_config import (
IntegrationTestRepoConfig,
)
from tests.integration.feature_repos.repo_configuration import FULL_REPO_CONFIGS
from tests.integration.feature_repos.universal.data_source_creator import (
DataSourceCreator,
)
from tests.integration.feature_repos.universal.data_sources.bigquery import (
BigQueryDataSourceCreator,
)
from tests.integration.feature_repos.universal.data_sources.file import (
FileDataSourceCreator,
)
from tests.integration.feature_repos.universal.data_sources.redshift import (
RedshiftDataSourceCreator,
)
from tests.utils.cli_utils import CliRunner, get_example_repo
from tests.utils.online_read_write_test import basic_rw_test

Expand All @@ -21,7 +35,6 @@
@pytest.mark.parametrize("test_repo_config", FULL_REPO_CONFIGS)
def test_universal_cli(test_repo_config) -> None:
project = f"test_universal_cli_{str(uuid.uuid4()).replace('-', '')[:8]}"

runner = CliRunner()

with tempfile.TemporaryDirectory() as repo_dir_name:
Expand Down Expand Up @@ -128,6 +141,56 @@ def make_feature_store_yaml(project, test_repo_config, repo_dir_name: Path):
return yaml.safe_dump(config_dict)


NULLABLE_ONLINE_STORE_CONFIGS: List[IntegrationTestRepoConfig] = [
IntegrationTestRepoConfig(
provider="local",
offline_store_creator=FileDataSourceCreator,
online_store=None,
),
]

if os.getenv("FEAST_IS_LOCAL_TEST", "False") == "True":
NULLABLE_ONLINE_STORE_CONFIGS.extend(
[
IntegrationTestRepoConfig(
provider="gcp",
offline_store_creator=BigQueryDataSourceCreator,
online_store=None,
),
IntegrationTestRepoConfig(
provider="aws",
offline_store_creator=RedshiftDataSourceCreator,
online_store=None,
),
]
)


@pytest.mark.integration
@pytest.mark.parametrize("test_nullable_online_store", NULLABLE_ONLINE_STORE_CONFIGS)
def test_nullable_online_store(test_nullable_online_store) -> None:
project = f"test_nullable_online_store{str(uuid.uuid4()).replace('-', '')[:8]}"
runner = CliRunner()

with tempfile.TemporaryDirectory() as repo_dir_name:
try:
feature_store_yaml = make_feature_store_yaml(
project, test_nullable_online_store, repo_dir_name
)
repo_path = Path(repo_dir_name)

repo_config = repo_path / "feature_store.yaml"

repo_config.write_text(dedent(feature_store_yaml))

repo_example = repo_path / "example.py"
repo_example.write_text(get_example_repo("example_feature_repo_1.py"))
result = runner.run(["apply"], cwd=repo_path)
assertpy.assert_that(result.returncode).is_equal_to(0)
finally:
runner.run(["teardown"], cwd=repo_path)


@contextmanager
def setup_third_party_provider_repo(provider_name: str):
with tempfile.TemporaryDirectory() as repo_dir_name:
Expand Down
43 changes: 43 additions & 0 deletions sdk/python/tests/integration/scaffolding/test_repo_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,49 @@ def _test_config(config_text, expect_error: Optional[str]):
return rc


def test_nullable_online_store_aws():
_test_config(
dedent(
"""
project: foo
registry: "registry.db"
provider: aws
online_store: null
"""
),
expect_error="__root__ -> offline_store -> cluster_id\n"
" field required (type=value_error.missing)",
)


def test_nullable_online_store_gcp():
_test_config(
dedent(
"""
project: foo
registry: "registry.db"
provider: gcp
online_store: null
"""
),
expect_error=None,
)


def test_nullable_online_store_local():
_test_config(
dedent(
"""
project: foo
registry: "registry.db"
provider: local
online_store: null
"""
),
expect_error=None,
)


def test_local_config():
_test_config(
dedent(
Expand Down

0 comments on commit 88fac8b

Please sign in to comment.