Skip to content

Commit

Permalink
Add too many stages warning
Browse files Browse the repository at this point in the history
Emit a warning if the number of stages exceeds threshold

Extracted-From: https://github.com/prestodb/presto
  • Loading branch information
Elon Azoulay authored and findepi committed Mar 12, 2019
1 parent eee083a commit 019736f
Show file tree
Hide file tree
Showing 14 changed files with 206 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ public class QueryManagerConfig
private int maxQueryHistory = 100;
private int maxQueryLength = 1_000_000;
private int maxStageCount = 100;
private int stageCountWarningThreshold = 50;

private Duration clientTimeout = new Duration(5, TimeUnit.MINUTES);

private int queryManagerExecutorPoolSize = 5;
Expand Down Expand Up @@ -186,6 +188,20 @@ public QueryManagerConfig setMaxStageCount(int maxStageCount)
return this;
}

@Min(1)
public int getStageCountWarningThreshold()
{
return stageCountWarningThreshold;
}

@Config("query.stage-count-warning-threshold")
@ConfigDescription("Emit a warning when stage count exceeds this threshold")
public QueryManagerConfig setStageCountWarningThreshold(int stageCountWarningThreshold)
{
this.stageCountWarningThreshold = stageCountWarningThreshold;
return this;
}

@MinDuration("5s")
@NotNull
public Duration getClientTimeout()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -428,7 +428,7 @@ private PlanRoot doAnalyzeQuery()
stateMachine.setOutput(output);

// fragment the plan
SubPlan fragmentedPlan = planFragmenter.createSubPlans(stateMachine.getSession(), plan, false);
SubPlan fragmentedPlan = planFragmenter.createSubPlans(stateMachine.getSession(), plan, false, stateMachine.getWarningCollector());

// record analysis time
stateMachine.endAnalysis();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,6 @@ public Plan getLogicalPlan(Session session, Statement statement, List<Expression
private SubPlan getDistributedPlan(Session session, Statement statement, List<Expression> parameters, WarningCollector warningCollector)
{
Plan plan = getLogicalPlan(session, statement, parameters, warningCollector);
return planFragmenter.createSubPlans(session, plan, false);
return planFragmenter.createSubPlans(session, plan, false, warningCollector);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,12 @@
import io.prestosql.SystemSessionProperties;
import io.prestosql.cost.StatsAndCosts;
import io.prestosql.execution.QueryManagerConfig;
import io.prestosql.execution.warnings.WarningCollector;
import io.prestosql.metadata.Metadata;
import io.prestosql.metadata.TableHandle;
import io.prestosql.metadata.TableProperties.TablePartitioning;
import io.prestosql.spi.PrestoException;
import io.prestosql.spi.PrestoWarning;
import io.prestosql.spi.connector.ConnectorPartitionHandle;
import io.prestosql.spi.connector.ConnectorPartitioningHandle;
import io.prestosql.spi.type.Type;
Expand Down Expand Up @@ -67,6 +69,7 @@
import static io.prestosql.operator.StageExecutionDescriptor.ungroupedExecution;
import static io.prestosql.spi.StandardErrorCode.QUERY_HAS_TOO_MANY_STAGES;
import static io.prestosql.spi.connector.NotPartitionedPartitionHandle.NOT_PARTITIONED;
import static io.prestosql.spi.connector.StandardWarningCode.TOO_MANY_STAGES;
import static io.prestosql.sql.planner.SchedulingOrderVisitor.scheduleOrder;
import static io.prestosql.sql.planner.SystemPartitioningHandle.COORDINATOR_DISTRIBUTION;
import static io.prestosql.sql.planner.SystemPartitioningHandle.SINGLE_DISTRIBUTION;
Expand All @@ -81,17 +84,23 @@
*/
public class PlanFragmenter
{
private static final String TOO_MANY_STAGES_MESSAGE = "" +
"If the query contains multiple aggregates with DISTINCT over different columns, please set the 'use_mark_distinct' session property to false. " +
"If the query contains WITH clauses that are referenced more than once, please create temporary table(s) for the queries in those clauses.";

private final Metadata metadata;
private final NodePartitioningManager nodePartitioningManager;
private final QueryManagerConfig config;

@Inject
public PlanFragmenter(Metadata metadata, NodePartitioningManager nodePartitioningManager, QueryManagerConfig queryManagerConfig)
{
this.metadata = requireNonNull(metadata, "metadata is null");
this.nodePartitioningManager = requireNonNull(nodePartitioningManager, "nodePartitioningManager is null");
this.config = requireNonNull(queryManagerConfig, "queryManagerConfig is null");
}

public SubPlan createSubPlans(Session session, Plan plan, boolean forceSingleNode)
public SubPlan createSubPlans(Session session, Plan plan, boolean forceSingleNode, WarningCollector warningCollector)
{
Fragmenter fragmenter = new Fragmenter(session, metadata, plan.getTypes(), plan.getStatsAndCosts());

Expand All @@ -108,21 +117,26 @@ public SubPlan createSubPlans(Session session, Plan plan, boolean forceSingleNod
checkState(!isForceSingleNodeOutput(session) || subPlan.getFragment().getPartitioning().isSingleNode(), "Root of PlanFragment is not single node");

// TODO: Remove query_max_stage_count session property and use queryManagerConfig.getMaxStageCount() here
sanityCheckFragmentedPlan(subPlan, getQueryMaxStageCount(session));
sanityCheckFragmentedPlan(subPlan, warningCollector, getQueryMaxStageCount(session), config.getStageCountWarningThreshold());

return subPlan;
}

private void sanityCheckFragmentedPlan(SubPlan subPlan, int maxStageCount)
private void sanityCheckFragmentedPlan(SubPlan subPlan, WarningCollector warningCollector, int maxStageCount, int stageCountSoftLimit)
{
subPlan.sanityCheck();
int fragmentCount = subPlan.getAllFragments().size();
if (fragmentCount > maxStageCount) {
throw new PrestoException(QUERY_HAS_TOO_MANY_STAGES, format(
"Number of stages in the query (%s) exceeds the allowed maximum (%s). " +
"If the query contains multiple DISTINCTs, please set the use_mark_distinct session property to false. " +
"If the query contains multiple CTEs that are referenced more than once, please create temporary table(s) for one or more of the CTEs.",
fragmentCount, maxStageCount));
"Number of stages in the query (%s) exceeds the allowed maximum (%s). " + TOO_MANY_STAGES_MESSAGE,
fragmentCount,
maxStageCount));
}
if (fragmentCount > stageCountSoftLimit) {
warningCollector.add(new PrestoWarning(TOO_MANY_STAGES, format(
"Number of stages in the query (%s) exceeds the soft limit (%s). " + TOO_MANY_STAGES_MESSAGE,
fragmentCount,
stageCountSoftLimit)));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -689,7 +689,7 @@ private List<Driver> createDrivers(Session session, Plan plan, OutputFactory out
System.out.println(PlanPrinter.textLogicalPlan(plan.getRoot(), plan.getTypes(), metadata.getFunctionRegistry(), plan.getStatsAndCosts(), session, 0, false));
}

SubPlan subplan = planFragmenter.createSubPlans(session, plan, true);
SubPlan subplan = planFragmenter.createSubPlans(session, plan, true, WarningCollector.NOOP);
if (!subplan.getChildren().isEmpty()) {
throw new AssertionError("Expected subplan to have no children");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import com.google.common.collect.ImmutableSet;
import io.airlift.slice.Slices;
import io.prestosql.Session;
import io.prestosql.client.Warning;
import io.prestosql.spi.Page;
import io.prestosql.spi.PageBuilder;
import io.prestosql.spi.block.Block;
Expand Down Expand Up @@ -96,10 +97,11 @@ public class MaterializedResult
private final Set<String> resetSessionProperties;
private final Optional<String> updateType;
private final OptionalLong updateCount;
private final List<Warning> warnings;

public MaterializedResult(List<MaterializedRow> rows, List<? extends Type> types)
{
this(rows, types, ImmutableMap.of(), ImmutableSet.of(), Optional.empty(), OptionalLong.empty());
this(rows, types, ImmutableMap.of(), ImmutableSet.of(), Optional.empty(), OptionalLong.empty(), ImmutableList.of());
}

public MaterializedResult(
Expand All @@ -108,14 +110,16 @@ public MaterializedResult(
Map<String, String> setSessionProperties,
Set<String> resetSessionProperties,
Optional<String> updateType,
OptionalLong updateCount)
OptionalLong updateCount,
List<Warning> warnings)
{
this.rows = ImmutableList.copyOf(requireNonNull(rows, "rows is null"));
this.types = ImmutableList.copyOf(requireNonNull(types, "types is null"));
this.setSessionProperties = ImmutableMap.copyOf(requireNonNull(setSessionProperties, "setSessionProperties is null"));
this.resetSessionProperties = ImmutableSet.copyOf(requireNonNull(resetSessionProperties, "resetSessionProperties is null"));
this.updateType = requireNonNull(updateType, "updateType is null");
this.updateCount = requireNonNull(updateCount, "updateCount is null");
this.warnings = requireNonNull(warnings, "warnings is null");
}

public int getRowCount()
Expand Down Expand Up @@ -159,6 +163,11 @@ public OptionalLong getUpdateCount()
return updateCount;
}

public List<Warning> getWarnings()
{
return warnings;
}

@Override
public boolean equals(Object obj)
{
Expand Down Expand Up @@ -348,7 +357,8 @@ public MaterializedResult toTestTypes()
setSessionProperties,
resetSessionProperties,
updateType,
updateCount);
updateCount,
warnings);
}

private static MaterializedRow convertToTestTypes(MaterializedRow prestoRow)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import io.prestosql.execution.scheduler.LegacyNetworkTopology;
import io.prestosql.execution.scheduler.NodeScheduler;
import io.prestosql.execution.scheduler.NodeSchedulerConfig;
import io.prestosql.execution.warnings.WarningCollector;
import io.prestosql.metadata.CatalogManager;
import io.prestosql.metadata.InMemoryNodeManager;
import io.prestosql.metadata.MetadataManager;
Expand Down Expand Up @@ -811,7 +812,7 @@ private JoinNode join(String planNodeId, PlanNode left, PlanNode right, JoinNode

private SubPlan fragment(Plan plan)
{
return inTransaction(session -> planFragmenter.createSubPlans(session, plan, false));
return inTransaction(session -> planFragmenter.createSubPlans(session, plan, false, WarningCollector.NOOP));
}

private <T> T inTransaction(Function<Session, T> transactionSessionConsumer)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ public void testDefaults()
.setMaxQueryHistory(100)
.setMaxQueryLength(1_000_000)
.setMaxStageCount(100)
.setStageCountWarningThreshold(50)
.setClientTimeout(new Duration(5, TimeUnit.MINUTES))
.setScheduleSplitBatchSize(1000)
.setMinScheduleSplitBatchSize(100)
Expand Down Expand Up @@ -60,6 +61,7 @@ public void testExplicitPropertyMappings()
.put("query.max-history", "10")
.put("query.max-length", "10000")
.put("query.max-stage-count", "12345")
.put("query.stage-count-warning-threshold", "12300")
.put("query.schedule-split-batch-size", "99")
.put("query.min-schedule-split-batch-size", "9")
.put("query.max-concurrent-queries", "10")
Expand All @@ -84,6 +86,7 @@ public void testExplicitPropertyMappings()
.setMaxQueryHistory(10)
.setMaxQueryLength(10000)
.setMaxStageCount(12345)
.setStageCountWarningThreshold(12300)
.setClientTimeout(new Duration(10, TimeUnit.SECONDS))
.setScheduleSplitBatchSize(99)
.setMinScheduleSplitBatchSize(9)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.prestosql.spi.connector;

import io.prestosql.spi.WarningCode;
import io.prestosql.spi.WarningCodeSupplier;

public enum StandardWarningCode
implements WarningCodeSupplier
{
TOO_MANY_STAGES(0x0000_0001),
/**/;
private final WarningCode warningCode;

StandardWarningCode(int code)
{
warningCode = new WarningCode(code, name());
}

@Override
public WarningCode toWarningCode()
{
return warningCode;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,8 @@ public ResultWithQueryId<T> execute(Session session, @Language("SQL") String sql
resultsSession.setUpdateCount(results.getUpdateCount());
}

resultsSession.setWarnings(results.getWarnings());

T result = resultsSession.build(client.getSetSessionProperties(), client.getResetSessionProperties());
return new ResultWithQueryId<>(new QueryId(results.getId()), result);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,9 @@

import io.prestosql.client.QueryData;
import io.prestosql.client.QueryStatusInfo;
import io.prestosql.client.Warning;

import java.util.List;
import java.util.Map;
import java.util.Set;

Expand All @@ -31,6 +33,8 @@ default void setUpdateCount(long count)
throw new UnsupportedOperationException();
}

default void setWarnings(List<Warning> warnings) {}

void addResults(QueryStatusInfo statusInfo, QueryData data);

T build(Map<String, String> setSessionProperties, Set<String> resetSessionProperties);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import io.prestosql.client.IntervalYearMonth;
import io.prestosql.client.QueryData;
import io.prestosql.client.QueryStatusInfo;
import io.prestosql.client.Warning;
import io.prestosql.server.testing.TestingPrestoServer;
import io.prestosql.spi.type.ArrayType;
import io.prestosql.spi.type.DecimalType;
Expand Down Expand Up @@ -102,6 +103,7 @@ private class MaterializedResultSession

private final AtomicReference<Optional<String>> updateType = new AtomicReference<>(Optional.empty());
private final AtomicReference<OptionalLong> updateCount = new AtomicReference<>(OptionalLong.empty());
private final AtomicReference<List<Warning>> warnings = new AtomicReference<>(ImmutableList.of());

@Override
public void setUpdateType(String type)
Expand All @@ -115,6 +117,12 @@ public void setUpdateCount(long count)
updateCount.set(OptionalLong.of(count));
}

@Override
public void setWarnings(List<Warning> warnings)
{
this.warnings.set(warnings);
}

@Override
public void addResults(QueryStatusInfo statusInfo, QueryData data)
{
Expand All @@ -138,7 +146,8 @@ public MaterializedResult build(Map<String, String> setSessionProperties, Set<St
setSessionProperties,
resetSessionProperties,
updateType.get(),
updateCount.get());
updateCount.get(),
warnings.get());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,15 @@
*/
package io.prestosql.execution;

import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import io.prestosql.Session;
import io.prestosql.plugin.tpch.TpchPlugin;
import io.prestosql.server.BasicQueryInfo;
import io.prestosql.spi.QueryId;
import io.prestosql.tests.DistributedQueryRunner;

import java.util.Map;
import java.util.Set;

import static io.airlift.concurrent.MoreFutures.getFutureValue;
Expand Down Expand Up @@ -67,8 +69,15 @@ public static void waitForQueryState(DistributedQueryRunner queryRunner, QueryId

public static DistributedQueryRunner createQueryRunner()
throws Exception
{
return createQueryRunner(ImmutableMap.of());
}

public static DistributedQueryRunner createQueryRunner(Map<String, String> extraProperties)
throws Exception
{
DistributedQueryRunner queryRunner = DistributedQueryRunner.builder(testSessionBuilder().build())
.setExtraProperties(extraProperties)
.setNodeCount(2)
.build();

Expand Down
Loading

0 comments on commit 019736f

Please sign in to comment.