Commit f9862b5

fix: Batch Snowflake materialization queries to obey Snowpark 100 feature limit (feast-dev#3406)

Signed-off-by: miles.adkins <miles.adkins@snowflake.com>

sfc-gh-madkins authored Apr 21, 2023
1 parent ad48146 commit f9862b5
Showing 1 changed file with 61 additions and 35 deletions.
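
The fix itself is straightforward: instead of materializing every feature of a feature view in one Snowflake query, the engine now splits the feature list into batches of at most 100 (the Snowpark per-query feature limit named in the title) and issues one materialization query per batch. As a rough illustration only — plain feature-name strings instead of Feast Feature objects, and a hypothetical count of 250 features — the chunking amounts to:

from typing import List

SNOWPARK_FEATURE_LIMIT = 100  # per-query cap this commit works around


def chunk_features(features: List[str], batch_size: int = SNOWPARK_FEATURE_LIMIT) -> List[List[str]]:
    # Same slicing pattern as feature_batches in the diff below.
    return [features[i : i + batch_size] for i in range(0, len(features), batch_size)]


features = [f"feature_{i}" for i in range(250)]   # hypothetical feature view
batches = chunk_features(features)
print([len(b) for b in batches])                  # [100, 100, 50] -> three queries
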
96 changes: 61 additions & 35 deletions sdk/python/feast/infra/materialization/snowflake_engine.py
@@ -276,32 +276,65 @@ def _materialize_one(
 
             fv_latest_values_sql = offline_job.to_sql()
 
+            if feature_view.entity_columns:
+                join_keys = [entity.name for entity in feature_view.entity_columns]
+                unique_entities = '"' + '", "'.join(join_keys) + '"'
+
+                query = f"""
+                    SELECT
+                        COUNT(DISTINCT {unique_entities})
+                    FROM
+                        {feature_view.batch_source.get_table_query_string()}
+                """
+
+                with GetSnowflakeConnection(self.repo_config.offline_store) as conn:
+                    entities_to_write = conn.cursor().execute(query).fetchall()[0][0]
+            else:
+                entities_to_write = (
+                    1  # entityless feature view has a placeholder entity
+                )
 
             if feature_view.batch_source.field_mapping is not None:
                 fv_latest_mapped_values_sql = _run_snowflake_field_mapping(
                     fv_latest_values_sql, feature_view.batch_source.field_mapping
                 )
 
-            fv_to_proto_sql = self.generate_snowflake_materialization_query(
-                self.repo_config,
-                fv_latest_mapped_values_sql,
-                feature_view,
-                project,
-            )
+            features_full_list = feature_view.features
+            feature_batches = [
+                features_full_list[i : i + 100]
+                for i in range(0, len(features_full_list), 100)
+            ]
 
             if self.repo_config.online_store.type == "snowflake.online":
-                self.materialize_to_snowflake_online_store(
-                    self.repo_config,
-                    fv_to_proto_sql,
-                    feature_view,
-                    project,
-                )
+                rows_to_write = entities_to_write * len(features_full_list)
             else:
-                self.materialize_to_external_online_store(
-                    self.repo_config,
-                    fv_to_proto_sql,
-                    feature_view,
-                    tqdm_builder,
-                )
+                rows_to_write = entities_to_write * len(feature_batches)
+
+            with tqdm_builder(rows_to_write) as pbar:
+                for i, feature_batch in enumerate(feature_batches):
+                    fv_to_proto_sql = self.generate_snowflake_materialization_query(
+                        self.repo_config,
+                        fv_latest_mapped_values_sql,
+                        feature_view,
+                        feature_batch,
+                        project,
+                    )
+
+                    if self.repo_config.online_store.type == "snowflake.online":
+                        self.materialize_to_snowflake_online_store(
+                            self.repo_config,
+                            fv_to_proto_sql,
+                            feature_view,
+                            project,
+                        )
+                        pbar.update(entities_to_write * len(feature_batch))
+                    else:
+                        self.materialize_to_external_online_store(
+                            self.repo_config,
+                            fv_to_proto_sql,
+                            feature_view,
+                            pbar,
+                        )
 
             return SnowflakeMaterializationJob(
                 job_id=job_id, status=MaterializationJobStatus.SUCCEEDED
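
One detail worth calling out in the hunk above is how the progress total is sized: on the snowflake.online path the bar counts entities times total features and is advanced by entities_to_write * len(feature_batch) after each query, while on the external-store path it counts entities times the number of batches and is advanced by the row writer. A back-of-the-envelope check with made-up numbers (1,000 distinct entities, 230 features):

entities_to_write = 1_000                 # hypothetical COUNT(DISTINCT <join keys>)
feature_batch_sizes = [100, 100, 30]      # 230 features chunked by 100

# snowflake.online path: total = entities * total features
snowflake_total = entities_to_write * sum(feature_batch_sizes)         # 230_000
per_query_updates = [entities_to_write * size for size in feature_batch_sizes]
assert sum(per_query_updates) == snowflake_total

# external online store path: total = entities * number of batches
external_total = entities_to_write * len(feature_batch_sizes)          # 3_000
print(snowflake_total, external_total)
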
@@ -316,6 +349,7 @@ def generate_snowflake_materialization_query(
         repo_config: RepoConfig,
         fv_latest_mapped_values_sql: str,
         feature_view: Union[BatchFeatureView, FeatureView],
+        feature_batch: list,
         project: str,
     ) -> str:

@@ -338,7 +372,7 @@ def generate_snowflake_materialization_query(
         UDF serialization function.
         """
         feature_sql_list = []
-        for feature in feature_view.features:
+        for feature in feature_batch:
             feature_value_type_name = feature.dtype.to_value_type().name
 
             feature_sql = _convert_value_name_to_snowflake_udf(
@@ -434,19 +468,16 @@ def materialize_to_snowflake_online_store(
         """
 
         with GetSnowflakeConnection(repo_config.batch_engine) as conn:
-            query_id = execute_snowflake_statement(conn, query).sfqid
+            execute_snowflake_statement(conn, query).sfqid
 
-        click.echo(
-            f"Snowflake Query ID: {Style.BRIGHT + Fore.GREEN}{query_id}{Style.RESET_ALL}"
-        )
         return None
 
     def materialize_to_external_online_store(
         self,
         repo_config: RepoConfig,
         materialization_sql: str,
         feature_view: Union[StreamFeatureView, FeatureView],
-        tqdm_builder: Callable[[int], tqdm],
+        pbar: tqdm,
     ) -> None:
 
         feature_names = [feature.name for feature in feature_view.features]
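
The signature change above (tqdm_builder replaced by pbar) is what lets a single progress bar span every feature batch: _materialize_one now owns the bar and each call simply advances it. A minimal sketch of that caller-owned-bar pattern, with an invented worker function and row counts:

from tqdm import tqdm


def write_one_batch(pbar: tqdm, rows_written: int) -> None:
    # The bar is owned by the caller; this function only advances it.
    pbar.update(rows_written)


with tqdm(total=3_000) as pbar:           # e.g. 1,000 entities * 3 feature batches
    for _ in range(3):
        write_one_batch(pbar, 1_000)
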
@@ -455,10 +486,6 @@ def materialize_to_external_online_store(
             query = materialization_sql
             cursor = execute_snowflake_statement(conn, query)
             for i, df in enumerate(cursor.fetch_pandas_batches()):
-                click.echo(
-                    f"Snowflake: Processing Materialization ResultSet Batch #{i+1}"
-                )
-
                 entity_keys = (
                     df["entity_key"].apply(EntityKeyProto.FromString).to_numpy()
                 )
@@ -494,11 +521,10 @@ def materialize_to_external_online_store(
                     )
                 )
 
-                with tqdm_builder(len(rows_to_write)) as pbar:
-                    self.online_store.online_write_batch(
-                        repo_config,
-                        feature_view,
-                        rows_to_write,
-                        lambda x: pbar.update(x),
-                    )
+                self.online_store.online_write_batch(
+                    repo_config,
+                    feature_view,
+                    rows_to_write,
+                    lambda x: pbar.update(x),
+                )
         return None
