Skip to content

Commit

Permalink
fix: Snowflake remote storage (#3574)
Browse files Browse the repository at this point in the history
* fix: Snowflake remote storage

Signed-off-by: adamschmidt <aschmidt1978@gmail.com>

* fix: lint

Signed-off-by: adamschmidt <aschmidt1978@gmail.com>

* fix: field string build

Signed-off-by: adamschmidt <aschmidt1978@gmail.com>

* fix: join typo

Signed-off-by: adamschmidt <aschmidt1978@gmail.com>

* fix: formatting

Signed-off-by: adamschmidt <aschmidt1978@gmail.com>

---------

Signed-off-by: adamschmidt <aschmidt1978@gmail.com>
  • Loading branch information
adamschmidt authored Apr 3, 2023
1 parent 8b90e2f commit f8d3890
Showing 1 changed file with 28 additions and 2 deletions.
30 changes: 28 additions & 2 deletions sdk/python/feast/infra/offline_stores/snowflake.py
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,9 @@ class SnowflakeOfflineStoreConfig(FeastConfigBaseModel):
blob_export_location: Optional[str] = None
""" Location (in S3, Google storage or Azure storage) where data is offloaded """

convert_timestamp_columns: Optional[bool] = None
""" Convert timestamp columns on export to a Parquet-supported format """

class Config:
allow_population_by_field_name = True

Expand Down Expand Up @@ -152,6 +155,29 @@ def pull_latest_from_table_or_query(
+ '"'
)

# Build the column list for the inner SELECT of the latest-row query.
# Every identifier is wrapped in double quotes so Snowflake resolves it
# case-sensitively, in the same way in both branches.
if config.offline_store.convert_timestamp_columns:
    # Join keys and features pass through unchanged.
    select_fields = [
        f'"{field_name}"' for field_name in join_key_columns + feature_name_columns
    ]
    # Timestamp columns are rendered as strings (see the option's description:
    # "a Parquet-supported format"). Both the source column and the alias are
    # quoted: an unquoted identifier would be folded to upper case by Snowflake
    # and would stop matching the quoted column names used by the else-branch
    # and by the rest of the query for any name that is not all upper case.
    select_timestamps = [
        f"to_varchar(\"{field_name}\", 'YYYY-MM-DD\"T\"HH24:MI:SS.FFTZH:TZM') as \"{field_name}\""
        for field_name in timestamp_columns
    ]
    inner_field_string = ", ".join(select_fields + select_timestamps)
else:
    # No conversion requested: select every column as-is.
    select_fields = [
        f'"{field_name}"'
        for field_name in join_key_columns + feature_name_columns + timestamp_columns
    ]
    inner_field_string = ", ".join(select_fields)

if data_source.snowflake_options.warehouse:
config.offline_store.warehouse = data_source.snowflake_options.warehouse

Expand All @@ -166,7 +192,7 @@ def pull_latest_from_table_or_query(
{field_string}
{f''', TRIM({repr(DUMMY_ENTITY_VAL)}::VARIANT,'"') AS "{DUMMY_ENTITY_ID}"''' if not join_key_columns else ""}
FROM (
SELECT {field_string},
SELECT {inner_field_string},
ROW_NUMBER() OVER({partition_by_join_key_string} ORDER BY {timestamp_desc_string}) AS "_feast_row"
FROM {from_expression}
WHERE "{timestamp_field}" BETWEEN TIMESTAMP '{start_date}' AND TIMESTAMP '{end_date}'
Expand Down Expand Up @@ -533,7 +559,7 @@ def to_remote_storage(self) -> List[str]:
self.to_snowflake(table)

query = f"""
COPY INTO '{self.config.offline_store.blob_export_location}/{table}' FROM "{self.config.offline_store.database}"."{self.config.offline_store.schema_}"."{table}"\n
COPY INTO '{self.export_path}/{table}' FROM "{self.config.offline_store.database}"."{self.config.offline_store.schema_}"."{table}"\n
STORAGE_INTEGRATION = {self.config.offline_store.storage_integration_name}\n
FILE_FORMAT = (TYPE = PARQUET)
DETAILED_OUTPUT = TRUE
Expand Down

0 comments on commit f8d3890

Please sign in to comment.