Skip to content

Commit

Permalink
fix: Remove snowflake source warehouse tech debt (feast-dev#3422)
Browse files Browse the repository at this point in the history
Signed-off-by: Miles Adkins <miles.adkins@snowflake.com>
  • Loading branch information
sfc-gh-madkins authored Apr 21, 2023
1 parent b27472f commit 7da0580
Show file tree
Hide file tree
Showing 5 changed files with 12 additions and 26 deletions.
5 changes: 2 additions & 3 deletions protos/feast/core/DataSource.proto
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,8 @@ message DataSource {

// Defines options for DataSource that sources features from a Snowflake Query
message SnowflakeOptions {
reserved 5; // Snowflake warehouse name

// Snowflake table name
string table = 1;

Expand All @@ -209,9 +211,6 @@ message DataSource {

// Snowflake schema name
string database = 4;

// Snowflake warehouse name
string warehouse = 5;
}

// Defines options for DataSource that sources features from a spark table/query
Expand Down
8 changes: 1 addition & 7 deletions sdk/python/feast/infra/offline_stores/snowflake.py
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ def pull_latest_from_table_or_query(
)
select_timestamps = list(
map(
lambda field_name: f"to_varchar({field_name}, 'YYYY-MM-DD\"T\"HH24:MI:SS.FFTZH:TZM') as {field_name}",
lambda field_name: f"TO_VARCHAR({field_name}, 'YYYY-MM-DD\"T\"HH24:MI:SS.FFTZH:TZM') AS {field_name}",
timestamp_columns,
)
)
Expand All @@ -178,9 +178,6 @@ def pull_latest_from_table_or_query(
)
inner_field_string = ", ".join(select_fields)

if data_source.snowflake_options.warehouse:
config.offline_store.warehouse = data_source.snowflake_options.warehouse

with GetSnowflakeConnection(config.offline_store) as conn:
snowflake_conn = conn

Expand Down Expand Up @@ -232,9 +229,6 @@ def pull_all_from_table_or_query(
+ '"'
)

if data_source.snowflake_options.warehouse:
config.offline_store.warehouse = data_source.snowflake_options.warehouse

with GetSnowflakeConnection(config.offline_store) as conn:
snowflake_conn = conn

Expand Down
23 changes: 9 additions & 14 deletions sdk/python/feast/infra/offline_stores/snowflake_source.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import warnings
from typing import Callable, Dict, Iterable, Optional, Tuple

from typeguard import typechecked
Expand Down Expand Up @@ -45,7 +46,6 @@ def __init__(
timestamp_field (optional): Event timestamp field used for point in time
joins of feature values.
database (optional): Snowflake database where the features are stored.
warehouse (optional): Snowflake warehouse where the database is stored.
schema (optional): Snowflake schema in which the table is located.
table (optional): Snowflake table where the features are stored. Exactly one of 'table'
and 'query' must be specified.
Expand All @@ -60,6 +60,14 @@ def __init__(
owner (optional): The owner of the snowflake source, typically the email of the primary
maintainer.
"""

if warehouse:
warnings.warn(
"Specifying a warehouse within a SnowflakeSource is to be deprecated."
"Starting v0.32.0, the warehouse as part of the Snowflake store config will be used.",
RuntimeWarning,
)

if table is None and query is None:
raise ValueError('No "table" or "query" argument provided.')
if table and query:
Expand All @@ -73,7 +81,6 @@ def __init__(
schema=_schema,
table=table,
query=query,
warehouse=warehouse,
)

# If no name, use the table as the default name.
Expand Down Expand Up @@ -109,7 +116,6 @@ def from_proto(data_source: DataSourceProto):
database=data_source.snowflake_options.database,
schema=data_source.snowflake_options.schema,
table=data_source.snowflake_options.table,
warehouse=data_source.snowflake_options.warehouse,
created_timestamp_column=data_source.created_timestamp_column,
field_mapping=dict(data_source.field_mapping),
query=data_source.snowflake_options.query,
Expand All @@ -134,7 +140,6 @@ def __eq__(self, other):
and self.schema == other.schema
and self.table == other.table
and self.query == other.query
and self.warehouse == other.warehouse
)

@property
Expand All @@ -157,11 +162,6 @@ def query(self):
"""Returns the snowflake options of this snowflake source."""
return self.snowflake_options.query

@property
def warehouse(self):
"""Returns the warehouse of this snowflake source."""
return self.snowflake_options.warehouse

def to_proto(self) -> DataSourceProto:
"""
Converts a SnowflakeSource object to its protobuf representation.
Expand Down Expand Up @@ -335,13 +335,11 @@ def __init__(
schema: Optional[str],
table: Optional[str],
query: Optional[str],
warehouse: Optional[str],
):
self.database = database or ""
self.schema = schema or ""
self.table = table or ""
self.query = query or ""
self.warehouse = warehouse or ""

@classmethod
def from_proto(cls, snowflake_options_proto: DataSourceProto.SnowflakeOptions):
Expand All @@ -359,7 +357,6 @@ def from_proto(cls, snowflake_options_proto: DataSourceProto.SnowflakeOptions):
schema=snowflake_options_proto.schema,
table=snowflake_options_proto.table,
query=snowflake_options_proto.query,
warehouse=snowflake_options_proto.warehouse,
)

return snowflake_options
Expand All @@ -376,7 +373,6 @@ def to_proto(self) -> DataSourceProto.SnowflakeOptions:
schema=self.schema,
table=self.table,
query=self.query,
warehouse=self.warehouse,
)

return snowflake_options_proto
Expand All @@ -393,7 +389,6 @@ def __init__(self, table_ref: str):
schema=None,
table=table_ref,
query=None,
warehouse=None,
)

@staticmethod
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,6 @@ def create_data_source(
timestamp_field=timestamp_field,
created_timestamp_column=created_timestamp_column,
field_mapping=field_mapping or {"ts_1": "ts"},
warehouse=self.offline_store_config.warehouse,
)

def create_saved_dataset_destination(self) -> SavedDatasetSnowflakeStorage:
Expand Down
1 change: 0 additions & 1 deletion sdk/python/tests/unit/test_data_sources.py
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,6 @@ def test_proto_conversion():
snowflake_source = SnowflakeSource(
name="test_source",
database="test_database",
warehouse="test_warehouse",
schema="test_schema",
table="test_table",
timestamp_field="event_timestamp",
Expand Down

0 comments on commit 7da0580

Please sign in to comment.