Enable linting and formatting for e2e tests (#832)
* Enable linting and formatting for e2e pytests

* Enable Mypy for e2e tests

* Fix missing column on BQ polling

* Fix incorrect Python SDK import path in e2e tests

* Convert iterator to list in e2e tests
woop authored Jun 28, 2020
1 parent 9ea1f52 commit f685726
Showing 7 changed files with 364 additions and 270 deletions.
9 changes: 9 additions & 0 deletions Makefile
@@ -69,14 +69,23 @@ test-python:
pytest --verbose --color=yes sdk/python/tests

format-python:
# Sort
cd ${ROOT_DIR}/sdk/python; isort -rc feast tests
cd ${ROOT_DIR}/tests/e2e; isort -rc .

# Format
cd ${ROOT_DIR}/sdk/python; black --target-version py37 feast tests
cd ${ROOT_DIR}/tests/e2e; black --target-version py37 .

lint-python:
cd ${ROOT_DIR}/sdk/python; mypy feast/ tests/
cd ${ROOT_DIR}/sdk/python; flake8 feast/ tests/
cd ${ROOT_DIR}/sdk/python; black --check feast tests

cd ${ROOT_DIR}/tests/e2e; mypy bq/ redis/
cd ${ROOT_DIR}/tests/e2e; flake8 .
cd ${ROOT_DIR}/tests/e2e; black --check .

# Go SDK

install-go-ci-dependencies:
165 changes: 89 additions & 76 deletions tests/e2e/bq/bq-batch-retrieval.py
@@ -15,19 +15,15 @@
from google.protobuf.duration_pb2 import Duration
from pandavro import to_avro

import tensorflow_data_validation as tfdv
from bq.testutils import assert_stats_equal, clear_unsupported_fields
from feast.client import Client
from feast.core.CoreService_pb2 import ListStoresRequest
from feast.core.IngestionJob_pb2 import IngestionJobStatus
from feast.entity import Entity
from feast.feature import Feature
from feast.feature_set import FeatureSet
from feast.type_map import ValueType
from google.cloud import storage, bigquery
from google.cloud.storage import Blob
from google.protobuf.duration_pb2 import Duration
from pandavro import to_avro
import tensorflow_data_validation as tfdv
from bq.testutils import *

pd.set_option("display.max_columns", None)

@@ -74,7 +70,7 @@ def client(core_url, serving_url, allow_dirty):

def wait_for(fn, timeout: timedelta, sleep=5):
until = datetime.now() + timeout
last_exc = None
last_exc = BaseException()

while datetime.now() <= until:
try:
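
The Mypy change in this hunk swaps `last_exc = None` for `last_exc = BaseException()` so the variable always holds something raisable and the helper type-checks cleanly. The hunk is truncated here, so the retry body below is a sketch of the assumed pattern rather than the file's exact code:

import time
from datetime import datetime, timedelta


def wait_for(fn, timeout: timedelta, sleep=5):
    until = datetime.now() + timeout
    # Start from an exception instance rather than None, so the final raise
    # always receives a BaseException and mypy has nothing to flag.
    last_exc = BaseException()

    while datetime.now() <= until:
        try:
            fn()
            return  # the assertion-style callable succeeded
        except Exception as exc:
            last_exc = exc
            time.sleep(sleep)

    raise last_exc

The tests below invoke it as wait_for(check, timedelta(minutes=5)), retrying the check assertions until they pass or the timeout expires.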
@@ -184,7 +180,9 @@ def test_batch_get_batch_features_with_file(client):
client.ingest(file_fs1, features_1_df, timeout=480)

# Rename column (datetime -> event_timestamp)
features_1_df['datetime'] + pd.Timedelta(seconds=1) # adds buffer to avoid rounding errors
features_1_df["datetime"] + pd.Timedelta(
seconds=1
) # adds buffer to avoid rounding errors
features_1_df = features_1_df.rename(columns={"datetime": "event_timestamp"})

to_avro(
@@ -230,7 +228,9 @@ def test_batch_get_batch_features_with_gs_path(client, gcs_path):
client.ingest(gcs_fs1, features_1_df, timeout=360)

# Rename column (datetime -> event_timestamp)
features_1_df['datetime'] + pd.Timedelta(seconds=1) # adds buffer to avoid rounding errors
features_1_df["datetime"] + pd.Timedelta(
seconds=1
) # adds buffer to avoid rounding errors
features_1_df = features_1_df.rename(columns={"datetime": "event_timestamp"})

# Output file to local
@@ -342,17 +342,22 @@ def check():
feature_refs=["feature_value4"],
project=PROJECT_NAME,
)
output = feature_retrieval_job.to_dataframe(timeout_sec=180).sort_values(by=["entity_id"])
output = feature_retrieval_job.to_dataframe(timeout_sec=180).sort_values(
by=["entity_id"]
)
print(output.head(10))

assert np.allclose(
output["additional_float_col"], entity_df["additional_float_col"]
)
assert (
output["additional_string_col"].to_list()
== entity_df["additional_string_col"].to_list()
output["additional_string_col"].to_list()
== entity_df["additional_string_col"].to_list()
)
assert (
output["feature_value4"].to_list()
== features_df["feature_value4"].to_list()
)
assert output["feature_value4"].to_list() == features_df["feature_value4"].to_list()
clean_up_remote_files(feature_retrieval_job.get_avro_files())

wait_for(check, timedelta(minutes=5))
@@ -368,11 +373,11 @@ def test_batch_point_in_time_correctness_join(client):
historical_df = pd.DataFrame(
{
"datetime": [
time_offset - timedelta(seconds=50),
time_offset - timedelta(seconds=30),
time_offset - timedelta(seconds=10),
]
* N_EXAMPLES,
time_offset - timedelta(seconds=50),
time_offset - timedelta(seconds=30),
time_offset - timedelta(seconds=10),
]
* N_EXAMPLES,
"entity_id": [i for i in range(N_EXAMPLES) for _ in range(3)],
"feature_value5": ["WRONG", "WRONG", "CORRECT"] * N_EXAMPLES,
}
@@ -440,10 +445,7 @@ def test_batch_multiple_featureset_joins(client):
def check():
feature_retrieval_job = client.get_batch_features(
entity_rows=entity_df,
feature_refs=[
"feature_value6",
"feature_set_2:other_feature_value7",
],
feature_refs=["feature_value6", "feature_set_2:other_feature_value7"],
project=PROJECT_NAME,
)
output = feature_retrieval_job.to_dataframe(timeout_sec=180)
@@ -453,7 +455,8 @@ def check():
int(i) for i in output["feature_value6"].to_list()
]
assert (
output["other_entity_id"].to_list() == output["feature_set_2__other_feature_value7"].to_list()
output["other_entity_id"].to_list()
== output["feature_set_2__other_feature_value7"].to_list()
)
clean_up_remote_files(feature_retrieval_job.get_avro_files())

@@ -516,15 +519,15 @@ def infra_teardown(pytestconfig, core_url, serving_url):
print("Cleaning up not required")


'''
This suite of tests tests the apply feature set - update feature set - retrieve
event sequence. It ensures that when a feature set is updated, tombstoned features
"""
This suite of tests tests the apply feature set - update feature set - retrieve
event sequence. It ensures that when a feature set is updated, tombstoned features
are no longer retrieved, and added features are null for previously ingested
rows.
It is marked separately because of the length of time required
to perform this test, due to bigquery schema caching for streaming writes.
'''
"""


@pytest.fixture(scope="module")
@@ -546,7 +549,7 @@ def update_featureset_dataframe():
@pytest.mark.fs_update
@pytest.mark.run(order=20)
def test_update_featureset_apply_featureset_and_ingest_first_subset(
client, update_featureset_dataframe
client, update_featureset_dataframe
):
subset_columns = ["datetime", "entity_id", "update_feature1", "update_feature2"]
subset_df = update_featureset_dataframe.iloc[:5][subset_columns]
@@ -563,18 +566,23 @@ def test_update_featureset_apply_featureset_and_ingest_first_subset(
def check():
feature_retrieval_job = client.get_batch_features(
entity_rows=update_featureset_dataframe[["datetime", "entity_id"]].iloc[:5],
feature_refs=[
"update_feature1",
"update_feature2",
],
project=PROJECT_NAME
feature_refs=["update_feature1", "update_feature2"],
project=PROJECT_NAME,
)

output = feature_retrieval_job.to_dataframe(timeout_sec=180).sort_values(by=["entity_id"])
output = feature_retrieval_job.to_dataframe(timeout_sec=180).sort_values(
by=["entity_id"]
)
print(output.head())

assert output["update_feature1"].to_list() == subset_df["update_feature1"].to_list()
assert output["update_feature2"].to_list() == subset_df["update_feature2"].to_list()
assert (
output["update_feature1"].to_list()
== subset_df["update_feature1"].to_list()
)
assert (
output["update_feature2"].to_list()
== subset_df["update_feature2"].to_list()
)

clean_up_remote_files(feature_retrieval_job.get_avro_files())

@@ -585,7 +593,7 @@ def check():
@pytest.mark.timeout(600)
@pytest.mark.run(order=21)
def test_update_featureset_update_featureset_and_ingest_second_subset(
client, update_featureset_dataframe
client, update_featureset_dataframe
):
subset_columns = [
"datetime",
@@ -621,20 +629,27 @@ def test_update_featureset_update_featureset_and_ingest_second_subset(
def check():
feature_retrieval_job = client.get_batch_features(
entity_rows=update_featureset_dataframe[["datetime", "entity_id"]].iloc[5:],
feature_refs=[
"update_feature1",
"update_feature3",
"update_feature4",
],
feature_refs=["update_feature1", "update_feature3", "update_feature4"],
project=PROJECT_NAME,
)

output = feature_retrieval_job.to_dataframe(timeout_sec=180).sort_values(by=["entity_id"])
output = feature_retrieval_job.to_dataframe(timeout_sec=180).sort_values(
by=["entity_id"]
)
print(output.head())

assert output["update_feature1"].to_list() == subset_df["update_feature1"].to_list()
assert output["update_feature3"].to_list() == subset_df["update_feature3"].to_list()
assert output["update_feature4"].to_list() == subset_df["update_feature4"].to_list()
assert (
output["update_feature1"].to_list()
== subset_df["update_feature1"].to_list()
)
assert (
output["update_feature3"].to_list()
== subset_df["update_feature3"].to_list()
)
assert (
output["update_feature4"].to_list()
== subset_df["update_feature4"].to_list()
)
clean_up_remote_files(feature_retrieval_job.get_avro_files())

wait_for(check, timedelta(minutes=5))
@@ -662,19 +677,17 @@ def test_update_featureset_retrieve_all_fields(client, update_featureset_datafra
def test_update_featureset_retrieve_valid_fields(client, update_featureset_dataframe):
feature_retrieval_job = client.get_batch_features(
entity_rows=update_featureset_dataframe[["datetime", "entity_id"]],
feature_refs=[
"update_feature1",
"update_feature3",
"update_feature4",
],
feature_refs=["update_feature1", "update_feature3", "update_feature4"],
project=PROJECT_NAME,
)
output = feature_retrieval_job.to_dataframe(timeout_sec=180).sort_values(by=["entity_id"])
output = feature_retrieval_job.to_dataframe(timeout_sec=180).sort_values(
by=["entity_id"]
)
clean_up_remote_files(feature_retrieval_job.get_avro_files())
print(output.head(10))
assert (
output["update_feature1"].to_list()
== update_featureset_dataframe["update_feature1"].to_list()
output["update_feature1"].to_list()
== update_featureset_dataframe["update_feature1"].to_list()
)
# we have to convert to float because the column contains np.NaN
assert [math.isnan(i) for i in output["update_feature3"].to_list()[:5]] == [
@@ -684,8 +697,8 @@ def test_update_featureset_retrieve_valid_fields(client, update_featureset_dataf
float(i) for i in update_featureset_dataframe["update_feature3"].to_list()[5:]
]
assert (
output["update_feature4"].to_list()
== [None] * 5 + update_featureset_dataframe["update_feature4"].to_list()[5:]
output["update_feature4"].to_list()
== [None] * 5 + update_featureset_dataframe["update_feature4"].to_list()[5:]
)


@@ -697,49 +710,50 @@ def test_batch_dataset_statistics(client):
fs2 = client.get_feature_set(name="feature_set_2")
id_offset = 20

N_ROWS = 21
n_rows = 21
time_offset = datetime.utcnow().replace(tzinfo=pytz.utc)
features_1_df = pd.DataFrame(
{
"datetime": [time_offset] * N_ROWS,
"entity_id": [id_offset + i for i in range(N_ROWS)],
"feature_value6": ["a" for i in range(N_ROWS)],
"datetime": [time_offset] * n_rows,
"entity_id": [id_offset + i for i in range(n_rows)],
"feature_value6": ["a" for i in range(n_rows)],
}
)
ingestion_id1 = client.ingest(fs1, features_1_df)

features_2_df = pd.DataFrame(
{
"datetime": [time_offset] * N_ROWS,
"other_entity_id": [id_offset + i for i in range(N_ROWS)],
"other_feature_value7": [int(i) % 10 for i in range(0, N_ROWS)],
"datetime": [time_offset] * n_rows,
"other_entity_id": [id_offset + i for i in range(n_rows)],
"other_feature_value7": [int(i) % 10 for i in range(0, n_rows)],
}
)
ingestion_id2 = client.ingest(fs2, features_2_df)

entity_df = pd.DataFrame(
{
"datetime": [time_offset] * N_ROWS,
"entity_id": [id_offset + i for i in range(N_ROWS)],
"other_entity_id": [id_offset + i for i in range(N_ROWS)],
"datetime": [time_offset] * n_rows,
"entity_id": [id_offset + i for i in range(n_rows)],
"other_entity_id": [id_offset + i for i in range(n_rows)],
}
)

time.sleep(15) # wait for rows to get written to bq
while True:
rows_ingested1 = get_rows_ingested(client, fs1, ingestion_id1)
rows_ingested2 = get_rows_ingested(client, fs2, ingestion_id2)
if rows_ingested1 == len(features_1_df) and rows_ingested2 == len(features_2_df):
print(f"Number of rows successfully ingested: {rows_ingested1}, {rows_ingested2}. Continuing.")
if rows_ingested1 == len(features_1_df) and rows_ingested2 == len(
features_2_df
):
print(
f"Number of rows successfully ingested: {rows_ingested1}, {rows_ingested2}. Continuing."
)
break
time.sleep(30)

feature_retrieval_job = client.get_batch_features(
entity_rows=entity_df,
feature_refs=[
"feature_value6",
"feature_set_2:other_feature_value7",
],
feature_refs=["feature_value6", "feature_set_2:other_feature_value7"],
project=PROJECT_NAME,
compute_statistics=True,
)
@@ -765,9 +779,9 @@ def test_batch_dataset_statistics(client):


def get_rows_ingested(
client: Client, feature_set: FeatureSet, ingestion_id: str
client: Client, feature_set: FeatureSet, ingestion_id: str
) -> int:
response = client._core_service_stub.ListStores(
response = client._core_service.ListStores(
ListStoresRequest(filter=ListStoresRequest.Filter(name="historical"))
)
bq_config = response.store[0].bigquery_config
@@ -780,8 +794,7 @@ def get_rows_ingested(
f'SELECT COUNT(*) as count FROM `{project}.{dataset}.{table}` WHERE ingestion_id = "{ingestion_id}"'
).result()

for row in rows:
return row["count"]
return list(rows)[0]["count"]


def clean_up_remote_files(files):
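
The get_rows_ingested hunk just above carries the iterator-to-list item from the commit message: query(...).result() returns a google-cloud-bigquery RowIterator, and returning from inside a for loop leaves an implicit None return path, so the count is now taken from list(rows)[0]["count"]. A standalone sketch of that polling query, with placeholder project/dataset/table values (in the test they come from Feast core's ListStores response):

from google.cloud import bigquery


def count_ingested_rows(project: str, dataset: str, table: str, ingestion_id: str) -> int:
    # Placeholder identifiers; the e2e test resolves them from the historical
    # store's BigQuery config.
    bq_client = bigquery.Client(project=project)
    rows = bq_client.query(
        f"SELECT COUNT(*) AS count "
        f"FROM `{project}.{dataset}.{table}` "
        f'WHERE ingestion_id = "{ingestion_id}"'
    ).result()

    # .result() yields a RowIterator; materializing it means every path through
    # the function returns an int instead of falling off the end.
    return list(rows)[0]["count"]

test_batch_dataset_statistics polls this count in a loop, sleeping between attempts, until it matches the number of rows handed to client.ingest, since BigQuery streaming inserts can take a while to become queryable.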
