Inferencing of Features in FeatureView and timestamp column of DataSource (#1523)

* Implemented the inferencing. Did cursory runs to make sure it works. More thorough testing needed.

Signed-off-by: David Liu <davidl@twitter.com>

* fixed issue with mutable default argument in FeatureView

Signed-off-by: David Liu <davidl@twitter.com>

* Fix in example_feature_repo_with_inference.py file

Signed-off-by: David Liu <davidl@twitter.com>

* Added test cases and small fixes.

Signed-off-by: David Liu <davidl@twitter.com>

* fixed missing import with handling for lint error

Signed-off-by: David Liu <davidl@twitter.com>

* marked a test that needs the BigQuery client as an integration test & added __ rule in inference

Signed-off-by: David Liu <davidl@twitter.com>

* Code review corrections + BQSource Query arg handling + corresponding test case for it

Signed-off-by: David Liu <davidl@twitter.com>

* CR corrections

Signed-off-by: David Y Liu <davidyliuliu@gmail.com>

Co-authored-by: David Liu <davidl@twitter.com>
mavysavydav and David Liu authored May 4, 2021
1 parent fb2f63c commit f55b51c
Showing 6 changed files with 332 additions and 14 deletions.
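
The commit's net effect is easiest to see from the caller's side. The sketch below assumes a parquet source with a single timestamp-typed column and a `driver_id` entity column; the path and column names are placeholders, and the shape mirrors the example repo file added in this commit. Both `event_timestamp_column` and `features` are omitted and inferred.

    from google.protobuf.duration_pb2 import Duration

    from feast import Entity, FeatureView, ValueType
    from feast.data_source import FileSource

    # event_timestamp_column is omitted: it is inferred from the parquet schema,
    # provided exactly one column has a timestamp[..] type.
    driver_hourly_stats = FileSource(
        path="/data/driver_stats.parquet",  # placeholder path
        created_timestamp_column="created",
    )

    driver = Entity(name="driver_id", value_type=ValueType.INT64, description="driver id")

    # features is omitted: every remaining column (not an entity, not a timestamp
    # column, not a __dunder__ column) becomes a Feature with an inferred dtype.
    driver_hourly_stats_view = FeatureView(
        name="driver_hourly_stats",
        entities=["driver_id"],
        ttl=Duration(seconds=86400),
        input=driver_hourly_stats,
    )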
107 changes: 96 additions & 11 deletions sdk/python/feast/data_source.py
@@ -14,10 +14,15 @@


import enum
from typing import Dict, Optional
import re
from typing import Callable, Dict, Iterable, Optional, Tuple

from pyarrow.parquet import ParquetFile

from feast import type_map
from feast.data_format import FileFormat, StreamFormat
from feast.protos.feast.core.DataSource_pb2 import DataSource as DataSourceProto
from feast.value_type import ValueType


class SourceType(enum.Enum):
@@ -515,11 +520,45 @@ def to_proto(self) -> DataSourceProto:
"""
raise NotImplementedError

def _infer_event_timestamp_column(self, ts_column_type_regex_pattern):
ERROR_MSG_PREFIX = "Unable to infer DataSource event_timestamp_column"
USER_GUIDANCE = "Please specify event_timestamp_column explicitly."

if isinstance(self, FileSource) or isinstance(self, BigQuerySource):
event_timestamp_column, matched_flag = None, False
for col_name, col_datatype in self.get_table_column_names_and_types():
if re.match(ts_column_type_regex_pattern, col_datatype):
if matched_flag:
raise TypeError(
f"""
{ERROR_MSG_PREFIX} due to multiple possible columns satisfying
the criteria. {USER_GUIDANCE}
"""
)
matched_flag = True
event_timestamp_column = col_name
if matched_flag:
return event_timestamp_column
else:
raise TypeError(
f"""
{ERROR_MSG_PREFIX} due to an absence of columns that satisfy the criteria.
{USER_GUIDANCE}
"""
)
else:
raise TypeError(
f"""
{ERROR_MSG_PREFIX} because this DataSource currently does not support this inference.
{USER_GUIDANCE}
"""
)


class FileSource(DataSource):
def __init__(
self,
event_timestamp_column: str,
event_timestamp_column: Optional[str] = None,
file_url: Optional[str] = None,
path: Optional[str] = None,
file_format: FileFormat = None,
@@ -543,12 +582,6 @@ def __init__(
Examples:
>>> FileSource(path="/data/my_features.parquet", event_timestamp_column="datetime")
"""
super().__init__(
event_timestamp_column,
created_timestamp_column,
field_mapping,
date_partition_column,
)
if path is None and file_url is None:
raise ValueError(
'No "path" argument provided. Please set "path" to the location of your file source.'
@@ -561,8 +594,17 @@ def __init__(
)
else:
file_url = path

self._file_options = FileOptions(file_format=file_format, file_url=file_url)

super().__init__(
event_timestamp_column
or self._infer_event_timestamp_column(r"timestamp\[\w\w\]"),
created_timestamp_column,
field_mapping,
date_partition_column,
)

def __eq__(self, other):
if not isinstance(other, FileSource):
raise TypeError("Comparisons should only involve FileSource class objects.")
@@ -609,24 +651,34 @@ def to_proto(self) -> DataSourceProto:

return data_source_proto

@staticmethod
def source_datatype_to_feast_value_type() -> Callable[[str], ValueType]:
return type_map.pa_to_feast_value_type

def get_table_column_names_and_types(self) -> Iterable[Tuple[str, str]]:
schema = ParquetFile(self.path).schema_arrow
return zip(schema.names, map(str, schema.types))


class BigQuerySource(DataSource):
def __init__(
self,
event_timestamp_column: str,
event_timestamp_column: Optional[str] = None,
table_ref: Optional[str] = None,
created_timestamp_column: Optional[str] = "",
field_mapping: Optional[Dict[str, str]] = None,
date_partition_column: Optional[str] = "",
query: Optional[str] = None,
):
self._bigquery_options = BigQueryOptions(table_ref=table_ref, query=query)

super().__init__(
event_timestamp_column,
event_timestamp_column
or self._infer_event_timestamp_column("TIMESTAMP|DATETIME"),
created_timestamp_column,
field_mapping,
date_partition_column,
)
self._bigquery_options = BigQueryOptions(table_ref=table_ref, query=query)

def __eq__(self, other):
if not isinstance(other, BigQuerySource):
@@ -684,6 +736,39 @@ def get_table_query_string(self) -> str:
else:
return f"({self.query})"

@staticmethod
def source_datatype_to_feast_value_type() -> Callable[[str], ValueType]:
return type_map.bq_to_feast_value_type

def get_table_column_names_and_types(self) -> Iterable[Tuple[str, str]]:
from google.cloud import bigquery

client = bigquery.Client()
bq_columns_query = ""
name_type_pairs = []
if self.table_ref is not None:
project_id, dataset_id, table_id = self.table_ref.split(".")
bq_columns_query = f"""
SELECT COLUMN_NAME, DATA_TYPE FROM {project_id}.{dataset_id}.INFORMATION_SCHEMA.COLUMNS
WHERE TABLE_NAME = '{table_id}'
"""
table_schema = (
client.query(bq_columns_query).result().to_dataframe_iterable()
)
for df in table_schema:
name_type_pairs.extend(
list(zip(df["COLUMN_NAME"].to_list(), df["DATA_TYPE"].to_list()))
)
else:
bq_columns_query = f"SELECT * FROM ({self.query}) LIMIT 1"
queryRes = client.query(bq_columns_query).result()
name_type_pairs = [
(schema_field.name, schema_field.field_type)
for schema_field in queryRes.schema
]

return name_type_pairs


class KafkaSource(DataSource):
def __init__(
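For reference, the FileSource side of the inference reduces to the pyarrow snippet below: read the arrow schema of the parquet file, stringify the types, and look for exactly one column whose type matches `timestamp\[\w\w\]`. This is a standalone sketch with a placeholder file name, not code from the diff above.

    import re

    from pyarrow.parquet import ParquetFile

    schema = ParquetFile("driver_stats.parquet").schema_arrow  # placeholder file
    pairs = list(zip(schema.names, map(str, schema.types)))
    # e.g. [("driver_id", "int64"), ("conv_rate", "double"), ("ts_1", "timestamp[us]")]

    candidates = [name for name, dtype in pairs if re.match(r"timestamp\[\w\w\]", dtype)]
    # Inference succeeds only when there is exactly one candidate; with zero or
    # several, the DataSource raises TypeError and asks for an explicit
    # event_timestamp_column.
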
26 changes: 25 additions & 1 deletion sdk/python/feast/feature_view.py
@@ -11,6 +11,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import re
from datetime import datetime, timedelta
from typing import Dict, List, Optional, Tuple, Union

@@ -55,12 +56,35 @@ def __init__(
self,
name: str,
entities: List[str],
features: List[Feature],
ttl: Optional[Union[Duration, timedelta]],
input: Union[BigQuerySource, FileSource],
features: List[Feature] = [],
tags: Optional[Dict[str, str]] = None,
online: bool = True,
):
if not features:
features = [] # to handle python's mutable default arguments
columns_to_exclude = {
input.event_timestamp_column,
input.created_timestamp_column,
} | set(entities)

for col_name, col_datatype in input.get_table_column_names_and_types():
if col_name not in columns_to_exclude and not re.match(
"^__|__$", col_name
):
features.append(
Feature(
col_name,
input.source_datatype_to_feast_value_type()(col_datatype),
)
)

if not features:
raise ValueError(
f"Could not infer Features for the FeatureView named {name}. Please specify Features explicitly for this FeatureView."
)

cols = [entity for entity in entities] + [feat.name for feat in features]
for col in cols:
if input.field_mapping is not None and col in input.field_mapping.keys():
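A quick way to reason about which columns become Features: the constructor keeps every column in the source schema except the entity columns, the event/created timestamp columns, and any name matching the new `^__|__$` rule. The column names below are illustrative only.

    import re

    source_columns = ["driver_id", "ts_1", "created", "conv_rate", "__index_level_0__"]
    excluded = {"driver_id", "ts_1", "created"}  # entities + timestamp columns

    inferred = [
        col for col in source_columns
        if col not in excluded and not re.match("^__|__$", col)
    ]
    print(inferred)  # ['conv_rate'] -- each becomes a Feature with an inferred ValueType
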
27 changes: 25 additions & 2 deletions sdk/python/feast/type_map.py
@@ -417,7 +417,7 @@ def pa_to_value_type(pa_type: object):
return type_map[pa_type.__str__()]


def pa_to_feast_value_type(value: pa.lib.ChunkedArray) -> ValueType:
def pa_to_feast_value_type(value: Union[pa.lib.ChunkedArray, str]) -> ValueType:
type_map = {
"timestamp[ms]": ValueType.INT64,
"int32": ValueType.INT32,
@@ -435,7 +435,9 @@ def pa_to_feast_value_type(value: pa.lib.ChunkedArray) -> ValueType:
"list<item: binary>": ValueType.BYTES_LIST,
"list<item: bool>": ValueType.BOOL_LIST,
}
return type_map[value.type.__str__()]
return type_map[
value.type.__str__() if isinstance(value, pa.lib.ChunkedArray) else value
]


def pa_column_to_timestamp_proto_column(column: pa.lib.ChunkedArray) -> List[Timestamp]:
@@ -480,3 +482,24 @@ def pa_column_to_proto_column(
]
else:
return [ProtoValue(**{value: x.as_py()}) for x in column]


def bq_to_feast_value_type(bq_type_as_str):
type_map: Dict[ValueType, Union[str, Dict[str, Any]]] = {
"DATETIME": ValueType.STRING, # Update to ValueType.UNIX_TIMESTAMP once #1520 lands.
"TIMESTAMP": ValueType.STRING, # Update to ValueType.UNIX_TIMESTAMP once #1520 lands.
"INTEGER": ValueType.INT64,
"INT64": ValueType.INT64,
"STRING": ValueType.STRING,
"FLOAT": ValueType.DOUBLE,
"FLOAT64": ValueType.DOUBLE,
"BYTES": ValueType.BYTES,
"BOOL": ValueType.BOOL,
"ARRAY<INT64>": ValueType.INT64_LIST,
"ARRAY<FLOAT64>": ValueType.DOUBLE_LIST,
"ARRAY<STRING>": ValueType.STRING_LIST,
"ARRAY<BYTES>": ValueType.BYTES_LIST,
"ARRAY<BOOL>": ValueType.BOOL_LIST,
}

return type_map[bq_type_as_str]
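
Both helpers can now be driven directly by the type strings that `get_table_column_names_and_types` yields, which is what the inference code relies on. A minimal illustration using entries visible in the tables above:

    from feast.type_map import bq_to_feast_value_type, pa_to_feast_value_type
    from feast.value_type import ValueType

    assert pa_to_feast_value_type("int32") == ValueType.INT32          # plain string input now accepted
    assert bq_to_feast_value_type("ARRAY<INT64>") == ValueType.INT64_LIST
    assert bq_to_feast_value_type("TIMESTAMP") == ValueType.STRING     # until #1520 lands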
21 changes: 21 additions & 0 deletions sdk/python/tests/example_feature_repo_with_inference.py
@@ -0,0 +1,21 @@
from google.protobuf.duration_pb2 import Duration

from feast import Entity, FeatureView, ValueType
from feast.data_source import FileSource

driver_hourly_stats = FileSource(
path="%PARQUET_PATH%", # placeholder to be replaced by the test
created_timestamp_column="created",
)

driver = Entity(name="driver_id", value_type=ValueType.INT64, description="driver id",)

# features are inferred from columns of data source
driver_hourly_stats_view = FeatureView(
name="driver_hourly_stats",
entities=["driver_id"],
ttl=Duration(seconds=86400 * 1),
online=True,
input=driver_hourly_stats,
tags={},
)
79 changes: 79 additions & 0 deletions sdk/python/tests/fixtures/data_source_fixtures.py
@@ -0,0 +1,79 @@
import contextlib
import tempfile
from datetime import datetime, timedelta

import pandas as pd
import pytest
from google.cloud import bigquery

from feast.data_format import ParquetFormat
from feast.data_source import BigQuerySource, FileSource


@pytest.fixture
def simple_dataset_1() -> pd.DataFrame:
now = datetime.utcnow()
ts = pd.Timestamp(now).round("ms")
data = {
"id": [1, 2, 1, 3, 3],
"float_col": [0.1, 0.2, 0.3, 4, 5],
"int64_col": [1, 2, 3, 4, 5],
"string_col": ["a", "b", "c", "d", "e"],
"ts_1": [
ts,
ts - timedelta(hours=4),
ts - timedelta(hours=3),
ts - timedelta(hours=2),
ts - timedelta(hours=1),
],
}
return pd.DataFrame.from_dict(data)


@contextlib.contextmanager
def prep_file_source(df, event_timestamp_column="") -> FileSource:
with tempfile.NamedTemporaryFile(suffix=".parquet") as f:
f.close()
df.to_parquet(f.name)
file_source = FileSource(
file_format=ParquetFormat(),
file_url=f.name,
event_timestamp_column=event_timestamp_column,
)
yield file_source


def simple_bq_source_using_table_ref_arg(
df, event_timestamp_column=""
) -> BigQuerySource:
client = bigquery.Client()
gcp_project = client.project
bigquery_dataset = "ds"
dataset = bigquery.Dataset(f"{gcp_project}.{bigquery_dataset}")
client.create_dataset(dataset, exists_ok=True)
dataset.default_table_expiration_ms = (
1000
* 60
* 60 # 60 minutes in milliseconds (seems to be minimum limit for gcloud)
)
client.update_dataset(dataset, ["default_table_expiration_ms"])
table_ref = f"{gcp_project}.{bigquery_dataset}.table_1"

job = client.load_table_from_dataframe(
df, table_ref, job_config=bigquery.LoadJobConfig()
)
job.result()

return BigQuerySource(
table_ref=table_ref, event_timestamp_column=event_timestamp_column,
)


def simple_bq_source_using_query_arg(df, event_timestamp_column="") -> BigQuerySource:
bq_source_using_table_ref = simple_bq_source_using_table_ref_arg(
df, event_timestamp_column
)
return BigQuerySource(
query=f"SELECT * FROM {bq_source_using_table_ref.table_ref}",
event_timestamp_column=event_timestamp_column,
)
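
A sketch of how these fixtures might be exercised; the test name and import path are hypothetical, and only prep_file_source and simple_dataset_1 come from this commit. With no event_timestamp_column passed, the FileSource built from simple_dataset_1 should infer ts_1, the only timestamp-typed column.

    from tests.fixtures.data_source_fixtures import (  # import path is an assumption
        prep_file_source,
        simple_dataset_1,  # noqa: F401  (pytest fixture)
    )


    def test_infer_event_timestamp_column(simple_dataset_1):  # hypothetical test name
        with prep_file_source(simple_dataset_1) as file_source:
            assert file_source.event_timestamp_column == "ts_1"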