diff --git a/java/serving/src/test/java/feast/serving/util/DataGenerator.java b/java/serving/src/test/java/feast/serving/util/DataGenerator.java index 261c9ac134..e38d1ce459 100644 --- a/java/serving/src/test/java/feast/serving/util/DataGenerator.java +++ b/java/serving/src/test/java/feast/serving/util/DataGenerator.java @@ -210,11 +210,10 @@ public static DataSource createFileDataSourceSpec( } public static DataSource createBigQueryDataSourceSpec( - String bigQueryTableRef, String timestampColumn, String datePartitionColumn) { + String bigQueryTable, String timestampColumn, String datePartitionColumn) { return DataSource.newBuilder() .setType(DataSource.SourceType.BATCH_BIGQUERY) - .setBigqueryOptions( - DataSource.BigQueryOptions.newBuilder().setTableRef(bigQueryTableRef).build()) + .setBigqueryOptions(DataSource.BigQueryOptions.newBuilder().setTable(bigQueryTable).build()) .setTimestampField(timestampColumn) .setDatePartitionColumn(datePartitionColumn) .build(); diff --git a/protos/feast/core/DataSource.proto b/protos/feast/core/DataSource.proto index 961b5ac188..0922cc1f25 100644 --- a/protos/feast/core/DataSource.proto +++ b/protos/feast/core/DataSource.proto @@ -98,7 +98,7 @@ message DataSource { // Defines options for DataSource that sources features from a BigQuery Query message BigQueryOptions { // Full table reference in the form of [project:dataset.table] - string table_ref = 1; + string table = 1; // SQL query that returns a table containing feature data. Must contain an event_timestamp column, and respective // entity columns diff --git a/sdk/python/feast/infra/offline_stores/bigquery.py b/sdk/python/feast/infra/offline_stores/bigquery.py index 2ca05e9ae6..1e27fc326b 100644 --- a/sdk/python/feast/infra/offline_stores/bigquery.py +++ b/sdk/python/feast/infra/offline_stores/bigquery.py @@ -361,7 +361,7 @@ def persist(self, storage: SavedDatasetStorage): assert isinstance(storage, SavedDatasetBigQueryStorage) self.to_bigquery( - bigquery.QueryJobConfig(destination=storage.bigquery_options.table_ref) + bigquery.QueryJobConfig(destination=storage.bigquery_options.table) ) @property diff --git a/sdk/python/feast/infra/offline_stores/bigquery_source.py b/sdk/python/feast/infra/offline_stores/bigquery_source.py index eca9bdf21e..31b0ed617e 100644 --- a/sdk/python/feast/infra/offline_stores/bigquery_source.py +++ b/sdk/python/feast/infra/offline_stores/bigquery_source.py @@ -18,7 +18,6 @@ def __init__( self, event_timestamp_column: Optional[str] = "", table: Optional[str] = None, - table_ref: Optional[str] = None, created_timestamp_column: Optional[str] = "", field_mapping: Optional[Dict[str, str]] = None, date_partition_column: Optional[str] = None, @@ -33,14 +32,13 @@ def __init__( Args: table (optional): The BigQuery table where features can be found. - table_ref (optional): (Deprecated) The BigQuery table where features can be found. event_timestamp_column: (Deprecated) Event timestamp column used for point in time joins of feature values. created_timestamp_column (optional): Timestamp column when row was created, used for deduplicating rows. field_mapping: A dictionary mapping of column names in this data source to feature names in a feature table or view. Only used for feature columns, not entities or timestamp columns. date_partition_column (deprecated): Timestamp column used for partitioning. query (optional): SQL query to execute to generate data for this data source. - name (optional): Name for the source. Defaults to the table_ref if not specified. + name (optional): Name for the source. Defaults to the table if not specified. description (optional): A human-readable description. tags (optional): A dictionary of key-value pairs to store arbitrary metadata. owner (optional): The owner of the bigquery source, typically the email of the primary @@ -51,18 +49,10 @@ def __init__( >>> from feast import BigQuerySource >>> my_bigquery_source = BigQuerySource(table="gcp_project:bq_dataset.bq_table") """ - if table is None and table_ref is None and query is None: + if table is None and query is None: raise ValueError('No "table" or "query" argument provided.') - if not table and table_ref: - warnings.warn( - ( - "The argument 'table_ref' is being deprecated. Please use 'table' " - "instead. Feast 0.20 and onwards will not support the argument 'table_ref'." - ), - DeprecationWarning, - ) - table = table_ref - self.bigquery_options = BigQueryOptions(table_ref=table, query=query) + + self.bigquery_options = BigQueryOptions(table=table, query=query) if date_partition_column: warnings.warn( @@ -73,13 +63,11 @@ def __init__( DeprecationWarning, ) - # If no name, use the table_ref as the default name + # If no name, use the table as the default name _name = name if not _name: if table: _name = table - elif table_ref: - _name = table_ref else: warnings.warn( ( @@ -111,7 +99,7 @@ def __eq__(self, other): return ( self.name == other.name - and self.bigquery_options.table_ref == other.bigquery_options.table_ref + and self.bigquery_options.table == other.bigquery_options.table and self.bigquery_options.query == other.bigquery_options.query and self.timestamp_field == other.timestamp_field and self.created_timestamp_column == other.created_timestamp_column @@ -122,8 +110,8 @@ def __eq__(self, other): ) @property - def table_ref(self): - return self.bigquery_options.table_ref + def table(self): + return self.bigquery_options.table @property def query(self): @@ -137,7 +125,7 @@ def from_proto(data_source: DataSourceProto): return BigQuerySource( name=data_source.name, field_mapping=dict(data_source.field_mapping), - table_ref=data_source.bigquery_options.table_ref, + table=data_source.bigquery_options.table, timestamp_field=data_source.timestamp_field, created_timestamp_column=data_source.created_timestamp_column, query=data_source.bigquery_options.query, @@ -169,14 +157,14 @@ def validate(self, config: RepoConfig): client = bigquery.Client() try: - client.get_table(self.table_ref) + client.get_table(self.table) except NotFound: - raise DataSourceNotFoundException(self.table_ref) + raise DataSourceNotFoundException(self.table) def get_table_query_string(self) -> str: """Returns a string that can directly be used to reference this table in SQL""" - if self.table_ref: - return f"`{self.table_ref}`" + if self.table: + return f"`{self.table}`" else: return f"({self.query})" @@ -190,8 +178,8 @@ def get_table_column_names_and_types( from google.cloud import bigquery client = bigquery.Client() - if self.table_ref is not None: - schema = client.get_table(self.table_ref).schema + if self.table is not None: + schema = client.get_table(self.table).schema if not isinstance(schema[0], bigquery.schema.SchemaField): raise TypeError("Could not parse BigQuery table schema.") else: @@ -215,9 +203,9 @@ class BigQueryOptions: """ def __init__( - self, table_ref: Optional[str], query: Optional[str], + self, table: Optional[str], query: Optional[str], ): - self._table_ref = table_ref + self._table = table self._query = query @property @@ -235,18 +223,18 @@ def query(self, query): self._query = query @property - def table_ref(self): + def table(self): """ Returns the table ref of this BQ table """ - return self._table_ref + return self._table - @table_ref.setter - def table_ref(self, table_ref): + @table.setter + def table(self, table): """ Sets the table ref of this BQ table """ - self._table_ref = table_ref + self._table = table @classmethod def from_proto(cls, bigquery_options_proto: DataSourceProto.BigQueryOptions): @@ -261,8 +249,7 @@ def from_proto(cls, bigquery_options_proto: DataSourceProto.BigQueryOptions): """ bigquery_options = cls( - table_ref=bigquery_options_proto.table_ref, - query=bigquery_options_proto.query, + table=bigquery_options_proto.table, query=bigquery_options_proto.query, ) return bigquery_options @@ -276,7 +263,7 @@ def to_proto(self) -> DataSourceProto.BigQueryOptions: """ bigquery_options_proto = DataSourceProto.BigQueryOptions( - table_ref=self.table_ref, query=self.query, + table=self.table, query=self.query, ) return bigquery_options_proto @@ -287,15 +274,13 @@ class SavedDatasetBigQueryStorage(SavedDatasetStorage): bigquery_options: BigQueryOptions - def __init__(self, table_ref: str): - self.bigquery_options = BigQueryOptions(table_ref=table_ref, query=None) + def __init__(self, table: str): + self.bigquery_options = BigQueryOptions(table=table, query=None) @staticmethod def from_proto(storage_proto: SavedDatasetStorageProto) -> SavedDatasetStorage: return SavedDatasetBigQueryStorage( - table_ref=BigQueryOptions.from_proto( - storage_proto.bigquery_storage - ).table_ref + table=BigQueryOptions.from_proto(storage_proto.bigquery_storage).table ) def to_proto(self) -> SavedDatasetStorageProto: @@ -304,4 +289,4 @@ def to_proto(self) -> SavedDatasetStorageProto: ) def to_data_source(self) -> DataSource: - return BigQuerySource(table_ref=self.bigquery_options.table_ref) + return BigQuerySource(table=self.bigquery_options.table) diff --git a/sdk/python/feast/templates/gcp/driver_repo.py b/sdk/python/feast/templates/gcp/driver_repo.py index e2e1250c62..7ad7586020 100644 --- a/sdk/python/feast/templates/gcp/driver_repo.py +++ b/sdk/python/feast/templates/gcp/driver_repo.py @@ -20,7 +20,7 @@ # datasets or materializing features into an online store. driver_stats_source = BigQuerySource( # The BigQuery table where features can be found - table_ref="feast-oss.demo_data.driver_hourly_stats_2", + table="feast-oss.demo_data.driver_hourly_stats_2", # The event timestamp is used for point-in-time joins and for ensuring only # features within the TTL are returned timestamp_field="event_timestamp", diff --git a/sdk/python/tests/example_repos/example_feature_repo_1.py b/sdk/python/tests/example_repos/example_feature_repo_1.py index db9a0ff991..0ceeb605bc 100644 --- a/sdk/python/tests/example_repos/example_feature_repo_1.py +++ b/sdk/python/tests/example_repos/example_feature_repo_1.py @@ -30,12 +30,12 @@ customer_profile_source = BigQuerySource( name="customer_profile_source", - table_ref="feast-oss.public.customers", + table="feast-oss.public.customers", timestamp_field="event_timestamp", ) customer_driver_combined_source = BigQuerySource( - table_ref="feast-oss.public.customer_driver", timestamp_field="event_timestamp", + table="feast-oss.public.customer_driver", timestamp_field="event_timestamp", ) driver_locations_push_source = PushSource( diff --git a/sdk/python/tests/integration/feature_repos/universal/data_sources/bigquery.py b/sdk/python/tests/integration/feature_repos/universal/data_sources/bigquery.py index cb7113bf66..e1c529da59 100644 --- a/sdk/python/tests/integration/feature_repos/universal/data_sources/bigquery.py +++ b/sdk/python/tests/integration/feature_repos/universal/data_sources/bigquery.py @@ -74,7 +74,7 @@ def create_data_source( self.tables.append(destination_name) return BigQuerySource( - table_ref=destination_name, + table=destination_name, timestamp_field=timestamp_field, created_timestamp_column=created_timestamp_column, field_mapping=field_mapping or {"ts_1": "ts"}, @@ -84,7 +84,7 @@ def create_saved_dataset_destination(self) -> SavedDatasetBigQueryStorage: table = self.get_prefixed_table_name( f"persisted_{str(uuid.uuid4()).replace('-', '_')}" ) - return SavedDatasetBigQueryStorage(table_ref=table) + return SavedDatasetBigQueryStorage(table=table) def get_prefixed_table_name(self, suffix: str) -> str: return f"{self.client.project}.{self.project_name}.{suffix}" diff --git a/sdk/python/tests/integration/registration/test_feature_store.py b/sdk/python/tests/integration/registration/test_feature_store.py index a1eab11070..eed7b144ea 100644 --- a/sdk/python/tests/integration/registration/test_feature_store.py +++ b/sdk/python/tests/integration/registration/test_feature_store.py @@ -33,7 +33,7 @@ from tests.utils.data_source_utils import ( prep_file_source, simple_bq_source_using_query_arg, - simple_bq_source_using_table_ref_arg, + simple_bq_source_using_table_arg, ) @@ -234,7 +234,7 @@ def test_feature_view_inference_success(test_feature_store, dataframe_source): entities=["id"], ttl=timedelta(minutes=5), online=True, - batch_source=simple_bq_source_using_table_ref_arg(dataframe_source, "ts_1"), + batch_source=simple_bq_source_using_table_arg(dataframe_source, "ts_1"), tags={}, ) @@ -255,7 +255,7 @@ def test_feature_view_inference_success(test_feature_store, dataframe_source): actual_file_source = { (feature.name, feature.dtype) for feature in feature_view_1.features } - actual_bq_using_table_ref_arg_source = { + actual_bq_using_table_arg_source = { (feature.name, feature.dtype) for feature in feature_view_2.features } actual_bq_using_query_arg_source = { @@ -270,7 +270,7 @@ def test_feature_view_inference_success(test_feature_store, dataframe_source): assert ( expected == actual_file_source - == actual_bq_using_table_ref_arg_source + == actual_bq_using_table_arg_source == actual_bq_using_query_arg_source ) diff --git a/sdk/python/tests/integration/registration/test_inference.py b/sdk/python/tests/integration/registration/test_inference.py index 288439874f..aa359771b9 100644 --- a/sdk/python/tests/integration/registration/test_inference.py +++ b/sdk/python/tests/integration/registration/test_inference.py @@ -31,7 +31,7 @@ from tests.utils.data_source_utils import ( prep_file_source, simple_bq_source_using_query_arg, - simple_bq_source_using_table_ref_arg, + simple_bq_source_using_table_arg, ) @@ -120,7 +120,7 @@ def test_update_file_data_source_with_inferred_event_timestamp_col(simple_datase with prep_file_source(df=simple_dataset_1) as file_source: data_sources = [ file_source, - simple_bq_source_using_table_ref_arg(simple_dataset_1), + simple_bq_source_using_table_arg(simple_dataset_1), simple_bq_source_using_query_arg(simple_dataset_1), ] update_data_sources_with_inferred_event_timestamp_col( diff --git a/sdk/python/tests/integration/scaffolding/test_partial_apply.py b/sdk/python/tests/integration/scaffolding/test_partial_apply.py index c66529080f..758ff9e536 100644 --- a/sdk/python/tests/integration/scaffolding/test_partial_apply.py +++ b/sdk/python/tests/integration/scaffolding/test_partial_apply.py @@ -20,7 +20,7 @@ def test_partial() -> None: ) as store: driver_locations_source = BigQuerySource( - table_ref="feast-oss.public.drivers", + table="feast-oss.public.drivers", timestamp_field="event_timestamp", created_timestamp_column="created_timestamp", ) diff --git a/sdk/python/tests/utils/data_source_utils.py b/sdk/python/tests/utils/data_source_utils.py index d98c047cd6..5bb5a622d6 100644 --- a/sdk/python/tests/utils/data_source_utils.py +++ b/sdk/python/tests/utils/data_source_utils.py @@ -23,9 +23,7 @@ def prep_file_source(df, event_timestamp_column=None) -> Iterator[FileSource]: yield file_source -def simple_bq_source_using_table_ref_arg( - df, event_timestamp_column=None -) -> BigQuerySource: +def simple_bq_source_using_table_arg(df, event_timestamp_column=None) -> BigQuerySource: client = bigquery.Client() gcp_project = client.project bigquery_dataset = f"ds_{time.time_ns()}" @@ -37,20 +35,18 @@ def simple_bq_source_using_table_ref_arg( * 60 # 60 minutes in milliseconds (seems to be minimum limit for gcloud) ) client.update_dataset(dataset, ["default_table_expiration_ms"]) - table_ref = f"{gcp_project}.{bigquery_dataset}.table_{random.randrange(100, 999)}" + table = f"{gcp_project}.{bigquery_dataset}.table_{random.randrange(100, 999)}" - job = client.load_table_from_dataframe(df, table_ref) + job = client.load_table_from_dataframe(df, table) job.result() - return BigQuerySource(table_ref=table_ref, timestamp_field=event_timestamp_column,) + return BigQuerySource(table=table, timestamp_field=event_timestamp_column,) def simple_bq_source_using_query_arg(df, event_timestamp_column=None) -> BigQuerySource: - bq_source_using_table_ref = simple_bq_source_using_table_ref_arg( - df, event_timestamp_column - ) + bq_source_using_table = simple_bq_source_using_table_arg(df, event_timestamp_column) return BigQuerySource( - name=bq_source_using_table_ref.table_ref, - query=f"SELECT * FROM {bq_source_using_table_ref.table_ref}", + name=bq_source_using_table.table, + query=f"SELECT * FROM {bq_source_using_table.table}", timestamp_field=event_timestamp_column, )