From f685726176db7a62e463c0a71a6f0560a50ea42e Mon Sep 17 00:00:00 2001
From: Willem Pienaar <6728866+woop@users.noreply.github.com>
Date: Sun, 28 Jun 2020 17:49:24 +0800
Subject: [PATCH] Enable linting and formatting for e2e tests (#832)

* Enable linting and formatting for e2e pytests

* Enable Mypy for e2e tests

* Fix missing column on BQ polling

* Fix incorrect Python SDK import path in e2e tests

* Convert iterator to list in e2e tests
---
 Makefile                                      |   9 +
 tests/e2e/bq/bq-batch-retrieval.py            | 165 ++++----
 tests/e2e/bq/feature-stats.py                 |  25 +-
 tests/e2e/bq/testutils.py                     |   9 +-
 tests/e2e/pyproject.toml                      |  22 +
 tests/e2e/redis/basic-ingest-redis-serving.py | 388 ++++++++++--------
 tests/e2e/setup.cfg                           |  16 +
 7 files changed, 364 insertions(+), 270 deletions(-)
 create mode 100644 tests/e2e/pyproject.toml
 create mode 100644 tests/e2e/setup.cfg

diff --git a/Makefile b/Makefile
index 0e6aab30ef..86615c3b47 100644
--- a/Makefile
+++ b/Makefile
@@ -69,14 +69,23 @@ test-python:
 	pytest --verbose --color=yes sdk/python/tests
 
 format-python:
+	# Sort
 	cd ${ROOT_DIR}/sdk/python; isort -rc feast tests
+	cd ${ROOT_DIR}/tests/e2e; isort -rc .
+
+	# Format
 	cd ${ROOT_DIR}/sdk/python; black --target-version py37 feast tests
+	cd ${ROOT_DIR}/tests/e2e; black --target-version py37 .
 
 lint-python:
 	cd ${ROOT_DIR}/sdk/python; mypy feast/ tests/
 	cd ${ROOT_DIR}/sdk/python; flake8 feast/ tests/
 	cd ${ROOT_DIR}/sdk/python; black --check feast tests
+	cd ${ROOT_DIR}/tests/e2e; mypy bq/ redis/
+	cd ${ROOT_DIR}/tests/e2e; flake8 .
+	cd ${ROOT_DIR}/tests/e2e; black --check .
+
 # Go SDK
 install-go-ci-dependencies:
 
diff --git a/tests/e2e/bq/bq-batch-retrieval.py b/tests/e2e/bq/bq-batch-retrieval.py
index 3918f69fb3..cdc892101a 100644
--- a/tests/e2e/bq/bq-batch-retrieval.py
+++ b/tests/e2e/bq/bq-batch-retrieval.py
@@ -15,6 +15,8 @@
 from google.protobuf.duration_pb2 import Duration
 from pandavro import to_avro
 
+import tensorflow_data_validation as tfdv
+from bq.testutils import assert_stats_equal, clear_unsupported_fields
 from feast.client import Client
 from feast.core.CoreService_pb2 import ListStoresRequest
 from feast.core.IngestionJob_pb2 import IngestionJobStatus
@@ -22,12 +24,6 @@
 from feast.feature import Feature
 from feast.feature_set import FeatureSet
 from feast.type_map import ValueType
-from google.cloud import storage, bigquery
-from google.cloud.storage import Blob
-from google.protobuf.duration_pb2 import Duration
-from pandavro import to_avro
-import tensorflow_data_validation as tfdv
-from bq.testutils import *
 
 pd.set_option("display.max_columns", None)
 
@@ -74,7 +70,7 @@ def client(core_url, serving_url, allow_dirty):
 
 def wait_for(fn, timeout: timedelta, sleep=5):
     until = datetime.now() + timeout
-    last_exc = None
+    last_exc = BaseException()
 
     while datetime.now() <= until:
         try:
@@ -184,7 +180,9 @@ def test_batch_get_batch_features_with_file(client):
     client.ingest(file_fs1, features_1_df, timeout=480)
 
     # Rename column (datetime -> event_timestamp)
-    features_1_df['datetime'] + pd.Timedelta(seconds=1)  # adds buffer to avoid rounding errors
+    features_1_df["datetime"] + pd.Timedelta(
+        seconds=1
+    )  # adds buffer to avoid rounding errors
     features_1_df = features_1_df.rename(columns={"datetime": "event_timestamp"})
 
     to_avro(
@@ -230,7 +228,9 @@ def test_batch_get_batch_features_with_gs_path(client, gcs_path):
     client.ingest(gcs_fs1, features_1_df, timeout=360)
 
     # Rename column (datetime -> event_timestamp)
-    features_1_df['datetime'] + pd.Timedelta(seconds=1)  # adds buffer to avoid rounding errors
+    features_1_df["datetime"] + pd.Timedelta(
+        seconds=1
+    )  # adds buffer to avoid rounding errors
     features_1_df = features_1_df.rename(columns={"datetime": "event_timestamp"})
 
     # Output file to local
@@ -342,17 +342,22 @@ def check():
             feature_refs=["feature_value4"],
             project=PROJECT_NAME,
         )
-        output = feature_retrieval_job.to_dataframe(timeout_sec=180).sort_values(by=["entity_id"])
+        output = feature_retrieval_job.to_dataframe(timeout_sec=180).sort_values(
+            by=["entity_id"]
+        )
         print(output.head(10))
         assert np.allclose(
             output["additional_float_col"], entity_df["additional_float_col"]
         )
         assert (
-                output["additional_string_col"].to_list()
-                == entity_df["additional_string_col"].to_list()
+            output["additional_string_col"].to_list()
+            == entity_df["additional_string_col"].to_list()
+        )
+        assert (
+            output["feature_value4"].to_list()
+            == features_df["feature_value4"].to_list()
         )
-        assert output["feature_value4"].to_list() == features_df["feature_value4"].to_list()
         clean_up_remote_files(feature_retrieval_job.get_avro_files())
 
     wait_for(check, timedelta(minutes=5))
@@ -368,11 +373,11 @@ def test_batch_point_in_time_correctness_join(client):
     historical_df = pd.DataFrame(
         {
             "datetime": [
-                            time_offset - timedelta(seconds=50),
-                            time_offset - timedelta(seconds=30),
-                            time_offset - timedelta(seconds=10),
-                        ]
-                        * N_EXAMPLES,
+                time_offset - timedelta(seconds=50),
+                time_offset - timedelta(seconds=30),
+                time_offset - timedelta(seconds=10),
+            ]
+            * N_EXAMPLES,
             "entity_id": [i for i in range(N_EXAMPLES) for _ in range(3)],
             "feature_value5": ["WRONG", "WRONG", "CORRECT"] * N_EXAMPLES,
         }
@@ -440,10 +445,7 @@ def test_batch_multiple_featureset_joins(client):
     def check():
         feature_retrieval_job = client.get_batch_features(
             entity_rows=entity_df,
-            feature_refs=[
-                "feature_value6",
-                "feature_set_2:other_feature_value7",
-            ],
+            feature_refs=["feature_value6", "feature_set_2:other_feature_value7"],
             project=PROJECT_NAME,
         )
         output = feature_retrieval_job.to_dataframe(timeout_sec=180)
@@ -453,7 +455,8 @@ def check():
             int(i) for i in output["feature_value6"].to_list()
         ]
         assert (
-            output["other_entity_id"].to_list() == output["feature_set_2__other_feature_value7"].to_list()
+            output["other_entity_id"].to_list()
+            == output["feature_set_2__other_feature_value7"].to_list()
         )
         clean_up_remote_files(feature_retrieval_job.get_avro_files())
 
@@ -516,15 +519,15 @@ def infra_teardown(pytestconfig, core_url, serving_url):
         print("Cleaning up not required")
 
 
-'''
+"""
 This suite of tests tests the apply feature set - update feature set - retrieve
 event sequence. It ensures that when a feature set is updated, tombstoned features
 are no longer retrieved, and added features are null for previously ingested rows.
 It is marked separately because of the length of time required to perform this test,
 due to bigquery schema caching for streaming writes.
-'''
+"""
 
 
 @pytest.fixture(scope="module")
@@ -546,7 +549,7 @@ def update_featureset_dataframe():
 @pytest.mark.fs_update
 @pytest.mark.run(order=20)
 def test_update_featureset_apply_featureset_and_ingest_first_subset(
-        client, update_featureset_dataframe
+    client, update_featureset_dataframe
 ):
     subset_columns = ["datetime", "entity_id", "update_feature1", "update_feature2"]
     subset_df = update_featureset_dataframe.iloc[:5][subset_columns]
@@ -563,18 +566,23 @@ def check():
     def check():
         feature_retrieval_job = client.get_batch_features(
             entity_rows=update_featureset_dataframe[["datetime", "entity_id"]].iloc[:5],
-            feature_refs=[
-                "update_feature1",
-                "update_feature2",
-            ],
-            project=PROJECT_NAME
+            feature_refs=["update_feature1", "update_feature2"],
+            project=PROJECT_NAME,
         )
-        output = feature_retrieval_job.to_dataframe(timeout_sec=180).sort_values(by=["entity_id"])
+        output = feature_retrieval_job.to_dataframe(timeout_sec=180).sort_values(
+            by=["entity_id"]
+        )
         print(output.head())
 
-        assert output["update_feature1"].to_list() == subset_df["update_feature1"].to_list()
-        assert output["update_feature2"].to_list() == subset_df["update_feature2"].to_list()
+        assert (
+            output["update_feature1"].to_list()
+            == subset_df["update_feature1"].to_list()
+        )
+        assert (
+            output["update_feature2"].to_list()
+            == subset_df["update_feature2"].to_list()
+        )
 
         clean_up_remote_files(feature_retrieval_job.get_avro_files())
 
@@ -585,7 +593,7 @@ def check():
 @pytest.mark.timeout(600)
 @pytest.mark.run(order=21)
 def test_update_featureset_update_featureset_and_ingest_second_subset(
-        client, update_featureset_dataframe
+    client, update_featureset_dataframe
 ):
     subset_columns = [
         "datetime",
@@ -621,20 +629,27 @@ def check():
     def check():
         feature_retrieval_job = client.get_batch_features(
             entity_rows=update_featureset_dataframe[["datetime", "entity_id"]].iloc[5:],
-            feature_refs=[
-                "update_feature1",
-                "update_feature3",
-                "update_feature4",
-            ],
+            feature_refs=["update_feature1", "update_feature3", "update_feature4"],
             project=PROJECT_NAME,
         )
-        output = feature_retrieval_job.to_dataframe(timeout_sec=180).sort_values(by=["entity_id"])
+        output = feature_retrieval_job.to_dataframe(timeout_sec=180).sort_values(
+            by=["entity_id"]
+        )
         print(output.head())
 
-        assert output["update_feature1"].to_list() == subset_df["update_feature1"].to_list()
-        assert output["update_feature3"].to_list() == subset_df["update_feature3"].to_list()
-        assert output["update_feature4"].to_list() == subset_df["update_feature4"].to_list()
+        assert (
+            output["update_feature1"].to_list()
+            == subset_df["update_feature1"].to_list()
+        )
+        assert (
+            output["update_feature3"].to_list()
+            == subset_df["update_feature3"].to_list()
+        )
+        assert (
+            output["update_feature4"].to_list()
+            == subset_df["update_feature4"].to_list()
+        )
 
         clean_up_remote_files(feature_retrieval_job.get_avro_files())
 
     wait_for(check, timedelta(minutes=5))
@@ -662,19 +677,17 @@ def test_update_featureset_retrieve_all_fields(client, update_featureset_datafra
 def test_update_featureset_retrieve_valid_fields(client, update_featureset_dataframe):
     feature_retrieval_job = client.get_batch_features(
         entity_rows=update_featureset_dataframe[["datetime", "entity_id"]],
-        feature_refs=[
-            "update_feature1",
-            "update_feature3",
-            "update_feature4",
-        ],
+        feature_refs=["update_feature1", "update_feature3", "update_feature4"],
         project=PROJECT_NAME,
     )
-    output = feature_retrieval_job.to_dataframe(timeout_sec=180).sort_values(by=["entity_id"])
+    output = feature_retrieval_job.to_dataframe(timeout_sec=180).sort_values(
+        by=["entity_id"]
+    )
     clean_up_remote_files(feature_retrieval_job.get_avro_files())
     print(output.head(10))
     assert (
-            output["update_feature1"].to_list()
-            == update_featureset_dataframe["update_feature1"].to_list()
+        output["update_feature1"].to_list()
+        == update_featureset_dataframe["update_feature1"].to_list()
     )
     # we have to convert to float because the column contains np.NaN
     assert [math.isnan(i) for i in output["update_feature3"].to_list()[:5]] == [
         float(i) for i in update_featureset_dataframe["update_feature3"].to_list()[5:]
     ]
     assert (
-            output["update_feature4"].to_list()
-            == [None] * 5 + update_featureset_dataframe["update_feature4"].to_list()[5:]
+        output["update_feature4"].to_list()
+        == [None] * 5 + update_featureset_dataframe["update_feature4"].to_list()[5:]
     )
 
 
@@ -697,31 +710,31 @@ def test_batch_dataset_statistics(client):
     fs2 = client.get_feature_set(name="feature_set_2")
 
     id_offset = 20
-    N_ROWS = 21
+    n_rows = 21
     time_offset = datetime.utcnow().replace(tzinfo=pytz.utc)
 
     features_1_df = pd.DataFrame(
         {
-            "datetime": [time_offset] * N_ROWS,
-            "entity_id": [id_offset + i for i in range(N_ROWS)],
-            "feature_value6": ["a" for i in range(N_ROWS)],
+            "datetime": [time_offset] * n_rows,
+            "entity_id": [id_offset + i for i in range(n_rows)],
+            "feature_value6": ["a" for i in range(n_rows)],
         }
     )
     ingestion_id1 = client.ingest(fs1, features_1_df)
 
     features_2_df = pd.DataFrame(
         {
-            "datetime": [time_offset] * N_ROWS,
-            "other_entity_id": [id_offset + i for i in range(N_ROWS)],
-            "other_feature_value7": [int(i) % 10 for i in range(0, N_ROWS)],
+            "datetime": [time_offset] * n_rows,
+            "other_entity_id": [id_offset + i for i in range(n_rows)],
+            "other_feature_value7": [int(i) % 10 for i in range(0, n_rows)],
         }
     )
     ingestion_id2 = client.ingest(fs2, features_2_df)
 
     entity_df = pd.DataFrame(
         {
-            "datetime": [time_offset] * N_ROWS,
-            "entity_id": [id_offset + i for i in range(N_ROWS)],
-            "other_entity_id": [id_offset + i for i in range(N_ROWS)],
+            "datetime": [time_offset] * n_rows,
+            "entity_id": [id_offset + i for i in range(n_rows)],
+            "other_entity_id": [id_offset + i for i in range(n_rows)],
         }
     )
 
@@ -729,17 +742,18 @@ def test_batch_dataset_statistics(client):
     while True:
         rows_ingested1 = get_rows_ingested(client, fs1, ingestion_id1)
         rows_ingested2 = get_rows_ingested(client, fs2, ingestion_id2)
-        if rows_ingested1 == len(features_1_df) and rows_ingested2 == len(features_2_df):
-            print(f"Number of rows successfully ingested: {rows_ingested1}, {rows_ingested2}. Continuing.")
+        if rows_ingested1 == len(features_1_df) and rows_ingested2 == len(
+            features_2_df
+        ):
+            print(
+                f"Number of rows successfully ingested: {rows_ingested1}, {rows_ingested2}. Continuing."
+            )
             break
         time.sleep(30)
 
     feature_retrieval_job = client.get_batch_features(
         entity_rows=entity_df,
-        feature_refs=[
-            "feature_value6",
-            "feature_set_2:other_feature_value7",
-        ],
+        feature_refs=["feature_value6", "feature_set_2:other_feature_value7"],
         project=PROJECT_NAME,
         compute_statistics=True,
     )
@@ -765,9 +779,9 @@ def test_batch_dataset_statistics(client):
 
 
 def get_rows_ingested(
-        client: Client, feature_set: FeatureSet, ingestion_id: str
+    client: Client, feature_set: FeatureSet, ingestion_id: str
 ) -> int:
-    response = client._core_service_stub.ListStores(
+    response = client._core_service.ListStores(
         ListStoresRequest(filter=ListStoresRequest.Filter(name="historical"))
     )
     bq_config = response.store[0].bigquery_config
@@ -780,8 +794,7 @@ def get_rows_ingested(
         f'SELECT COUNT(*) as count FROM `{project}.{dataset}.{table}` WHERE ingestion_id = "{ingestion_id}"'
     ).result()
 
-    for row in rows:
-        return row["count"]
+    return list(rows)[0]["count"]
 
 
 def clean_up_remote_files(files):
diff --git a/tests/e2e/bq/feature-stats.py b/tests/e2e/bq/feature-stats.py
index b145f891fe..995b9ee90c 100644
--- a/tests/e2e/bq/feature-stats.py
+++ b/tests/e2e/bq/feature-stats.py
@@ -7,19 +7,18 @@
 import pytest
 import pytz
 from google.protobuf.duration_pb2 import Duration
-from google.protobuf.json_format import MessageToDict
 
 import tensorflow_data_validation as tfdv
-from deepdiff import DeepDiff
+from bq.testutils import (
+    assert_stats_equal,
+    clear_unsupported_agg_fields,
+    clear_unsupported_fields,
+)
 from feast.client import Client
 from feast.entity import Entity
 from feast.feature import Feature
 from feast.feature_set import FeatureSet
 from feast.type_map import ValueType
-from google.protobuf.duration_pb2 import Duration
-import tensorflow_data_validation as tfdv
-from bq.testutils import *
-
 pd.set_option("display.max_columns", None)
 
@@ -85,16 +84,16 @@ def feature_stats_feature_set(client):
 
 @pytest.fixture(scope="module")
 def feature_stats_dataset_basic(client, feature_stats_feature_set):
 
-    N_ROWS = 20
+    n_rows = 20
 
     time_offset = datetime.utcnow().replace(tzinfo=pytz.utc)
     df = pd.DataFrame(
         {
-            "datetime": [time_offset] * N_ROWS,
-            "entity_id": [i for i in range(N_ROWS)],
-            "strings": ["a", "b"] * int(N_ROWS / 2),
-            "ints": [int(i) for i in range(N_ROWS)],
-            "floats": [10.5 - i for i in range(N_ROWS)],
+            "datetime": [time_offset] * n_rows,
+            "entity_id": [i for i in range(n_rows)],
+            "strings": ["a", "b"] * int(n_rows / 2),
+            "ints": [int(i) for i in range(n_rows)],
+            "floats": [10.5 - i for i in range(n_rows)],
         }
     )
 
@@ -175,7 +174,7 @@ def feature_stats_dataset_agg(client, feature_stats_feature_set):
 def test_feature_stats_retrieval_by_single_dataset(client, feature_stats_dataset_basic):
     stats = client.get_statistics(
-        f"feature_stats",
+        "feature_stats",
         features=["strings", "ints", "floats"],
         store=STORE_NAME,
         ingestion_ids=[feature_stats_dataset_basic["id"]],
diff --git a/tests/e2e/bq/testutils.py b/tests/e2e/bq/testutils.py
index ea1831fdd1..7327b2c763 100644
--- a/tests/e2e/bq/testutils.py
+++ b/tests/e2e/bq/testutils.py
@@ -1,6 +1,7 @@
-from deepdiff import DeepDiff
 from google.protobuf.json_format import MessageToDict
 
+from deepdiff import DeepDiff
+
 
 def clear_unsupported_fields(datasets):
     dataset = datasets.datasets[0]
@@ -44,10 +45,12 @@ def assert_stats_equal(left, right):
     left_stats = MessageToDict(left)["datasets"][0]
     right_stats = MessageToDict(right)["datasets"][0]
     assert (
-            left_stats["numExamples"] == right_stats["numExamples"]
+        left_stats["numExamples"] == right_stats["numExamples"]
     ), f"Number of examples do not match. Expected {left_stats['numExamples']}, got {right_stats['numExamples']}"
     left_features = sorted(left_stats["features"], key=lambda k: k["path"]["step"][0])
     right_features = sorted(right_stats["features"], key=lambda k: k["path"]["step"][0])
     diff = DeepDiff(left_features, right_features, significant_digits=3)
-    assert len(diff) == 0, f"Feature statistics do not match: \nwanted: {left_features}\n got: {right_features}"
+    assert (
+        len(diff) == 0
+    ), f"Feature statistics do not match: \nwanted: {left_features}\n got: {right_features}"
diff --git a/tests/e2e/pyproject.toml b/tests/e2e/pyproject.toml
new file mode 100644
index 0000000000..e63ebc1f1a
--- /dev/null
+++ b/tests/e2e/pyproject.toml
@@ -0,0 +1,22 @@
+[tool.black]
+line-length = 88
+target-version = ['py37']
+include = '\.pyi?$'
+exclude = '''
+(
+  /(
+      \.eggs         # exclude a few common directories in the
+    | \.git          # root of the project
+    | \.hg
+    | \.mypy_cache
+    | \.tox
+    | \.venv
+    | _build
+    | buck-out
+    | build
+    | dist
+    | pb2.py
+    | \.pyi
+  )/
+)
+'''
\ No newline at end of file
diff --git a/tests/e2e/redis/basic-ingest-redis-serving.py b/tests/e2e/redis/basic-ingest-redis-serving.py
index f3b1dacc62..66d89cb396 100644
--- a/tests/e2e/redis/basic-ingest-redis-serving.py
+++ b/tests/e2e/redis/basic-ingest-redis-serving.py
@@ -3,24 +3,6 @@
 import random
 import tempfile
 import time
-import grpc
-from collections import OrderedDict
-from feast.entity import Entity
-from feast.source import KafkaSource
-from feast.serving.ServingService_pb2 import (
-    GetOnlineFeaturesRequest,
-    GetOnlineFeaturesResponse,
-)
-from feast.core.IngestionJob_pb2 import IngestionJobStatus
-from feast.core.CoreService_pb2_grpc import CoreServiceStub
-from feast.core import CoreService_pb2
-from feast.types.Value_pb2 import Value
-from feast.client import Client
-from feast.feature_set import FeatureSet, FeatureSetRef
-from feast.type_map import ValueType
-from feast.wait import wait_retry_backoff
-from feast.constants import FEAST_DEFAULT_OPTIONS, CONFIG_PROJECT_KEY
-from google.protobuf.duration_pb2 import Duration
 import uuid
 from datetime import datetime
 
@@ -32,22 +14,27 @@
 from google.protobuf.duration_pb2 import Duration
 
 from feast.client import Client
-from feast.constants import CONFIG_PROJECT_KEY, FEAST_DEFAULT_OPTIONS
 from feast.core import CoreService_pb2
+from feast.core.CoreService_pb2 import ApplyFeatureSetResponse, GetFeatureSetResponse
 from feast.core.CoreService_pb2_grpc import CoreServiceStub
 from feast.core.IngestionJob_pb2 import IngestionJobStatus
 from feast.entity import Entity
 from feast.feature import Feature
 from feast.feature_set import FeatureSet, FeatureSetRef
-from feast.serving.ServingService_pb2 import (GetOnlineFeaturesRequest,
-                                              GetOnlineFeaturesResponse)
+from feast.serving.ServingService_pb2 import (
+    GetOnlineFeaturesRequest,
+    GetOnlineFeaturesResponse,
+)
+from feast.source import KafkaSource
 from feast.type_map import ValueType
 from feast.types.Value_pb2 import Value as Value
+from feast.wait import wait_retry_backoff
 
 FLOAT_TOLERANCE = 0.00001
-PROJECT_NAME = 'basic_' + uuid.uuid4().hex.upper()[0:6]
+PROJECT_NAME = "basic_" + uuid.uuid4().hex.upper()[0:6]
 DIR_PATH = os.path.dirname(os.path.realpath(__file__))
 
+
 def basic_dataframe(entities, features, ingest_time, n_size, null_features=[]):
     """
     Generate a basic feast-ingestable dataframe for testing.
@@ -60,10 +47,8 @@ def basic_dataframe(entities, features, ingest_time, n_size, null_features=[]):
         null_features - names of features that contain null values
     Returns the generated dataframe
     """
-    offset = random.randint(1000, 100000)  # ensure a unique key space is used
     df_dict = {
-        "datetime": [ingest_time.replace(tzinfo=pytz.utc) for _ in
-                     range(n_size)],
+        "datetime": [ingest_time.replace(tzinfo=pytz.utc) for _ in range(n_size)],
     }
     for entity_name in entities:
         df_dict[entity_name] = list(range(1, n_size + 1))
@@ -73,6 +58,7 @@ def basic_dataframe(entities, features, ingest_time, n_size, null_features=[]):
         df_dict[null_feature_name] = [None for _ in range(n_size)]
     return pd.DataFrame(df_dict)
 
+
 def check_online_response(feature_ref, ingest_df, response):
     """
     Check the feature value and status in the given online serving response.
@@ -99,36 +85,37 @@ def check_online_response(feature_ref, ingest_df, response):
     )
 
 
-@pytest.fixture(scope='module')
+@pytest.fixture(scope="module")
 def core_url(pytestconfig):
     return pytestconfig.getoption("core_url")
 
 
-@pytest.fixture(scope='module')
+@pytest.fixture(scope="module")
 def serving_url(pytestconfig):
     return pytestconfig.getoption("serving_url")
 
 
-@pytest.fixture(scope='module')
+@pytest.fixture(scope="module")
 def allow_dirty(pytestconfig):
-    return True if pytestconfig.getoption(
-        "allow_dirty").lower() == "true" else False
+    return True if pytestconfig.getoption("allow_dirty").lower() == "true" else False
 
 
-@pytest.fixture(scope='module')
+@pytest.fixture(scope="module")
 def enable_auth(pytestconfig):
     return pytestconfig.getoption("enable_auth")
-
-
-@pytest.fixture(scope='module')
+
+
+@pytest.fixture(scope="module")
 def client(core_url, serving_url, allow_dirty, enable_auth):
     # Get client for core and serving
-    # if enable_auth is True, Google Id token will be
-    # passed in the metadata for authentication.
-    client = Client(core_url=core_url,
-                    serving_url=serving_url,
-                    core_enable_auth=enable_auth,
-                    core_auth_provider="google")
+    # if enable_auth is True, Google Id token will be
+    # passed in the metadata for authentication.
+    client = Client(
+        core_url=core_url,
+        serving_url=serving_url,
+        core_enable_auth=enable_auth,
+        core_auth_provider="google",
+    )
     client.create_project(PROJECT_NAME)
 
     # Ensure Feast core is active, but empty
@@ -149,30 +136,38 @@ def ingest_time():
 
 @pytest.fixture(scope="module")
 def cust_trans_df(ingest_time):
-    return basic_dataframe(entities=["customer_id"],
-                           features=["daily_transactions", "total_transactions"],
-                           null_features=["null_values"],
-                           ingest_time=ingest_time,
-                           n_size=5)
+    return basic_dataframe(
+        entities=["customer_id"],
+        features=["daily_transactions", "total_transactions"],
+        null_features=["null_values"],
+        ingest_time=ingest_time,
+        n_size=5,
+    )
+
 
 @pytest.fixture(scope="module")
 def driver_df(ingest_time):
-    return basic_dataframe(entities=["driver_id"],
-                           features=["rating", "cost"],
-                           ingest_time=ingest_time,
-                           n_size=5)
+    return basic_dataframe(
+        entities=["driver_id"],
+        features=["rating", "cost"],
+        ingest_time=ingest_time,
+        n_size=5,
+    )
+
 
 def test_version_returns_results(client):
     version_info = client.version()
-    assert not version_info['core'] is 'not configured'
-    assert not version_info['serving'] is 'not configured'
+    assert not version_info["core"] == "not configured"
+    assert not version_info["serving"] == "not configured"
 
 
 @pytest.mark.timeout(45)
 @pytest.mark.run(order=10)
 def test_basic_register_feature_set_success(client):
     # Register feature set without project
-    cust_trans_fs_expected = FeatureSet.from_yaml(f"{DIR_PATH}/basic/cust_trans_fs.yaml")
+    cust_trans_fs_expected = FeatureSet.from_yaml(
+        f"{DIR_PATH}/basic/cust_trans_fs.yaml"
+    )
     driver_fs_expected = FeatureSet.from_yaml(f"{DIR_PATH}/basic/driver_fs.yaml")
     client.apply(cust_trans_fs_expected)
     client.apply(driver_fs_expected)
@@ -182,32 +177,29 @@ def test_basic_register_feature_set_success(client):
     assert driver_fs_actual == driver_fs_expected
 
     # Register feature set with project
-    cust_trans_fs_expected = FeatureSet.from_yaml(f"{DIR_PATH}/basic/cust_trans_fs.yaml")
+    cust_trans_fs_expected = FeatureSet.from_yaml(
+        f"{DIR_PATH}/basic/cust_trans_fs.yaml"
+    )
     client.set_project(PROJECT_NAME)
     client.apply(cust_trans_fs_expected)
-    cust_trans_fs_actual = client.get_feature_set("customer_transactions",
-                                                  project=PROJECT_NAME)
+    cust_trans_fs_actual = client.get_feature_set(
+        "customer_transactions", project=PROJECT_NAME
+    )
     assert cust_trans_fs_actual == cust_trans_fs_expected
 
     # Register feature set with labels
     driver_unlabelled_fs = FeatureSet(
         "driver_unlabelled",
-        features=[
-            Feature("rating", ValueType.FLOAT),
-            Feature("cost", ValueType.FLOAT)
-        ],
+        features=[Feature("rating", ValueType.FLOAT), Feature("cost", ValueType.FLOAT)],
         entities=[Entity("entity_id", ValueType.INT64)],
-        max_age=Duration(seconds=100)
+        max_age=Duration(seconds=100),
    )
    driver_labeled_fs_expected = FeatureSet(
        "driver_labeled",
-        features=[
-            Feature("rating", ValueType.FLOAT),
-            Feature("cost", ValueType.FLOAT)
-        ],
+        features=[Feature("rating", ValueType.FLOAT), Feature("cost", ValueType.FLOAT)],
         entities=[Entity("entity_id", ValueType.INT64)],
         max_age=Duration(seconds=100),
-        labels={"key1":"val1"}
+        labels={"key1": "val1"},
     )
     client.set_project(PROJECT_NAME)
     client.apply(driver_unlabelled_fs)
@@ -236,11 +228,8 @@ def test_basic_ingest_success(client, cust_trans_df, driver_df):
 @pytest.mark.timeout(90)
 @pytest.mark.run(order=12)
 def test_basic_retrieve_online_success(client, cust_trans_df):
-    feature_refs=[
-        "daily_transactions",
-        "total_transactions",
-        "null_values"
-    ]
+    feature_refs = ["daily_transactions", "total_transactions", "null_values"]
+
     # Poll serving for feature values until the correct values are returned
     def try_get_features():
         response = client.get_online_features(
@@ -254,13 +243,21 @@ def try_get_features():
                 )
             ],
             feature_refs=feature_refs,
-        )# type: GetOnlineFeaturesResponse
-        is_ok = all([check_online_response(ref, cust_trans_df, response) for ref in feature_refs])
+        )  # type: GetOnlineFeaturesResponse
+        is_ok = all(
+            [
+                check_online_response(ref, cust_trans_df, response)
+                for ref in feature_refs
+            ]
+        )
         return response, is_ok
 
-    wait_retry_backoff(retry_fn=try_get_features,
-                       timeout_secs=90,
-                       timeout_msg="Timed out trying to get online feature values")
+    wait_retry_backoff(
+        retry_fn=try_get_features,
+        timeout_secs=90,
+        timeout_msg="Timed out trying to get online feature values",
+    )
+
 
 @pytest.mark.timeout(90)
 @pytest.mark.run(order=13)
@@ -272,6 +269,7 @@ def test_basic_retrieve_online_multiple_featureset(client, cust_trans_df, driver
         ("driver:rating", driver_df),
         ("total_transactions", cust_trans_df),
     ]
+
     # Poll serving for feature values until the correct values are returned
     def try_get_features():
         feature_refs = [mapping[0] for mapping in feature_ref_df_mapping]
@@ -282,21 +280,25 @@ def try_get_features():
                         "customer_id": Value(
                             int64_val=cust_trans_df.iloc[0]["customer_id"]
                         ),
-                        "driver_id": Value(
-                            int64_val=driver_df.iloc[0]["driver_id"]
-                        )
+                        "driver_id": Value(int64_val=driver_df.iloc[0]["driver_id"]),
                     }
                 )
             ],
             feature_refs=feature_refs,
         )  # type: GetOnlineFeaturesResponse
-        is_ok = all([check_online_response(ref, df, response)
-                     for ref, df in feature_ref_df_mapping])
+        is_ok = all(
+            [
+                check_online_response(ref, df, response)
+                for ref, df in feature_ref_df_mapping
+            ]
+        )
         return response, is_ok
 
-    wait_retry_backoff(retry_fn=try_get_features,
-                       timeout_secs=90,
-                       timeout_msg="Timed out trying to get online feature values")
+    wait_retry_backoff(
+        retry_fn=try_get_features,
+        timeout_secs=90,
+        timeout_msg="Timed out trying to get online feature values",
+    )
 
 
 @pytest.mark.timeout(300)
@@ -305,9 +307,12 @@ def test_basic_ingest_jobs(client):
     # list ingestion jobs given featureset
     cust_trans_fs = client.get_feature_set(name="customer_transactions")
     ingest_jobs = client.list_ingest_jobs(
-        feature_set_ref=FeatureSetRef.from_feature_set(cust_trans_fs))
+        feature_set_ref=FeatureSetRef.from_feature_set(cust_trans_fs)
+    )
     # filter ingestion jobs to only those that are running
-    ingest_jobs = [job for job in ingest_jobs if job.status == IngestionJobStatus.RUNNING]
+    ingest_jobs = [
+        job for job in ingest_jobs if job.status == IngestionJobStatus.RUNNING
+    ]
 
     assert len(ingest_jobs) >= 1
     for ingest_job in ingest_jobs:
@@ -322,18 +327,16 @@ def test_basic_ingest_jobs(client):
         assert ingest_job.status == IngestionJobStatus.ABORTED
 
 
-@pytest.fixture(scope='module')
+@pytest.fixture(scope="module")
 def all_types_dataframe():
     return pd.DataFrame(
         {
-            "datetime": [datetime.utcnow().replace(tzinfo=pytz.utc) for _ in
-                         range(3)],
+            "datetime": [datetime.utcnow().replace(tzinfo=pytz.utc) for _ in range(3)],
             "user_id": [1001, 1002, 1003],
             "int32_feature": [np.int32(1), np.int32(2), np.int32(3)],
             "int64_feature": [np.int64(1), np.int64(2), np.int64(3)],
             "float_feature": [np.float(0.1), np.float(0.2), np.float(0.3)],
-            "double_feature": [np.float64(0.1), np.float64(0.2),
-                               np.float64(0.3)],
+            "double_feature": [np.float64(0.1), np.float64(0.2), np.float64(0.3)],
             "string_feature": ["one", "two", "three"],
             "bytes_feature": [b"one", b"two", b"three"],
"bool_feature": [True, False, False], @@ -395,8 +398,7 @@ def test_all_types_register_feature_set_success(client): Feature(name="float_list_feature", dtype=ValueType.FLOAT_LIST), Feature(name="int64_list_feature", dtype=ValueType.INT64_LIST), Feature(name="int32_list_feature", dtype=ValueType.INT32_LIST), - Feature(name="string_list_feature", - dtype=ValueType.STRING_LIST), + Feature(name="string_list_feature", dtype=ValueType.STRING_LIST), Feature(name="bytes_list_feature", dtype=ValueType.BYTES_LIST), ], max_age=Duration(seconds=3600), @@ -451,12 +453,16 @@ def test_all_types_retrieve_online_success(client, all_types_dataframe): "bytes_list_feature", "double_list_feature", ] + def try_get_features(): response = client.get_online_features( entity_rows=[ GetOnlineFeaturesRequest.EntityRow( - fields={"user_id": Value( - int64_val=all_types_dataframe.iloc[0]["user_id"])} + fields={ + "user_id": Value( + int64_val=all_types_dataframe.iloc[0]["user_id"] + ) + } ) ], feature_refs=feature_refs, @@ -464,17 +470,26 @@ def try_get_features(): is_ok = check_online_response("float_feature", all_types_dataframe, response) return response, is_ok - response = wait_retry_backoff(retry_fn=try_get_features, - timeout_secs=90, - timeout_msg="Timed out trying to get online feature values") + response = wait_retry_backoff( + retry_fn=try_get_features, + timeout_secs=90, + timeout_msg="Timed out trying to get online feature values", + ) # check returned values - returned_float_list = response.field_values[0].fields["float_list_feature"].float_list_val.val + returned_float_list = ( + response.field_values[0].fields["float_list_feature"].float_list_val.val + ) sent_float_list = all_types_dataframe.iloc[0]["float_list_feature"] - assert math.isclose(returned_float_list[0], sent_float_list[0], abs_tol=FLOAT_TOLERANCE) + assert math.isclose( + returned_float_list[0], sent_float_list[0], abs_tol=FLOAT_TOLERANCE + ) # check returned metadata - assert (response.field_values[0].statuses["float_list_feature"] - == GetOnlineFeaturesResponse.FieldStatus.PRESENT) + assert ( + response.field_values[0].statuses["float_list_feature"] + == GetOnlineFeaturesResponse.FieldStatus.PRESENT + ) + @pytest.mark.timeout(300) @pytest.mark.run(order=29) @@ -482,9 +497,12 @@ def test_all_types_ingest_jobs(client, all_types_dataframe): # list ingestion jobs given featureset all_types_fs = client.get_feature_set(name="all_types") ingest_jobs = client.list_ingest_jobs( - feature_set_ref=FeatureSetRef.from_feature_set(all_types_fs)) + feature_set_ref=FeatureSetRef.from_feature_set(all_types_fs) + ) # filter ingestion jobs to only those that are running - ingest_jobs = [job for job in ingest_jobs if job.status == IngestionJobStatus.RUNNING] + ingest_jobs = [ + job for job in ingest_jobs if job.status == IngestionJobStatus.RUNNING + ] assert len(ingest_jobs) >= 1 for ingest_job in ingest_jobs: @@ -499,15 +517,14 @@ def test_all_types_ingest_jobs(client, all_types_dataframe): assert ingest_job.status == IngestionJobStatus.ABORTED -@pytest.fixture(scope='module') +@pytest.fixture(scope="module") def large_volume_dataframe(): ROW_COUNT = 100000 offset = random.randint(1000000, 10000000) # ensure a unique key space customer_data = pd.DataFrame( { "datetime": [ - datetime.utcnow().replace(tzinfo=pytz.utc) for _ in - range(ROW_COUNT) + datetime.utcnow().replace(tzinfo=pytz.utc) for _ in range(ROW_COUNT) ], "customer_id": [offset + inc for inc in range(ROW_COUNT)], "daily_transactions_large": [np.random.rand() for _ in range(ROW_COUNT)], @@ -521,7 
+538,8 @@ def large_volume_dataframe(): @pytest.mark.run(order=30) def test_large_volume_register_feature_set_success(client): cust_trans_fs_expected = FeatureSet.from_yaml( - f"{DIR_PATH}/large_volume/cust_trans_large_fs.yaml") + f"{DIR_PATH}/large_volume/cust_trans_large_fs.yaml" + ) # Register feature set client.apply(cust_trans_fs_expected) @@ -529,8 +547,7 @@ def test_large_volume_register_feature_set_success(client): # Feast Core needs some time to fully commit the FeatureSet applied # when there is no existing job yet for the Featureset time.sleep(10) - cust_trans_fs_actual = client.get_feature_set( - name="customer_transactions_large") + cust_trans_fs_actual = client.get_feature_set(name="customer_transactions_large") assert cust_trans_fs_actual == cust_trans_fs_expected @@ -557,7 +574,7 @@ def test_large_volume_ingest_success(client, large_volume_dataframe): @pytest.mark.run(order=32) def test_large_volume_retrieve_online_success(client, large_volume_dataframe): # Poll serving for feature values until the correct values are returned - feature_refs=[ + feature_refs = [ "daily_transactions_large", "total_transactions_large", ] @@ -567,64 +584,63 @@ def test_large_volume_retrieve_online_success(client, large_volume_dataframe): GetOnlineFeaturesRequest.EntityRow( fields={ "customer_id": Value( - int64_val=large_volume_dataframe.iloc[0][ - "customer_id"] + int64_val=large_volume_dataframe.iloc[0]["customer_id"] ) } ) ], feature_refs=feature_refs, ) # type: GetOnlineFeaturesResponse - is_ok = all([check_online_response(ref, large_volume_dataframe, response) for ref in feature_refs]) + is_ok = all( + [ + check_online_response(ref, large_volume_dataframe, response) + for ref in feature_refs + ] + ) return None, is_ok - wait_retry_backoff(retry_fn=try_get_features, - timeout_secs=90, - timeout_msg="Timed out trying to get online feature values") -@pytest.fixture(scope='module') +@pytest.fixture(scope="module") def all_types_parquet_file(): COUNT = 20000 df = pd.DataFrame( { "datetime": [datetime.utcnow() for _ in range(COUNT)], - "customer_id": [np.int32(random.randint(0, 10000)) for _ in - range(COUNT)], - "int32_feature_parquet": [np.int32(random.randint(0, 10000)) for _ in - range(COUNT)], - "int64_feature_parquet": [np.int64(random.randint(0, 10000)) for _ in - range(COUNT)], + "customer_id": [np.int32(random.randint(0, 10000)) for _ in range(COUNT)], + "int32_feature_parquet": [ + np.int32(random.randint(0, 10000)) for _ in range(COUNT) + ], + "int64_feature_parquet": [ + np.int64(random.randint(0, 10000)) for _ in range(COUNT) + ], "float_feature_parquet": [np.float(random.random()) for _ in range(COUNT)], - "double_feature_parquet": [np.float64(random.random()) for _ in - range(COUNT)], - "string_feature_parquet": ["one" + str(random.random()) for _ in - range(COUNT)], + "double_feature_parquet": [ + np.float64(random.random()) for _ in range(COUNT) + ], + "string_feature_parquet": [ + "one" + str(random.random()) for _ in range(COUNT) + ], "bytes_feature_parquet": [b"one" for _ in range(COUNT)], "int32_list_feature_parquet": [ np.array([1, 2, 3, random.randint(0, 10000)], dtype=np.int32) - for _ - in range(COUNT) + for _ in range(COUNT) ], "int64_list_feature_parquet": [ np.array([1, random.randint(0, 10000), 3, 4], dtype=np.int64) - for _ - in range(COUNT) + for _ in range(COUNT) ], "float_list_feature_parquet": [ - np.array([1.1, 1.2, 1.3, random.random()], dtype=np.float32) for - _ - in range(COUNT) + np.array([1.1, 1.2, 1.3, random.random()], dtype=np.float32) + for _ in 
range(COUNT) ], "double_list_feature_parquet": [ - np.array([1.1, 1.2, 1.3, random.random()], dtype=np.float64) for - _ - in range(COUNT) + np.array([1.1, 1.2, 1.3, random.random()], dtype=np.float64) + for _ in range(COUNT) ], "string_list_feature_parquet": [ - np.array(["one", "two" + str(random.random()), "three"]) for _ - in - range(COUNT) + np.array(["one", "two" + str(random.random()), "three"]) + for _ in range(COUNT) ], "bytes_list_feature_parquet": [ np.array([b"one", b"two", b"three"]) for _ in range(COUNT) @@ -635,7 +651,7 @@ def all_types_parquet_file(): # TODO: Boolean list is not being tested. # https://github.com/feast-dev/feast/issues/341 - file_path = os.path.join(tempfile.mkdtemp(), 'all_types.parquet') + file_path = os.path.join(tempfile.mkdtemp(), "all_types.parquet") df.to_parquet(file_path, allow_truncated_timestamps=True) return file_path @@ -645,7 +661,8 @@ def all_types_parquet_file(): def test_all_types_parquet_register_feature_set_success(client): # Load feature set from file all_types_parquet_expected = FeatureSet.from_yaml( - f"{DIR_PATH}/all_types_parquet/all_types_parquet.yaml") + f"{DIR_PATH}/all_types_parquet/all_types_parquet.yaml" + ) # Register feature set client.apply(all_types_parquet_expected) @@ -669,8 +686,7 @@ def test_all_types_parquet_register_feature_set_success(client): @pytest.mark.timeout(600) @pytest.mark.run(order=41) -def test_all_types_infer_register_ingest_file_success(client, - all_types_parquet_file): +def test_all_types_infer_register_ingest_file_success(client, all_types_parquet_file): # Get feature set all_types_fs = client.get_feature_set(name="all_types_parquet") @@ -684,43 +700,42 @@ def test_list_entities_and_features(client): customer_entity = Entity("customer_id", ValueType.INT64) driver_entity = Entity("driver_id", ValueType.INT64) - customer_feature_rating = Feature(name="rating", dtype=ValueType.FLOAT, labels={"key1":"val1"}) + customer_feature_rating = Feature( + name="rating", dtype=ValueType.FLOAT, labels={"key1": "val1"} + ) customer_feature_cost = Feature(name="cost", dtype=ValueType.FLOAT) driver_feature_rating = Feature(name="rating", dtype=ValueType.FLOAT) - driver_feature_cost = Feature(name="cost", dtype=ValueType.FLOAT, labels={"key1":"val1"}) + driver_feature_cost = Feature( + name="cost", dtype=ValueType.FLOAT, labels={"key1": "val1"} + ) - filter_by_project_entity_labels_expected = dict([ - ("customer:rating", customer_feature_rating) - ]) + filter_by_project_entity_labels_expected = dict( + [("customer:rating", customer_feature_rating)] + ) - filter_by_project_entity_expected = dict([ - ("driver:cost", driver_feature_cost), - ("driver:rating", driver_feature_rating) - ]) + filter_by_project_entity_expected = dict( + [("driver:cost", driver_feature_cost), ("driver:rating", driver_feature_rating)] + ) - filter_by_project_labels_expected = dict([ - ("customer:rating", customer_feature_rating), - ("driver:cost", driver_feature_cost) - ]) + filter_by_project_labels_expected = dict( + [ + ("customer:rating", customer_feature_rating), + ("driver:cost", driver_feature_cost), + ] + ) customer_fs = FeatureSet( "customer", - features=[ - customer_feature_rating, - customer_feature_cost - ], + features=[customer_feature_rating, customer_feature_cost], entities=[customer_entity], - max_age=Duration(seconds=100) + max_age=Duration(seconds=100), ) driver_fs = FeatureSet( "driver", - features=[ - driver_feature_rating, - driver_feature_cost - ], + features=[driver_feature_rating, driver_feature_cost], 
         entities=[driver_entity],
-        max_age=Duration(seconds=100)
+        max_age=Duration(seconds=100),
     )
 
     client.set_project(PROJECT_NAME)
@@ -729,17 +744,30 @@ def test_list_entities_and_features(client):
 
     # Test for listing of features
     # Case 1: Filter by: project, entities and labels
-    filter_by_project_entity_labels_actual = client.list_features_by_ref(project=PROJECT_NAME, entities=["customer_id"], labels={"key1":"val1"})
+    filter_by_project_entity_labels_actual = client.list_features_by_ref(
+        project=PROJECT_NAME, entities=["customer_id"], labels={"key1": "val1"}
+    )
 
     # Case 2: Filter by: project, entities
-    filter_by_project_entity_actual = client.list_features_by_ref(project=PROJECT_NAME, entities=["driver_id"])
+    filter_by_project_entity_actual = client.list_features_by_ref(
+        project=PROJECT_NAME, entities=["driver_id"]
+    )
 
     # Case 3: Filter by: project, labels
-    filter_by_project_labels_actual = client.list_features_by_ref(project=PROJECT_NAME, labels={"key1":"val1"})
+    filter_by_project_labels_actual = client.list_features_by_ref(
+        project=PROJECT_NAME, labels={"key1": "val1"}
+    )
+
+    assert set(filter_by_project_entity_labels_expected) == set(
+        filter_by_project_entity_labels_actual
+    )
+    assert set(filter_by_project_entity_expected) == set(
+        filter_by_project_entity_actual
+    )
+    assert set(filter_by_project_labels_expected) == set(
+        filter_by_project_labels_actual
+    )
 
-    assert set(filter_by_project_entity_labels_expected) == set(filter_by_project_entity_labels_actual)
-    assert set(filter_by_project_entity_expected) == set(filter_by_project_entity_actual)
-    assert set(filter_by_project_labels_expected) == set(filter_by_project_labels_actual)
 
 @pytest.mark.timeout(900)
 @pytest.mark.run(order=60)
@@ -748,7 +776,11 @@ def test_sources_deduplicate_ingest_jobs(client):
     alt_source = KafkaSource("localhost:9092", "feast-data")
 
     def get_running_jobs():
-        return [ job for job in client.list_ingest_jobs() if job.status == IngestionJobStatus.RUNNING ]
+        return [
+            job
+            for job in client.list_ingest_jobs()
+            if job.status == IngestionJobStatus.RUNNING
+        ]
 
     # stop all ingest jobs
     ingest_jobs = client.list_ingest_jobs()
@@ -795,14 +827,14 @@ class TestsBasedOnGrpc:
     @pytest.fixture(scope="module")
     def core_service_stub(self, core_url):
         if core_url.endswith(":443"):
-            core_channel = grpc.secure_channel(
-                core_url, grpc.ssl_channel_credentials()
-            )
+            core_channel = grpc.secure_channel(core_url, grpc.ssl_channel_credentials())
         else:
             core_channel = grpc.insecure_channel(core_url)
 
         try:
-            grpc.channel_ready_future(core_channel).result(timeout=self.GRPC_CONNECTION_TIMEOUT)
+            grpc.channel_ready_future(core_channel).result(
+                timeout=self.GRPC_CONNECTION_TIMEOUT
+            )
         except grpc.FutureTimeoutError:
             raise ConnectionError(
                 f"Connection timed out while attempting to connect to Feast "
diff --git a/tests/e2e/setup.cfg b/tests/e2e/setup.cfg
new file mode 100644
index 0000000000..02710d242d
--- /dev/null
+++ b/tests/e2e/setup.cfg
@@ -0,0 +1,16 @@
+[isort]
+multi_line_output=3
+include_trailing_comma=True
+force_grid_wrap=0
+use_parentheses=True
+line_length=88
+
+[flake8]
+ignore = E203, E266, E501, W503
+max-line-length = 88
+max-complexity = 20
+select = B,C,E,F,W,T4
+
+[mypy]
+files=bq,redis
+ignore_missing_imports=true
\ No newline at end of file
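
A note on the "Convert iterator to list" fix in get_rows_ingested above: BigQuery's query(...).result() returns a one-shot row iterator, and the old "for row in rows: return row['count']" silently returned None whenever that iterator was empty, so the polling loop in test_batch_dataset_statistics could compare None against the expected row count indefinitely. The new "return list(rows)[0]['count']" fails loudly instead. A minimal sketch of the two patterns follows; it is illustrative only and not part of the patch — count_silently and count_loudly are hypothetical names, and plain dicts stand in for google.cloud.bigquery Row objects, which also support row["count"] access.

    # Sketch contrasting the old and new patterns from get_rows_ingested.

    def count_silently(rows):
        # Old pattern: if the iterator yields nothing, the loop body never
        # runs and the function falls through to an implicit `return None`.
        for row in rows:
            return row["count"]

    def count_loudly(rows):
        # New pattern: materialize the iterator once; an empty result now
        # raises IndexError at the call site instead of propagating None.
        return list(rows)[0]["count"]

    assert count_silently(iter([])) is None            # silent failure mode
    assert count_loudly(iter([{"count": 42}])) == 42   # happy path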