Deprecate table_ref parameter for BigQuerySource
Signed-off-by: Felix Wang <wangfelix98@gmail.com>
felixwang9817 committed Apr 6, 2022
1 parent 6abae16 commit fde556a
Showing 11 changed files with 51 additions and 71 deletions.
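For readers tracking the user-facing change, here is a minimal migration sketch in Python (not taken from the diff; the table reference below is a hypothetical placeholder):

from feast import BigQuerySource

# Before this commit, `table_ref` was still accepted and emitted a DeprecationWarning.
# source = BigQuerySource(
#     table_ref="my-project.my_dataset.driver_hourly_stats",  # hypothetical table
#     timestamp_field="event_timestamp",
# )

# After this commit, pass the table reference via `table` instead.
source = BigQuerySource(
    table="my-project.my_dataset.driver_hourly_stats",  # hypothetical table
    timestamp_field="event_timestamp",
)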
@@ -210,11 +210,10 @@ public static DataSource createFileDataSourceSpec(
}

public static DataSource createBigQueryDataSourceSpec(
String bigQueryTableRef, String timestampColumn, String datePartitionColumn) {
String bigQueryTable, String timestampColumn, String datePartitionColumn) {
return DataSource.newBuilder()
.setType(DataSource.SourceType.BATCH_BIGQUERY)
.setBigqueryOptions(
DataSource.BigQueryOptions.newBuilder().setTableRef(bigQueryTableRef).build())
.setBigqueryOptions(DataSource.BigQueryOptions.newBuilder().setTable(bigQueryTable).build())
.setTimestampField(timestampColumn)
.setDatePartitionColumn(datePartitionColumn)
.build();
2 changes: 1 addition & 1 deletion protos/feast/core/DataSource.proto
@@ -98,7 +98,7 @@ message DataSource {
// Defines options for DataSource that sources features from a BigQuery Query
message BigQueryOptions {
// Full table reference in the form of [project:dataset.table]
string table_ref = 1;
string table = 1;

// SQL query that returns a table containing feature data. Must contain an event_timestamp column, and respective
// entity columns
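The renamed proto field also changes the generated Python bindings. A quick sketch of building the updated options message; the module path feast.protos.feast.core.DataSource_pb2 is assumed from Feast's generated-proto layout and is not shown in this diff:

from feast.protos.feast.core.DataSource_pb2 import DataSource as DataSourceProto

# The nested BigQueryOptions message now exposes `table` where it previously exposed `table_ref`.
options_proto = DataSourceProto.BigQueryOptions(
    table="my-project.my_dataset.my_table",  # hypothetical full table reference
)
print(options_proto.table)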
2 changes: 1 addition & 1 deletion sdk/python/feast/infra/offline_stores/bigquery.py
@@ -361,7 +361,7 @@ def persist(self, storage: SavedDatasetStorage):
assert isinstance(storage, SavedDatasetBigQueryStorage)

self.to_bigquery(
bigquery.QueryJobConfig(destination=storage.bigquery_options.table_ref)
bigquery.QueryJobConfig(destination=storage.bigquery_options.table)
)

@property
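A hedged sketch of how the renamed field flows into persist(); here `job` stands in for a BigQueryRetrievalJob obtained elsewhere (for example from get_historical_features), and the destination table is a hypothetical placeholder:

from feast.infra.offline_stores.bigquery_source import SavedDatasetBigQueryStorage

# `job` is assumed to be a BigQueryRetrievalJob; persist() writes its results to the given storage.
storage = SavedDatasetBigQueryStorage(table="my-project.my_dataset.persisted_training_set")
job.persist(storage)  # the destination is read from storage.bigquery_options.table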
71 changes: 28 additions & 43 deletions sdk/python/feast/infra/offline_stores/bigquery_source.py
@@ -18,7 +18,6 @@ def __init__(
self,
event_timestamp_column: Optional[str] = "",
table: Optional[str] = None,
table_ref: Optional[str] = None,
created_timestamp_column: Optional[str] = "",
field_mapping: Optional[Dict[str, str]] = None,
date_partition_column: Optional[str] = None,
@@ -33,14 +32,13 @@
Args:
table (optional): The BigQuery table where features can be found.
table_ref (optional): (Deprecated) The BigQuery table where features can be found.
event_timestamp_column: (Deprecated) Event timestamp column used for point in time joins of feature values.
created_timestamp_column (optional): Timestamp column when row was created, used for deduplicating rows.
field_mapping: A dictionary mapping of column names in this data source to feature names in a feature table
or view. Only used for feature columns, not entities or timestamp columns.
date_partition_column (deprecated): Timestamp column used for partitioning.
query (optional): SQL query to execute to generate data for this data source.
name (optional): Name for the source. Defaults to the table_ref if not specified.
name (optional): Name for the source. Defaults to the table if not specified.
description (optional): A human-readable description.
tags (optional): A dictionary of key-value pairs to store arbitrary metadata.
owner (optional): The owner of the bigquery source, typically the email of the primary
@@ -51,18 +49,10 @@
>>> from feast import BigQuerySource
>>> my_bigquery_source = BigQuerySource(table="gcp_project:bq_dataset.bq_table")
"""
if table is None and table_ref is None and query is None:
if table is None and query is None:
raise ValueError('No "table" or "query" argument provided.')
if not table and table_ref:
warnings.warn(
(
"The argument 'table_ref' is being deprecated. Please use 'table' "
"instead. Feast 0.20 and onwards will not support the argument 'table_ref'."
),
DeprecationWarning,
)
table = table_ref
self.bigquery_options = BigQueryOptions(table_ref=table, query=query)

self.bigquery_options = BigQueryOptions(table=table, query=query)

if date_partition_column:
warnings.warn(
@@ -73,13 +63,11 @@ def __init__(
DeprecationWarning,
)

# If no name, use the table_ref as the default name
# If no name, use the table as the default name
_name = name
if not _name:
if table:
_name = table
elif table_ref:
_name = table_ref
else:
warnings.warn(
(
@@ -111,7 +99,7 @@ def __eq__(self, other):

return (
self.name == other.name
and self.bigquery_options.table_ref == other.bigquery_options.table_ref
and self.bigquery_options.table == other.bigquery_options.table
and self.bigquery_options.query == other.bigquery_options.query
and self.timestamp_field == other.timestamp_field
and self.created_timestamp_column == other.created_timestamp_column
@@ -122,8 +110,8 @@ def __eq__(self, other):
)

@property
def table_ref(self):
return self.bigquery_options.table_ref
def table(self):
return self.bigquery_options.table

@property
def query(self):
@@ -137,7 +125,7 @@ def from_proto(data_source: DataSourceProto):
return BigQuerySource(
name=data_source.name,
field_mapping=dict(data_source.field_mapping),
table_ref=data_source.bigquery_options.table_ref,
table=data_source.bigquery_options.table,
timestamp_field=data_source.timestamp_field,
created_timestamp_column=data_source.created_timestamp_column,
query=data_source.bigquery_options.query,
@@ -169,14 +157,14 @@ def validate(self, config: RepoConfig):

client = bigquery.Client()
try:
client.get_table(self.table_ref)
client.get_table(self.table)
except NotFound:
raise DataSourceNotFoundException(self.table_ref)
raise DataSourceNotFoundException(self.table)

def get_table_query_string(self) -> str:
"""Returns a string that can directly be used to reference this table in SQL"""
if self.table_ref:
return f"`{self.table_ref}`"
if self.table:
return f"`{self.table}`"
else:
return f"({self.query})"

@@ -190,8 +178,8 @@ def get_table_column_names_and_types(
from google.cloud import bigquery

client = bigquery.Client()
if self.table_ref is not None:
schema = client.get_table(self.table_ref).schema
if self.table is not None:
schema = client.get_table(self.table).schema
if not isinstance(schema[0], bigquery.schema.SchemaField):
raise TypeError("Could not parse BigQuery table schema.")
else:
@@ -215,9 +203,9 @@ class BigQueryOptions:
"""

def __init__(
self, table_ref: Optional[str], query: Optional[str],
self, table: Optional[str], query: Optional[str],
):
self._table_ref = table_ref
self._table = table
self._query = query

@property
Expand All @@ -235,18 +223,18 @@ def query(self, query):
self._query = query

@property
def table_ref(self):
def table(self):
"""
Returns the table ref of this BQ table
"""
return self._table_ref
return self._table

@table_ref.setter
def table_ref(self, table_ref):
@table.setter
def table(self, table):
"""
Sets the table ref of this BQ table
"""
self._table_ref = table_ref
self._table = table

@classmethod
def from_proto(cls, bigquery_options_proto: DataSourceProto.BigQueryOptions):
@@ -261,8 +249,7 @@ def from_proto(cls, bigquery_options_proto: DataSourceProto.BigQueryOptions):
"""

bigquery_options = cls(
table_ref=bigquery_options_proto.table_ref,
query=bigquery_options_proto.query,
table=bigquery_options_proto.table, query=bigquery_options_proto.query,
)

return bigquery_options
@@ -276,7 +263,7 @@ def to_proto(self) -> DataSourceProto.BigQueryOptions:
"""

bigquery_options_proto = DataSourceProto.BigQueryOptions(
table_ref=self.table_ref, query=self.query,
table=self.table, query=self.query,
)

return bigquery_options_proto
@@ -287,15 +274,13 @@ class SavedDatasetBigQueryStorage(SavedDatasetStorage):

bigquery_options: BigQueryOptions

def __init__(self, table_ref: str):
self.bigquery_options = BigQueryOptions(table_ref=table_ref, query=None)
def __init__(self, table: str):
self.bigquery_options = BigQueryOptions(table=table, query=None)

@staticmethod
def from_proto(storage_proto: SavedDatasetStorageProto) -> SavedDatasetStorage:
return SavedDatasetBigQueryStorage(
table_ref=BigQueryOptions.from_proto(
storage_proto.bigquery_storage
).table_ref
table=BigQueryOptions.from_proto(storage_proto.bigquery_storage).table
)

def to_proto(self) -> SavedDatasetStorageProto:
@@ -304,4 +289,4 @@ def to_proto(self) -> SavedDatasetStorageProto:
)

def to_data_source(self) -> DataSource:
return BigQuerySource(table_ref=self.bigquery_options.table_ref)
return BigQuerySource(table=self.bigquery_options.table)
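A small round-trip sketch based on the methods above, useful for sanity-checking the rename end to end (table names are hypothetical; no GCP calls are made):

from feast.infra.offline_stores.bigquery_source import (
    BigQueryOptions,
    SavedDatasetBigQueryStorage,
)

options = BigQueryOptions(table="my-project.my_dataset.customers", query=None)
restored = BigQueryOptions.from_proto(options.to_proto())
assert restored.table == options.table  # the value survives serialization under the new field name

# SavedDatasetBigQueryStorage likewise takes `table` and yields a table-backed BigQuerySource.
source = SavedDatasetBigQueryStorage(table="my-project.my_dataset.customers").to_data_source()
assert source.table == "my-project.my_dataset.customers"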
2 changes: 1 addition & 1 deletion sdk/python/feast/templates/gcp/driver_repo.py
@@ -20,7 +20,7 @@
# datasets or materializing features into an online store.
driver_stats_source = BigQuerySource(
# The BigQuery table where features can be found
table_ref="feast-oss.demo_data.driver_hourly_stats_2",
table="feast-oss.demo_data.driver_hourly_stats_2",
# The event timestamp is used for point-in-time joins and for ensuring only
# features within the TTL are returned
timestamp_field="event_timestamp",
4 changes: 2 additions & 2 deletions sdk/python/tests/example_repos/example_feature_repo_1.py
@@ -30,12 +30,12 @@

customer_profile_source = BigQuerySource(
name="customer_profile_source",
table_ref="feast-oss.public.customers",
table="feast-oss.public.customers",
timestamp_field="event_timestamp",
)

customer_driver_combined_source = BigQuerySource(
table_ref="feast-oss.public.customer_driver", timestamp_field="event_timestamp",
table="feast-oss.public.customer_driver", timestamp_field="event_timestamp",
)

driver_locations_push_source = PushSource(
@@ -74,7 +74,7 @@ def create_data_source(
self.tables.append(destination_name)

return BigQuerySource(
table_ref=destination_name,
table=destination_name,
timestamp_field=timestamp_field,
created_timestamp_column=created_timestamp_column,
field_mapping=field_mapping or {"ts_1": "ts"},
@@ -84,7 +84,7 @@ def create_saved_dataset_destination(self) -> SavedDatasetBigQueryStorage:
table = self.get_prefixed_table_name(
f"persisted_{str(uuid.uuid4()).replace('-', '_')}"
)
return SavedDatasetBigQueryStorage(table_ref=table)
return SavedDatasetBigQueryStorage(table=table)

def get_prefixed_table_name(self, suffix: str) -> str:
return f"{self.client.project}.{self.project_name}.{suffix}"
@@ -33,7 +33,7 @@
from tests.utils.data_source_utils import (
prep_file_source,
simple_bq_source_using_query_arg,
simple_bq_source_using_table_ref_arg,
simple_bq_source_using_table_arg,
)


@@ -234,7 +234,7 @@ def test_feature_view_inference_success(test_feature_store, dataframe_source):
entities=["id"],
ttl=timedelta(minutes=5),
online=True,
batch_source=simple_bq_source_using_table_ref_arg(dataframe_source, "ts_1"),
batch_source=simple_bq_source_using_table_arg(dataframe_source, "ts_1"),
tags={},
)

@@ -255,7 +255,7 @@ def test_feature_view_inference_success(test_feature_store, dataframe_source):
actual_file_source = {
(feature.name, feature.dtype) for feature in feature_view_1.features
}
actual_bq_using_table_ref_arg_source = {
actual_bq_using_table_arg_source = {
(feature.name, feature.dtype) for feature in feature_view_2.features
}
actual_bq_using_query_arg_source = {
@@ -270,7 +270,7 @@ def test_feature_view_inference_success(test_feature_store, dataframe_source):
assert (
expected
== actual_file_source
== actual_bq_using_table_ref_arg_source
== actual_bq_using_table_arg_source
== actual_bq_using_query_arg_source
)

4 changes: 2 additions & 2 deletions sdk/python/tests/integration/registration/test_inference.py
@@ -31,7 +31,7 @@
from tests.utils.data_source_utils import (
prep_file_source,
simple_bq_source_using_query_arg,
simple_bq_source_using_table_ref_arg,
simple_bq_source_using_table_arg,
)


@@ -120,7 +120,7 @@ def test_update_file_data_source_with_inferred_event_timestamp_col(simple_datase
with prep_file_source(df=simple_dataset_1) as file_source:
data_sources = [
file_source,
simple_bq_source_using_table_ref_arg(simple_dataset_1),
simple_bq_source_using_table_arg(simple_dataset_1),
simple_bq_source_using_query_arg(simple_dataset_1),
]
update_data_sources_with_inferred_event_timestamp_col(
@@ -20,7 +20,7 @@ def test_partial() -> None:
) as store:

driver_locations_source = BigQuerySource(
table_ref="feast-oss.public.drivers",
table="feast-oss.public.drivers",
timestamp_field="event_timestamp",
created_timestamp_column="created_timestamp",
)
18 changes: 7 additions & 11 deletions sdk/python/tests/utils/data_source_utils.py
@@ -23,9 +23,7 @@ def prep_file_source(df, event_timestamp_column=None) -> Iterator[FileSource]:
yield file_source


def simple_bq_source_using_table_ref_arg(
df, event_timestamp_column=None
) -> BigQuerySource:
def simple_bq_source_using_table_arg(df, event_timestamp_column=None) -> BigQuerySource:
client = bigquery.Client()
gcp_project = client.project
bigquery_dataset = f"ds_{time.time_ns()}"
@@ -37,20 +35,18 @@ def simple_bq_source_using_table_ref_arg(
* 60 # 60 minutes in milliseconds (seems to be minimum limit for gcloud)
)
client.update_dataset(dataset, ["default_table_expiration_ms"])
table_ref = f"{gcp_project}.{bigquery_dataset}.table_{random.randrange(100, 999)}"
table = f"{gcp_project}.{bigquery_dataset}.table_{random.randrange(100, 999)}"

job = client.load_table_from_dataframe(df, table_ref)
job = client.load_table_from_dataframe(df, table)
job.result()

return BigQuerySource(table_ref=table_ref, timestamp_field=event_timestamp_column,)
return BigQuerySource(table=table, timestamp_field=event_timestamp_column,)


def simple_bq_source_using_query_arg(df, event_timestamp_column=None) -> BigQuerySource:
bq_source_using_table_ref = simple_bq_source_using_table_ref_arg(
df, event_timestamp_column
)
bq_source_using_table = simple_bq_source_using_table_arg(df, event_timestamp_column)
return BigQuerySource(
name=bq_source_using_table_ref.table_ref,
query=f"SELECT * FROM {bq_source_using_table_ref.table_ref}",
name=bq_source_using_table.table,
query=f"SELECT * FROM {bq_source_using_table.table}",
timestamp_field=event_timestamp_column,
)
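A hedged sketch of exercising the renamed test helpers; this requires GCP credentials and a BigQuery-enabled project, since the helpers create real datasets, and the DataFrame below is purely illustrative:

import pandas as pd

from tests.utils.data_source_utils import (
    simple_bq_source_using_query_arg,
    simple_bq_source_using_table_arg,
)

df = pd.DataFrame(
    {
        "id": [1, 2],
        "value": [0.1, 0.2],
        "ts_1": pd.to_datetime(["2022-04-01", "2022-04-02"]),
    }
)
table_source = simple_bq_source_using_table_arg(df, event_timestamp_column="ts_1")
query_source = simple_bq_source_using_query_arg(df, event_timestamp_column="ts_1")
assert table_source.table  # set from the freshly created table
assert query_source.query.startswith("SELECT * FROM ")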
