-
Notifications
You must be signed in to change notification settings - Fork 996
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Inferencing of Features in FeatureView and timestamp column of DataSo…
…urce (#1523) * Implemented the inferencing. Did cursory runs to make sure it works. More thorough testing needed. Signed-off-by: David Liu <davidl@twitter.com> * fixed issue with mutable default argument in FeatureView Signed-off-by: David Liu <davidl@twitter.com> * Fix in example_feature_repo_with_inference.py file Signed-off-by: David Liu <davidl@twitter.com> * Added test cases and small fixes. Signed-off-by: David Liu <davidl@twitter.com> * fixed missing import with handling for lint error Signed-off-by: David Liu <davidl@twitter.com> * marked a test that needs bigquery client requesting to be an integration test & added __ rule in inference Signed-off-by: David Liu <davidl@twitter.com> * Code review corrections + BQSource Query arg handling + corresponding test case for it Signed-off-by: David Liu <davidl@twitter.com> * CR corrections Signed-off-by: David Y Liu <davidyliuliu@gmail.com> Co-authored-by: David Liu <davidl@twitter.com>
- Loading branch information
Showing
6 changed files
with
332 additions
and
14 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,21 @@ | ||
"""Example feature repository exercising feast's inference support.

The FeatureView below declares no explicit feature list, and the FileSource
declares no event_timestamp_column -- presumably both are inferred from the
parquet file's columns by the registration step (confirm against the test
that consumes this repo).
"""
from google.protobuf.duration_pb2 import Duration

from feast import Entity, FeatureView, ValueType
from feast.data_source import FileSource

# Source over a parquet file; the path is substituted by the test harness.
driver_hourly_stats = FileSource(
    path="%PARQUET_PATH%",  # placeholder to be replaced by the test
    created_timestamp_column="created",
)

# Entity joined on the "driver_id" column of the source.
driver = Entity(name="driver_id", value_type=ValueType.INT64, description="driver id",)

# features are inferred from columns of data source
driver_hourly_stats_view = FeatureView(
    name="driver_hourly_stats",
    entities=["driver_id"],
    ttl=Duration(seconds=86400 * 1),
    online=True,
    input=driver_hourly_stats,
    tags={},
)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,79 @@ | ||
import contextlib
import os
import tempfile
from datetime import datetime, timedelta

import pandas as pd
import pytest
from google.cloud import bigquery

from feast.data_format import ParquetFormat
from feast.data_source import BigQuerySource, FileSource
|
||
|
||
@pytest.fixture
def simple_dataset_1() -> pd.DataFrame:
    """Five-row toy dataset: an entity id, one column per basic value type,
    and an event-timestamp column "ts_1" holding millisecond-rounded
    timestamps at fixed hour offsets behind "now"."""
    base_ts = pd.Timestamp(datetime.utcnow()).round("ms")
    # Offsets (in hours) for each row's event timestamp; 0 == "now".
    hour_offsets = [0, 4, 3, 2, 1]
    columns = {
        "id": [1, 2, 1, 3, 3],
        "float_col": [0.1, 0.2, 0.3, 4, 5],
        "int64_col": [1, 2, 3, 4, 5],
        "string_col": ["a", "b", "c", "d", "e"],
        "ts_1": [base_ts - timedelta(hours=h) for h in hour_offsets],
    }
    return pd.DataFrame.from_dict(columns)
|
||
|
||
@contextlib.contextmanager
def prep_file_source(df, event_timestamp_column="") -> FileSource:
    """Materialize *df* as a temporary parquet file and yield a FileSource over it.

    Bug fixed: the previous implementation closed a NamedTemporaryFile
    (default delete=True), which deletes the file on close(), then re-created
    the file at the same path with to_parquet() -- the re-created file was
    never cleaned up, leaking one parquet file per call (and the pattern does
    not work on Windows at all). Using TemporaryDirectory guarantees the file
    is removed when the context exits.

    Args:
        df: pandas DataFrame to write out as parquet.
        event_timestamp_column: forwarded verbatim to FileSource; "" means
            unspecified -- presumably to exercise timestamp-column inference
            (confirm against callers).

    Yields:
        FileSource configured with ParquetFormat over the temporary file.
    """
    with tempfile.TemporaryDirectory() as tmp_dir:
        path = os.path.join(tmp_dir, "df.parquet")
        df.to_parquet(path)
        yield FileSource(
            file_format=ParquetFormat(),
            file_url=path,
            event_timestamp_column=event_timestamp_column,
        )
|
||
|
||
def simple_bq_source_using_table_ref_arg(
    df, event_timestamp_column=""
) -> BigQuerySource:
    """Load *df* into a short-lived BigQuery table and return a BigQuerySource
    that references the table directly (table_ref form).

    The dataset "ds" is created in the client's default project if missing,
    with table expiration set to 60 minutes so test tables clean themselves up.
    """
    bq_client = bigquery.Client()
    dataset_id = f"{bq_client.project}.ds"

    ds = bigquery.Dataset(dataset_id)
    bq_client.create_dataset(ds, exists_ok=True)
    # 60 minutes expressed in milliseconds (seems to be the minimum gcloud allows).
    ds.default_table_expiration_ms = 1000 * 60 * 60
    bq_client.update_dataset(ds, ["default_table_expiration_ms"])

    table_ref = f"{dataset_id}.table_1"
    load_job = bq_client.load_table_from_dataframe(
        df, table_ref, job_config=bigquery.LoadJobConfig()
    )
    load_job.result()  # block until the upload finishes

    return BigQuerySource(
        table_ref=table_ref, event_timestamp_column=event_timestamp_column,
    )
|
||
|
||
def simple_bq_source_using_query_arg(df, event_timestamp_column="") -> BigQuerySource:
    """Same data as the table_ref variant, but exposed through a SQL query
    (query form) rather than a direct table reference."""
    table_source = simple_bq_source_using_table_ref_arg(df, event_timestamp_column)
    query = f"SELECT * FROM {table_source.table_ref}"
    return BigQuerySource(
        query=query, event_timestamp_column=event_timestamp_column,
    )
Oops, something went wrong.