
Inferencing of Features in FeatureView and timestamp column of DataSource #1523

Merged
84 changes: 73 additions & 11 deletions sdk/python/feast/data_source.py
@@ -14,7 +14,11 @@


import enum
from typing import Dict, Optional
import re
from typing import Dict, Iterable, Optional, Tuple

from google.cloud import bigquery
from pyarrow.parquet import ParquetFile

from feast.data_format import FileFormat, StreamFormat
from feast.protos.feast.core.DataSource_pb2 import DataSource as DataSourceProto
@@ -515,11 +519,42 @@ def to_proto(self) -> DataSourceProto:
"""
raise NotImplementedError

def infer_event_timestamp_column(self, ts_column_type_regex_pattern):
ERROR_MSG_PREFIX = "Unable to infer DataSource event_timestamp_column"

if isinstance(self, FileSource) or isinstance(self, BigQuerySource):
event_timestamp_column, matched_flag = None, False
for col_name, col_datatype in self.get_table_column_names_and_types():
if re.match(ts_column_type_regex_pattern, col_datatype):
if matched_flag:
raise TypeError(
f"""
{ERROR_MSG_PREFIX} due to multiple possible columns satisfying
the criteria.
"""
)
matched_flag = True
event_timestamp_column = col_name
if matched_flag:
return event_timestamp_column
else:
raise TypeError(
f"""
{ERROR_MSG_PREFIX} due to an absence of columns that satisfy the criteria.
"""
)
else:
raise TypeError(
f"""
{ERROR_MSG_PREFIX} because this DataSource currently does not support this inference.
"""
)
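# Illustrative sketch (not part of this diff): the matching logic above, run
# against a hypothetical schema with exactly one timestamp-typed column.
import re

hypothetical_columns = [
    ("driver_id", "int64"),
    ("ts_1", "timestamp[us]"),
    ("conv_rate", "double"),
]
matches = [
    name
    for name, dtype in hypothetical_columns
    if re.match(r"timestamp\[\w\w\]", dtype)
]
assert matches == ["ts_1"]  # exactly one match, so inference would return "ts_1"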


class FileSource(DataSource):
def __init__(
self,
event_timestamp_column: str,
event_timestamp_column: Optional[str] = None,
file_url: Optional[str] = None,
path: Optional[str] = None,
file_format: FileFormat = None,
@@ -543,12 +578,6 @@ def __init__(
Examples:
>>> FileSource(path="/data/my_features.parquet", event_timestamp_column="datetime")
"""
super().__init__(
event_timestamp_column,
created_timestamp_column,
field_mapping,
date_partition_column,
)
if path is None and file_url is None:
raise ValueError(
'No "path" argument provided. Please set "path" to the location of your file source.'
@@ -561,8 +590,17 @@
)
else:
file_url = path

self._file_options = FileOptions(file_format=file_format, file_url=file_url)

super().__init__(
event_timestamp_column
or self.infer_event_timestamp_column(r"timestamp\[\w\w\]"),
created_timestamp_column,
field_mapping,
date_partition_column,
)

def __eq__(self, other):
if not isinstance(other, FileSource):
raise TypeError("Comparisons should only involve FileSource class objects.")
@@ -609,24 +647,30 @@ def to_proto(self) -> DataSourceProto:

return data_source_proto

def get_table_column_names_and_types(self) -> Iterable[Tuple[str, str]]:
schema = ParquetFile(self.path).schema_arrow
return zip(schema.names, map(str, schema.types))
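# Illustrative sketch (not part of this diff): what the method above yields for
# a small parquet file written with pandas; exact arrow type strings may vary
# across pyarrow versions.
import pandas as pd
from pyarrow.parquet import ParquetFile

df = pd.DataFrame({"id": [1, 2], "ts": pd.to_datetime(["2021-04-01", "2021-04-02"])})
df.to_parquet("/tmp/example.parquet")
schema = ParquetFile("/tmp/example.parquet").schema_arrow
print(list(zip(schema.names, map(str, schema.types))))
# e.g. [('id', 'int64'), ('ts', 'timestamp[us]')]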


class BigQuerySource(DataSource):
def __init__(
self,
event_timestamp_column: str,
event_timestamp_column: Optional[str] = None,
table_ref: Optional[str] = None,
created_timestamp_column: Optional[str] = "",
field_mapping: Optional[Dict[str, str]] = None,
date_partition_column: Optional[str] = "",
query: Optional[str] = None,
):
self._bigquery_options = BigQueryOptions(table_ref=table_ref, query=query)

super().__init__(
event_timestamp_column,
event_timestamp_column
or self.infer_event_timestamp_column("TIMESTAMP|DATETIME"),
created_timestamp_column,
field_mapping,
date_partition_column,
)
self._bigquery_options = BigQueryOptions(table_ref=table_ref, query=query)

def __eq__(self, other):
if not isinstance(other, BigQuerySource):
@@ -684,6 +728,24 @@ def get_table_query_string(self) -> str:
else:
return f"({self.query})"

def get_table_column_names_and_types(self) -> Iterable[Tuple[str, str]]:
assert self.table_ref is not None
name_type_pairs = []
project_id, dataset_id, table_id = self.table_ref.split(".")
bq_columns_query = f"""
SELECT COLUMN_NAME, DATA_TYPE FROM {project_id}.{dataset_id}.INFORMATION_SCHEMA.COLUMNS
WHERE TABLE_NAME = '{table_id}'
"""

client = bigquery.Client()
table_schema = client.query(bq_columns_query).result().to_dataframe_iterable()
for df in table_schema:
name_type_pairs.extend(
list(zip(df["COLUMN_NAME"].to_list(), df["DATA_TYPE"].to_list()))
)

return name_type_pairs
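# Illustrative sketch (not part of this diff): the SQL the method above renders
# for a hypothetical table_ref.
table_ref = "my_project.my_dataset.driver_stats"
project_id, dataset_id, table_id = table_ref.split(".")
print(
    f"SELECT COLUMN_NAME, DATA_TYPE "
    f"FROM {project_id}.{dataset_id}.INFORMATION_SCHEMA.COLUMNS "
    f"WHERE TABLE_NAME = '{table_id}'"
)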


class KafkaSource(DataSource):
def __init__(
27 changes: 26 additions & 1 deletion sdk/python/feast/feature_view.py
@@ -11,6 +11,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import re
from datetime import datetime, timedelta
from typing import Dict, List, Optional, Tuple, Union

@@ -31,6 +32,7 @@
from feast.protos.feast.core.FeatureView_pb2 import (
MaterializationInterval as MaterializationIntervalProto,
)
from feast.type_map import bq_to_feast_value_type, pa_to_feast_value_type
from feast.value_type import ValueType


@@ -55,12 +57,35 @@ def __init__(
self,
name: str,
entities: List[str],
features: List[Feature],
ttl: Optional[Union[Duration, timedelta]],
input: Union[BigQuerySource, FileSource],
features: List[Feature] = [],
tags: Optional[Dict[str, str]] = None,
online: bool = True,
):
if not features:
features = [] # to handle python's mutable default arguments
columns_to_exclude = {
input.event_timestamp_column,
input.created_timestamp_column,
} | set(entities)
type_converter = (
bq_to_feast_value_type
if isinstance(input, BigQuerySource)
else pa_to_feast_value_type
)

for col_name, col_datatype in input.get_table_column_names_and_types():
if col_name not in columns_to_exclude and not re.match(
"^__|__$", col_name
):
features.append(Feature(col_name, type_converter(col_datatype)))

if not features:
raise ValueError(
f"Could not infer Features for the FeatureView named {name}."
)

cols = [entity for entity in entities] + [feat.name for feat in features]
for col in cols:
if input.field_mapping is not None and col in input.field_mapping.keys():
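For clarity, the exclusion filter above can be exercised standalone; the column names in this sketch are hypothetical and not taken from this diff:

import re

columns_to_exclude = {"ts_1", "created", "driver_id"}  # timestamp columns + entities
source_columns = [
    ("driver_id", "INT64"),
    ("conv_rate", "FLOAT64"),
    ("created", "TIMESTAMP"),
    ("__index_level_0__", "INT64"),  # leading "__" columns are filtered by the regex
]
inferred = [
    name
    for name, _ in source_columns
    if name not in columns_to_exclude and not re.match("^__|__$", name)
]
assert inferred == ["conv_rate"]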
25 changes: 23 additions & 2 deletions sdk/python/feast/type_map.py
@@ -417,7 +417,7 @@ def pa_to_value_type(pa_type: object):
return type_map[pa_type.__str__()]


def pa_to_feast_value_type(value: pa.lib.ChunkedArray) -> ValueType:
def pa_to_feast_value_type(value: Union[pa.lib.ChunkedArray, str]) -> ValueType:
type_map = {
"timestamp[ms]": ValueType.INT64,
"int32": ValueType.INT32,
@@ -435,7 +435,9 @@ def pa_to_feast_value_type(value: pa.lib.ChunkedArray) -> ValueType:
"list<item: binary>": ValueType.BYTES_LIST,
"list<item: bool>": ValueType.BOOL_LIST,
}
return type_map[value.type.__str__()]
return type_map[
value.type.__str__() if isinstance(value, pa.lib.ChunkedArray) else value
]
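# Illustrative sketch (not part of this diff): both accepted input forms now
# resolve to the same ValueType.
import pyarrow as pa

from feast.type_map import pa_to_feast_value_type
from feast.value_type import ValueType

column = pa.chunked_array([[1, 2, 3]])                     # arrow type: int64
assert pa_to_feast_value_type(column) == ValueType.INT64   # ChunkedArray path
assert pa_to_feast_value_type("int64") == ValueType.INT64  # new str path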


def pa_column_to_timestamp_proto_column(column: pa.lib.ChunkedArray) -> List[Timestamp]:
@@ -480,3 +482,22 @@ def pa_column_to_proto_column(
]
else:
return [ProtoValue(**{value: x.as_py()}) for x in column]


def bq_to_feast_value_type(bq_type_as_str: str) -> ValueType:
    type_map: Dict[str, ValueType] = {
"DATETIME": ValueType.STRING, # Unsure if string is right
"TIMESTAMP": ValueType.STRING, # Unsure if string is right
"INT64": ValueType.INT64,
"STRING": ValueType.STRING,
"FLOAT64": ValueType.DOUBLE,
"BYTES": ValueType.BYTES,
"BOOL": ValueType.BOOL,
"ARRAY<INT64>": ValueType.INT64_LIST,
"ARRAY<FLOAT64>": ValueType.DOUBLE_LIST,
"ARRAY<STRING>": ValueType.STRING_LIST,
"ARRAY<BYTES>": ValueType.BYTES_LIST,
"ARRAY<BOOL>": ValueType.BOOL_LIST,
}

return type_map[bq_type_as_str]
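A quick, hedged check of the mapping above (assuming it is imported from feast.type_map):

from feast.type_map import bq_to_feast_value_type
from feast.value_type import ValueType

assert bq_to_feast_value_type("FLOAT64") == ValueType.DOUBLE
assert bq_to_feast_value_type("ARRAY<STRING>") == ValueType.STRING_LIST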
21 changes: 21 additions & 0 deletions sdk/python/tests/example_feature_repo_with_inference.py
@@ -0,0 +1,21 @@
from google.protobuf.duration_pb2 import Duration

from feast import Entity, FeatureView, ValueType
from feast.data_source import FileSource

driver_hourly_stats = FileSource(
path="%PARQUET_PATH%", # placeholder to be replaced by the test
created_timestamp_column="created",
)

driver = Entity(name="driver_id", value_type=ValueType.INT64, description="driver id",)

# features are inferred from columns of data source
driver_hourly_stats_view = FeatureView(
name="driver_hourly_stats",
entities=["driver_id"],
ttl=Duration(seconds=86400 * 1),
online=True,
input=driver_hourly_stats,
tags={},
)
65 changes: 65 additions & 0 deletions sdk/python/tests/fixtures/data_source_fixtures.py
@@ -0,0 +1,65 @@
import contextlib
import tempfile
from datetime import datetime, timedelta

import pandas as pd
import pytest
from google.cloud import bigquery

from feast.data_format import ParquetFormat
from feast.data_source import BigQuerySource, FileSource


@pytest.fixture
def simple_dataset_1() -> pd.DataFrame:
now = datetime.utcnow()
ts = pd.Timestamp(now).round("ms")
data = {
"id": [1, 2, 1, 3, 3],
"float_col": [0.1, 0.2, 0.3, 4, 5],
"int64_col": [1, 2, 3, 4, 5],
"string_col": ["a", "b", "c", "d", "e"],
"ts_1": [
ts,
ts - timedelta(hours=4),
ts - timedelta(hours=3),
ts - timedelta(hours=2),
ts - timedelta(hours=1),
],
}
return pd.DataFrame.from_dict(data)


@contextlib.contextmanager
def prep_file_source(df, event_timestamp_column="") -> FileSource:
with tempfile.NamedTemporaryFile(suffix=".parquet") as f:
f.close()
df.to_parquet(f.name)
file_source = FileSource(
file_format=ParquetFormat(),
file_url=f.name,
event_timestamp_column=event_timestamp_column,
)
yield file_source


def prep_bq_source(df, event_timestamp_column="") -> BigQuerySource:
client = bigquery.Client()
gcp_project = client.project
bigquery_dataset = "ds"
dataset = bigquery.Dataset(f"{gcp_project}.{bigquery_dataset}")
client.create_dataset(dataset, exists_ok=True)
dataset.default_table_expiration_ms = (
1000 * 60 * 60 * 24 * 14 # 2 weeks in milliseconds
)
client.update_dataset(dataset, ["default_table_expiration_ms"])
table_ref = f"{gcp_project}.{bigquery_dataset}.table_1"

job = client.load_table_from_dataframe(
df, table_ref, job_config=bigquery.LoadJobConfig()
)
job.result()

return BigQuerySource(
table_ref=table_ref, event_timestamp_column=event_timestamp_column,
)
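A hedged sketch of a test that could consume these fixtures; the test body is illustrative, not part of this diff. Since simple_dataset_1 contains a single timestamp-typed column, inference should pick ts_1:

def test_event_timestamp_column_inference(simple_dataset_1):
    with prep_file_source(df=simple_dataset_1) as file_source:
        assert file_source.event_timestamp_column == "ts_1"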