Skip to content

Commit

Permalink
SNOW-1645316: Modularize skip lqb conditions and add no db check (#2451)
Browse files Browse the repository at this point in the history
  • Loading branch information
sfc-gh-aalam authored Oct 23, 2024
1 parent b239039 commit 9dd7a23
Show file tree
Hide file tree
Showing 3 changed files with 100 additions and 16 deletions.
76 changes: 62 additions & 14 deletions src/snowflake/snowpark/_internal/compiler/large_query_breakdown.py
Original file line number Diff line number Diff line change
Expand Up @@ -123,14 +123,15 @@ def __init__(
self.complexity_score_upper_bound = complexity_bounds[1]

def apply(self) -> List[LogicalPlan]:
if is_active_transaction(self.session):
reason = self._should_skip_optimization_for_session()
if reason is not None:
# Skip optimization if the session is in an active transaction.
_logger.debug(
"Skipping large query breakdown optimization due to active transaction."
)
self.session._conn._telemetry_client.send_large_query_optimization_skipped_telemetry(
self.session.session_id,
SkipLargeQueryBreakdownCategory.ACTIVE_TRANSACTION.value,
reason.value,
)
return self.logical_plans

Expand All @@ -145,6 +146,62 @@ def apply(self) -> List[LogicalPlan]:

return resulting_plans

def _should_skip_optimization_for_session(
self,
) -> Optional[SkipLargeQueryBreakdownCategory]:
"""Method to check if the optimization should be skipped based on the session state.
Returns:
SkipLargeQueryBreakdownCategory: enum indicating the reason for skipping the optimization.
if the optimization should be skipped, otherwise None.
"""
if self.session.get_current_database() is None:
# Skip optimization if there is no active database.
_logger.debug(
"Skipping large query breakdown optimization since there is no active database."
)
return SkipLargeQueryBreakdownCategory.NO_ACTIVE_DATABASE

if self.session.get_current_schema() is None:
# Skip optimization if there is no active schema.
_logger.debug(
"Skipping large query breakdown optimization since there is no active schema."
)
return SkipLargeQueryBreakdownCategory.NO_ACTIVE_SCHEMA

if is_active_transaction(self.session):
# Skip optimization if the session is in an active transaction.
_logger.debug(
"Skipping large query breakdown optimization due to active transaction."
)
return SkipLargeQueryBreakdownCategory.ACTIVE_TRANSACTION

return None

def _should_skip_optimization_for_root(
self, root: TreeNode
) -> Optional[SkipLargeQueryBreakdownCategory]:
"""Method to check if the optimization should be skipped based on the root node type.
Returns:
SkipLargeQueryBreakdownCategory enum indicating the reason for skipping the optimization
if the optimization should be skipped, otherwise None.
"""
if (
isinstance(root, SnowflakePlan)
and root.source_plan is not None
and isinstance(
root.source_plan, (CreateViewCommand, CreateDynamicTableCommand)
)
):
# Skip optimization if the root is a view or a dynamic table.
_logger.debug(
"Skipping large query breakdown optimization for view/dynamic table plan."
)
return SkipLargeQueryBreakdownCategory.VIEW_DYNAMIC_TABLE

return None

def _try_to_breakdown_plan(self, root: TreeNode) -> List[LogicalPlan]:
"""Method to breakdown a single plan into smaller partitions based on
cumulative complexity score and node type.
Expand All @@ -161,20 +218,11 @@ def _try_to_breakdown_plan(self, root: TreeNode) -> List[LogicalPlan]:
_logger.debug(
f"Applying large query breakdown optimization for root of type {type(root)}"
)
if (
isinstance(root, SnowflakePlan)
and root.source_plan is not None
and isinstance(
root.source_plan, (CreateViewCommand, CreateDynamicTableCommand)
)
):
# Skip optimization if the root is a view or a dynamic table.
_logger.debug(
"Skipping large query breakdown optimization for view/dynamic table plan."
)
reason = self._should_skip_optimization_for_root(root)
if reason is not None:
self.session._conn._telemetry_client.send_large_query_optimization_skipped_telemetry(
self.session.session_id,
SkipLargeQueryBreakdownCategory.VIEW_DYNAMIC_TABLE.value,
reason.value,
)
return [root]

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,3 +45,5 @@ class CompilationStageTelemetryField(Enum):
class SkipLargeQueryBreakdownCategory(Enum):
ACTIVE_TRANSACTION = "active transaction"
VIEW_DYNAMIC_TABLE = "view or dynamic table command"
NO_ACTIVE_DATABASE = "no active database"
NO_ACTIVE_SCHEMA = "no active schema"
38 changes: 36 additions & 2 deletions tests/integ/test_large_query_breakdown.py
Original file line number Diff line number Diff line change
Expand Up @@ -432,18 +432,52 @@ def test_optimization_skipped_with_views_and_dynamic_tables(session, caplog):
in caplog.text
)

with caplog.at_level(logging.DEBUG):
df.create_or_replace_view(view_name)
with patch.object(
session._conn._telemetry_client,
"send_large_query_optimization_skipped_telemetry",
) as patched_send_telemetry:
with caplog.at_level(logging.DEBUG):
df.create_or_replace_view(view_name)
assert (
"Skipping large query breakdown optimization for view/dynamic table plan"
in caplog.text
)
patched_send_telemetry.assert_called_once
called_with_reason = patched_send_telemetry.call_args[0][1]
assert called_with_reason == "view or dynamic table command"
finally:
Utils.drop_dynamic_table(session, table_name)
Utils.drop_view(session, view_name)
Utils.drop_table(session, source_table)


@pytest.mark.skipif(
IS_IN_STORED_PROC, reason="cannot create a new session in stored procedure"
)
@pytest.mark.parametrize("db_or_schema", ["database", "schema"])
def test_optimization_skipped_with_no_active_db_or_schema(
session, db_or_schema, caplog
):
df = session.sql("select 1 as a, 2 as b")

# no database check
with patch.object(
session._conn._telemetry_client,
"send_large_query_optimization_skipped_telemetry",
) as patched_send_telemetry:
with patch.object(session, f"get_current_{db_or_schema}", return_value=None):
with caplog.at_level(logging.DEBUG):
with SqlCounter(query_count=0, describe_count=0):
df.queries
assert (
f"Skipping large query breakdown optimization since there is no active {db_or_schema}"
in caplog.text
)
patched_send_telemetry.assert_called_once
called_with_reason = patched_send_telemetry.call_args[0][1]
assert called_with_reason == f"no active {db_or_schema}"


def test_async_job_with_large_query_breakdown(session, large_query_df):
"""Test large query breakdown gives same result for async and non-async jobs"""
with SqlCounter(query_count=2):
Expand Down

0 comments on commit 9dd7a23

Please sign in to comment.