Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

SNOW-1645316: Modularize skip lqb conditions and add no db check #2451

Open
wants to merge 5 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -123,14 +123,15 @@ def __init__(
self.complexity_score_upper_bound = complexity_bounds[1]

def apply(self) -> List[LogicalPlan]:
if is_active_transaction(self.session):
reason = self._should_skip_optimization_for_session()
if reason is not None:
# Skip optimization if the session is in an active transaction.
_logger.debug(
"Skipping large query breakdown optimization due to active transaction."
)
self.session._conn._telemetry_client.send_large_query_optimization_skipped_telemetry(
self.session.session_id,
SkipLargeQueryBreakdownCategory.ACTIVE_TRANSACTION.value,
reason.value,
)
return self.logical_plans

Expand All @@ -145,6 +146,62 @@ def apply(self) -> List[LogicalPlan]:

return resulting_plans

def _should_skip_optimization_for_session(
    self,
) -> Optional[SkipLargeQueryBreakdownCategory]:
    """Check session-level preconditions for large query breakdown.

    Returns:
        The SkipLargeQueryBreakdownCategory enum value describing why the
        optimization must be skipped, or None when the session state allows
        the optimization to run.
    """
    # Each entry is (lazy predicate, debug message, skip reason). Predicates
    # are wrapped in lambdas so they are evaluated lazily and in order: the
    # first failing precondition determines the reported reason, exactly as
    # the equivalent chain of if-statements would.
    session_checks = (
        (
            lambda: self.session.get_current_database() is None,
            "Skipping large query breakdown optimization since there is no active database.",
            SkipLargeQueryBreakdownCategory.NO_ACTIVE_DATABASE,
        ),
        (
            lambda: self.session.get_current_schema() is None,
            "Skipping large query breakdown optimization since there is no active schema.",
            SkipLargeQueryBreakdownCategory.NO_ACTIVE_SCHEMA,
        ),
        (
            lambda: is_active_transaction(self.session),
            "Skipping large query breakdown optimization due to active transaction.",
            SkipLargeQueryBreakdownCategory.ACTIVE_TRANSACTION,
        ),
    )
    for precondition_failed, debug_message, skip_reason in session_checks:
        if precondition_failed():
            _logger.debug(debug_message)
            return skip_reason
    return None

def _should_skip_optimization_for_root(
    self, root: TreeNode
) -> Optional[SkipLargeQueryBreakdownCategory]:
    """Check if the optimization should be skipped based on the root node type.

    Returns:
        SkipLargeQueryBreakdownCategory.VIEW_DYNAMIC_TABLE when the root is a
        SnowflakePlan whose source plan creates a view or a dynamic table,
        otherwise None.
    """
    # Guard-clause style: bail out early for anything that is not a
    # SnowflakePlan wrapping a view / dynamic-table creation command.
    if not isinstance(root, SnowflakePlan):
        return None
    source_plan = root.source_plan
    if source_plan is None:
        return None
    if not isinstance(source_plan, (CreateViewCommand, CreateDynamicTableCommand)):
        return None
    # Skip optimization if the root is a view or a dynamic table.
    _logger.debug(
        "Skipping large query breakdown optimization for view/dynamic table plan."
    )
    return SkipLargeQueryBreakdownCategory.VIEW_DYNAMIC_TABLE

def _try_to_breakdown_plan(self, root: TreeNode) -> List[LogicalPlan]:
"""Method to breakdown a single plan into smaller partitions based on
cumulative complexity score and node type.
Expand All @@ -161,20 +218,11 @@ def _try_to_breakdown_plan(self, root: TreeNode) -> List[LogicalPlan]:
_logger.debug(
f"Applying large query breakdown optimization for root of type {type(root)}"
)
if (
isinstance(root, SnowflakePlan)
and root.source_plan is not None
and isinstance(
root.source_plan, (CreateViewCommand, CreateDynamicTableCommand)
)
):
# Skip optimization if the root is a view or a dynamic table.
_logger.debug(
"Skipping large query breakdown optimization for view/dynamic table plan."
)
reason = self._should_skip_optimization_for_root(root)
if reason is not None:
self.session._conn._telemetry_client.send_large_query_optimization_skipped_telemetry(
self.session.session_id,
SkipLargeQueryBreakdownCategory.VIEW_DYNAMIC_TABLE.value,
reason.value,
)
return [root]

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,3 +45,5 @@ class CompilationStageTelemetryField(Enum):
class SkipLargeQueryBreakdownCategory(Enum):
    """Reason categories reported to telemetry when large query breakdown is skipped."""

    # Session has an open transaction.
    ACTIVE_TRANSACTION = "active transaction"
    # Root plan creates a view or a dynamic table.
    VIEW_DYNAMIC_TABLE = "view or dynamic table command"
    # Session has no current database set.
    NO_ACTIVE_DATABASE = "no active database"
    # Session has no current schema set.
    NO_ACTIVE_SCHEMA = "no active schema"
38 changes: 36 additions & 2 deletions tests/integ/test_large_query_breakdown.py
Original file line number Diff line number Diff line change
Expand Up @@ -432,18 +432,52 @@ def test_optimization_skipped_with_views_and_dynamic_tables(session, caplog):
in caplog.text
)

with caplog.at_level(logging.DEBUG):
df.create_or_replace_view(view_name)
with patch.object(
session._conn._telemetry_client,
"send_large_query_optimization_skipped_telemetry",
) as patched_send_telemetry:
with caplog.at_level(logging.DEBUG):
df.create_or_replace_view(view_name)
assert (
"Skipping large query breakdown optimization for view/dynamic table plan"
in caplog.text
)
patched_send_telemetry.assert_called_once
called_with_reason = patched_send_telemetry.call_args[0][1]
assert called_with_reason == "view or dynamic table command"
finally:
Utils.drop_dynamic_table(session, table_name)
Utils.drop_view(session, view_name)
Utils.drop_table(session, source_table)


@pytest.mark.skipif(
    IS_IN_STORED_PROC, reason="cannot create a new session in stored procedure"
)
@pytest.mark.parametrize("db_or_schema", ["database", "schema"])
def test_optimization_skipped_with_no_active_db_or_schema(
    session, db_or_schema, caplog
):
    """Large query breakdown is skipped, with telemetry sent, when the session
    has no active database (or schema, per parametrization)."""
    df = session.sql("select 1 as a, 2 as b")

    # Simulate a session with no active database/schema by patching the
    # corresponding getter to return None.
    with patch.object(
        session._conn._telemetry_client,
        "send_large_query_optimization_skipped_telemetry",
    ) as patched_send_telemetry:
        with patch.object(session, f"get_current_{db_or_schema}", return_value=None):
            with caplog.at_level(logging.DEBUG):
                with SqlCounter(query_count=0, describe_count=0):
                    # Accessing .queries triggers plan compilation, which is
                    # where the skip decision and telemetry happen.
                    df.queries
        assert (
            f"Skipping large query breakdown optimization since there is no active {db_or_schema}"
            in caplog.text
        )
        # BUG FIX: the original `patched_send_telemetry.assert_called_once`
        # was a bare attribute access — a no-op that never asserted anything.
        # The mock method must be *called* for the check to run.
        patched_send_telemetry.assert_called_once()
        called_with_reason = patched_send_telemetry.call_args[0][1]
        assert called_with_reason == f"no active {db_or_schema}"


def test_async_job_with_large_query_breakdown(session, large_query_df):
"""Test large query breakdown gives same result for async and non-async jobs"""
with SqlCounter(query_count=2):
Expand Down
Loading