SNOW-1418533 handle dropping temp objects in post actions #2405
base: main
Conversation
…iables-thread-safe
…ad-safe' into aalam-SNOW-1418523-handle-post-actions
Seems like your changes contain some Local Testing changes, please request review from @snowflakedb/local-testing
@@ -1673,6 +1678,7 @@ def __init__(
    *,
    query_id_place_holder: Optional[str] = None,
    is_ddl_on_temp_object: bool = False,
    temp_name_place_holder: Optional[Tuple[str, TempObjectType]] = None,
is it possible to have two temp_name_place_holders, e.g. when joining two dataframes that are backed by temp tables?
a join statement, when resolved, can contain multiple temp_name_place_holders, but currently, when we generate temp objects like temp tables or temp file formats, we create them one at a time.
CHANGELOG.md
Outdated
@@ -55,6 +55,8 @@

### Snowpark Python API Updates

- Updated `Session` class to be thread-safe. This allows concurrent dataframe transformations, dataframe actions, UDF and stored procedure registration, and concurrent file uploads.
this message is not clear about the change here. Please update the changelog to reflect the actual change in this PR, and the reason this addition helps make the session class thread-safe.
@@ -731,7 +730,7 @@ def large_local_relation_plan(
    source_plan: Optional[LogicalPlan],
    schema_query: Optional[str],
) -> SnowflakePlan:
-   temp_table_name = random_name_for_temp_object(TempObjectType.TABLE)
+   temp_table_name = f"temp_name_placeholder_{generate_random_alphanumeric()}"
isn't that going to impact df.queries? Not only will people see a different table, won't they also see a nonsensical temp table name like "temp_name_placeholder_xxx"?
Is large_local_relation_plan the only case where we internally create a query-scoped temp table?
A more efficient way to handle those is probably making the temp table session-scoped and letting @sfc-gh-jdu's temp table cleanup handle the table drop-off. I guess we can do this as a future improvement.
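For context on the placeholder pattern in the diff above, a minimal sketch of how such a name could be produced. Note that `generate_random_alphanumeric` here is a hypothetical stand-in for Snowpark's internal helper of the same name; the real alphabet and length may differ:

```python
import random
import string


def generate_random_alphanumeric(length: int = 10) -> str:
    # Hypothetical stand-in for the internal helper referenced in the diff.
    return "".join(random.choices(string.ascii_lowercase + string.digits, k=length))


def make_temp_name_placeholder() -> str:
    # A unique token embedded in the generated SQL; it is swapped for a
    # concrete temp object name at query submission time.
    return f"temp_name_placeholder_{generate_random_alphanumeric()}"


placeholder = make_temp_name_placeholder()
```

Because the token is only a placeholder, each submission of the same plan can resolve it to a different concrete name.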
@@ -645,10 +648,20 @@ def get_result_set(
    final_queries = []
    last_place_holder = None
    for q in main_queries:
        query = q.sql
        if q.temp_name_place_holder:
instead of doing it here, it might be better to do it in execute_queries when calling plan_compiler to get the final query, so that this is applied to all user-facing queries
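The substitution being discussed could look roughly like this. This is a sketch with a simplified `Query` stand-in (the real class carries more state), and the `SNOWPARK_TEMP_*` naming is illustrative:

```python
import uuid
from typing import Dict, List, NamedTuple, Optional, Tuple


class Query(NamedTuple):
    # Simplified stand-in for Snowpark's internal Query class.
    sql: str
    temp_name_place_holder: Optional[Tuple[str, str]] = None  # (placeholder, object type)


def fill_temp_names(queries: List[Query]) -> List[str]:
    # Assign a fresh concrete name the first time a placeholder is seen, then
    # substitute it in every query so CREATE, usage, and DROP all agree.
    resolved: Dict[str, str] = {}
    final_queries: List[str] = []
    for q in queries:
        sql = q.sql
        if q.temp_name_place_holder:
            placeholder, obj_type = q.temp_name_place_holder
            resolved.setdefault(
                placeholder, f"SNOWPARK_TEMP_{obj_type}_{uuid.uuid4().hex[:8].upper()}"
            )
        for placeholder, real_name in resolved.items():
            sql = sql.replace(placeholder, real_name)
        final_queries.append(sql)
    return final_queries


queries = [
    Query(
        "CREATE TEMP TABLE temp_name_placeholder_ab12 (c INT)",
        ("temp_name_placeholder_ab12", "TABLE"),
    ),
    Query("SELECT * FROM temp_name_placeholder_ab12"),
]
final = fill_temp_names(queries)
```

Calling `fill_temp_names` again yields a different concrete name, which is the point: each submission of the plan works on its own temp object.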
    run=False,
)
def test_temp_name_placeholder_for_sync(threadsafe_session):
    from snowflake.snowpark._internal.analyzer import analyzer
I just recalled: do we have any multi-threading tests with the new query compilation stage?
we do not at the moment
@sfc-gh-aalam can you create a ticket to add the tests? We should make sure everything is tested out.
src/snowflake/snowpark/async_job.py
Outdated
@@ -405,8 +407,12 @@ def result(
    else:
        raise ValueError(f"{async_result_type} is not supported")
    for action in self._post_actions:
        query = action.sql
        if self._placeholders:
why do we need special handling for async jobs?
because when dropping temp objects after an async job is done, we need to drop the correct temp object. This information needs to be propagated to the async job as well.
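As a sketch of the propagation being described here (class and field names are illustrative, not the real AsyncJob API): the job carries both the post actions and the placeholder-to-name map chosen at submission, so it drops exactly the temp objects its own execution created.

```python
from typing import Dict, List


class AsyncJobSketch:
    # Illustrative only; the real AsyncJob has far more state.
    def __init__(self, post_actions: List[str], placeholders: Dict[str, str]) -> None:
        self._post_actions = post_actions
        self._placeholders = placeholders
        self.executed: List[str] = []

    def finish(self) -> None:
        # Resolve each post action against the map captured at submission.
        for action in self._post_actions:
            sql = action
            for placeholder, real_name in self._placeholders.items():
                sql = sql.replace(placeholder, real_name)
            self.executed.append(sql)  # the real code would execute this SQL


job = AsyncJobSketch(
    ["DROP TABLE IF EXISTS temp_name_placeholder_x"],
    {"temp_name_placeholder_x": "SNOWPARK_TEMP_TABLE_9F2C"},
)
job.finish()
```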
@@ -294,7 +294,30 @@ def execution_queries(self) -> Dict["PlanQueryType", List["Query"]]:
    from snowflake.snowpark._internal.compiler.plan_compiler import PlanCompiler

    compiler = PlanCompiler(self)
-   return compiler.compile()
+   compiled_queries = compiler.compile()
actually, let's move this further into the compile() function; that would keep the code clear, with compile() responsible for compiling the plan into a set of executable queries.
You can do this post-processing at the end, right before we return the final queries, and extract the following as a separate function:
compiled_queries = compiler.compile()

if self.session._conn._thread_safe_session_enabled:
    placeholders = {}
please add a comment here about what we are doing here, and why we are doing it
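The post-processing under discussion could be sketched as follows. The dictionary keys and the `SNOWPARK_TEMP_TABLE_*` naming are illustrative stand-ins for the real `PlanQueryType` groups and naming scheme:

```python
import uuid
from typing import Dict, List, Optional, Tuple

# Each compiled query is (sql, optional placeholder token).
CompiledQueries = Dict[str, List[Tuple[str, Optional[str]]]]


def resolve_placeholders(compiled: CompiledQueries) -> Dict[str, List[str]]:
    # Pass 1: choose one concrete name per placeholder across all groups, so
    # the main queries and the post actions refer to the same object.
    placeholders: Dict[str, str] = {}
    for group in compiled.values():
        for _, token in group:
            if token is not None and token not in placeholders:
                placeholders[token] = f"SNOWPARK_TEMP_TABLE_{uuid.uuid4().hex[:8].upper()}"
    # Pass 2: substitute the chosen names everywhere.
    return {
        key: [_substitute(sql, placeholders) for sql, _ in group]
        for key, group in compiled.items()
    }


def _substitute(sql: str, names: Dict[str, str]) -> str:
    for token, real_name in names.items():
        sql = sql.replace(token, real_name)
    return sql


compiled = {
    "QUERIES": [
        ("CREATE TEMP TABLE temp_name_placeholder_x (c INT)", "temp_name_placeholder_x"),
        ("SELECT * FROM temp_name_placeholder_x", None),
    ],
    "POST_ACTIONS": [("DROP TABLE IF EXISTS temp_name_placeholder_x", None)],
}
resolved = resolve_placeholders(compiled)
```

Resolving once over all groups is what keeps the DROP in post actions pointed at the table the main queries actually created.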
@@ -1719,6 +1765,13 @@ def __init__(
    if query_id_place_holder
    else f"query_id_place_holder_{generate_random_alphanumeric()}"
)
# This is a temporary workaround to handle the case when a snowflake plan is created
i am not sure this is a temporary workaround. A session-scoped temp table could be a long-term solution, but it may have other drawbacks too; you can mention it as an alternative solution below.
# in the following way in a multi-threaded environment:
# 1. Create a temp object
# 2. Use the temp object in a query
# 3. Drop the temp object
this comment explains the scenario that needs this field, but not really how the field solves the problem. Can you add a more detailed comment here?
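The scenario in the comment above can be made concrete: with a hard-coded temp name, thread B's DROP can land between thread A's CREATE and its read. With a placeholder, each submission resolves to its own fresh name, so the runs cannot touch each other's tables. A self-contained illustration (no real session involved; the naming is hypothetical):

```python
import threading
import uuid
from typing import List, Optional

# The plan as generated, with a placeholder instead of a hard-coded temp name.
PLAN = [
    "CREATE TEMP TABLE temp_name_placeholder_x (c INT)",
    "SELECT * FROM temp_name_placeholder_x",
    "DROP TABLE temp_name_placeholder_x",
]


def run_plan(results: List[Optional[List[str]]], idx: int) -> None:
    # Each submission resolves the placeholder to its own fresh name, so
    # concurrent runs never create or drop each other's tables.
    name = f"SNOWPARK_TEMP_TABLE_{uuid.uuid4().hex[:8].upper()}"
    results[idx] = [sql.replace("temp_name_placeholder_x", name) for sql in PLAN]


results: List[Optional[List[str]]] = [None, None]
threads = [threading.Thread(target=run_plan, args=(results, i)) for i in range(2)]
for t in threads:
    t.start()
for t in threads:
    t.join()
```

After joining, the two runs hold internally consistent but mutually distinct table names.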
@@ -666,6 +669,7 @@ def get_result_set(
    num_statements=len(main_queries),
    params=params,
    ignore_results=ignore_results,
    async_post_actions=post_actions,
is that fixing a bug?
yes indeed.
earlier it would pull post_actions from the snowflake plan, which may not include the drop table commands coming from large query breakdown
@@ -1711,6 +1756,7 @@ def __init__(
    *,
    query_id_place_holder: Optional[str] = None,
    is_ddl_on_temp_object: bool = False,
    temp_name_place_holder: Optional[Tuple[str, TempObjectType]] = None,
let's call this temp_obj_name_placeholder to be more clear
@@ -15,7 +15,7 @@

#### Improvements

- Disables sql simplification when sort is performed after limit.
@sfc-gh-aalam do we have a ticket to track the documentation for the multi-threading session object? I think you will want to mention this behavior in the docs on release.
Which Jira issue is this PR addressing? Make sure that there is an accompanying issue to your PR.
Fixes SNOW-1418533
Fill out the following pre-review checklist:
Please describe how your code solves the related issue.
This PR updates the way we generate names for temp objects. This is done to prevent unexpected behavior when the queries generated by a snowflake plan are as follows:

i. Create a temp object
ii. Build a sql query involving the temp object
iii. Drop the temp object in post_actions

When a dataframe whose queries create and drop the same temp object is run using multiple threads, it might fail if one thread has not yet started working on the temp object while a different thread has already dropped it.

In this PR, we update temp name creation for those objects that are dropped in a subsequent post action. We add temp_name_placeholder to the Query class and generate a temp object name at the query submission stage, similar to query_id_placeholder.