Skip to content

Commit

Permalink
Control merging partitioning in aggregation node via session property
Browse files Browse the repository at this point in the history
  • Loading branch information
jessesleeping committed Dec 19, 2019
1 parent 43e8c02 commit 82e90bf
Show file tree
Hide file tree
Showing 7 changed files with 157 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import com.facebook.presto.spi.PrestoException;
import com.facebook.presto.spi.session.PropertyMetadata;
import com.facebook.presto.sql.analyzer.FeaturesConfig;
import com.facebook.presto.sql.analyzer.FeaturesConfig.AggregationPartitioningMergingStrategy;
import com.facebook.presto.sql.analyzer.FeaturesConfig.JoinDistributionType;
import com.facebook.presto.sql.analyzer.FeaturesConfig.JoinReorderingStrategy;
import com.facebook.presto.sql.analyzer.FeaturesConfig.PartialMergePushdownStrategy;
Expand Down Expand Up @@ -135,6 +136,7 @@ public final class SystemSessionProperties
public static final String OPTIMIZE_FULL_OUTER_JOIN_WITH_COALESCE = "optimize_full_outer_join_with_coalesce";
public static final String INDEX_LOADER_TIMEOUT = "index_loader_timeout";
public static final String OPTIMIZED_REPARTITIONING_ENABLED = "optimized_repartitioning";
public static final String AGGREGATION_PARTITIONING_MERGING_STRATEGY = "aggregation_partitioning_merging_strategy";

private final List<PropertyMetadata<?>> sessionProperties;

Expand Down Expand Up @@ -664,7 +666,19 @@ public SystemSessionProperties(
OPTIMIZED_REPARTITIONING_ENABLED,
"Experimental: Use optimized repartitioning",
featuresConfig.isOptimizedRepartitioningEnabled(),
false));
false),
new PropertyMetadata<>(
AGGREGATION_PARTITIONING_MERGING_STRATEGY,
format("Strategy to merge partition preference in aggregation node. Options are %s",
Stream.of(AggregationPartitioningMergingStrategy.values())
.map(AggregationPartitioningMergingStrategy::name)
.collect(joining(","))),
VARCHAR,
AggregationPartitioningMergingStrategy.class,
featuresConfig.getAggregationPartitioningMergingStrategy(),
false,
value -> AggregationPartitioningMergingStrategy.valueOf(((String) value).toUpperCase()),
AggregationPartitioningMergingStrategy::name));
}

public List<PropertyMetadata<?>> getSessionProperties()
Expand Down Expand Up @@ -1134,4 +1148,9 @@ public static boolean isOptimizedRepartitioningEnabled(Session session)
{
return session.getSystemProperty(OPTIMIZED_REPARTITIONING_ENABLED, Boolean.class);
}

public static AggregationPartitioningMergingStrategy getAggregationPartitioningMergingStrategy(Session session)
{
return session.getSystemProperty(AGGREGATION_PARTITIONING_MERGING_STRATEGY, AggregationPartitioningMergingStrategy.class);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import java.nio.file.Paths;
import java.util.List;

import static com.facebook.presto.sql.analyzer.FeaturesConfig.AggregationPartitioningMergingStrategy.LEGACY;
import static com.facebook.presto.sql.analyzer.FeaturesConfig.JoinDistributionType.PARTITIONED;
import static com.facebook.presto.sql.analyzer.FeaturesConfig.JoinReorderingStrategy.ELIMINATE_CROSS_JOINS;
import static com.facebook.presto.sql.analyzer.RegexLibrary.JONI;
Expand Down Expand Up @@ -132,6 +133,7 @@ public class FeaturesConfig
private int filterAndProjectMinOutputPageRowCount = 256;
private int maxGroupingSets = 2048;
private boolean legacyUnnestArrayRows;
private AggregationPartitioningMergingStrategy aggregationPartitioningMergingStrategy = LEGACY;

private boolean jsonSerdeCodeGenerationEnabled;
private int maxConcurrentMaterializations = 3;
Expand Down Expand Up @@ -175,6 +177,23 @@ public enum PartialMergePushdownStrategy
PUSH_THROUGH_LOW_MEMORY_OPERATORS
}

public enum AggregationPartitioningMergingStrategy
{
LEGACY, // merge partition preference with parent but apply current partition preference
TOP_DOWN, // merge partition preference with parent and apply the merged partition preference
BOTTOM_UP; // don't merge partition preference and apply current partition preference only

public boolean isMergingWithParent()
{
return this == LEGACY || this == TOP_DOWN;
}

public boolean isAdoptingMergedPreference()
{
return this == TOP_DOWN;
}
}

public double getCpuCostWeight()
{
return cpuCostWeight;
Expand Down Expand Up @@ -489,6 +508,18 @@ public FeaturesConfig setMaxReorderedJoins(int maxReorderedJoins)
return this;
}

public AggregationPartitioningMergingStrategy getAggregationPartitioningMergingStrategy()
{
return aggregationPartitioningMergingStrategy;
}

@Config("optimizer.aggregation-partition-merging")
public FeaturesConfig setAggregationPartitioningMergingStrategy(AggregationPartitioningMergingStrategy aggregationPartitioningMergingStrategy)
{
this.aggregationPartitioningMergingStrategy = requireNonNull(aggregationPartitioningMergingStrategy, "aggregationPartitioningMergingStrategy is null");
return this;
}

public boolean isRedistributeWrites()
{
return redistributeWrites;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ private Partitioning(PartitioningHandle handle, List<RowExpression> arguments)
this.arguments = ImmutableList.copyOf(requireNonNull(arguments, "arguments is null"));
}

public static <T extends RowExpression> Partitioning create(PartitioningHandle handle, List<T> columns)
public static <T extends RowExpression> Partitioning create(PartitioningHandle handle, Collection<T> columns)
{
return new Partitioning(handle, columns.stream()
.map(RowExpression.class::cast)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import com.facebook.presto.spi.relation.RowExpression;
import com.facebook.presto.spi.relation.VariableReferenceExpression;
import com.facebook.presto.spi.type.Type;
import com.facebook.presto.sql.analyzer.FeaturesConfig;
import com.facebook.presto.sql.analyzer.FeaturesConfig.PartialMergePushdownStrategy;
import com.facebook.presto.sql.parser.SqlParser;
import com.facebook.presto.sql.planner.ExpressionDomainTranslator;
Expand Down Expand Up @@ -84,6 +85,7 @@
import com.google.common.collect.SetMultimap;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
Expand All @@ -93,6 +95,7 @@
import java.util.Set;
import java.util.function.Function;

import static com.facebook.presto.SystemSessionProperties.getAggregationPartitioningMergingStrategy;
import static com.facebook.presto.SystemSessionProperties.getExchangeMaterializationStrategy;
import static com.facebook.presto.SystemSessionProperties.getHashPartitionCount;
import static com.facebook.presto.SystemSessionProperties.getPartialMergePushdownStrategy;
Expand Down Expand Up @@ -246,8 +249,14 @@ public PlanWithProperties visitAggregation(AggregationNode node, PreferredProper
// If aggregation has a mixed of non-global and global grouping set, an repartition exchange is any way needed to eliminate duplicate default outputs
// from partial aggregations (enforced in `ValidateAggregationWithDefaultValues.java`). Therefore, we don't have preference on what the child will return.
if (!node.getGroupingKeys().isEmpty() && !hasMixedGroupingSets) {
FeaturesConfig.AggregationPartitioningMergingStrategy aggregationPartitioningMergingStrategy = getAggregationPartitioningMergingStrategy(session);
preferredProperties = PreferredProperties.partitionedWithLocal(partitioningRequirement, grouped(node.getGroupingKeys()))
.mergeWithParent(parentPreferredProperties);
.mergeWithParent(parentPreferredProperties, aggregationPartitioningMergingStrategy.isMergingWithParent());

if (aggregationPartitioningMergingStrategy.isAdoptingMergedPreference()) {
checkState(preferredProperties.getGlobalProperties().isPresent() && preferredProperties.getGlobalProperties().get().getPartitioningProperties().isPresent());
partitioningRequirement = ImmutableSet.copyOf(preferredProperties.getGlobalProperties().get().getPartitioningProperties().get().getPartitioningColumns());
}
}

PlanWithProperties child = planChild(node, preferredProperties);
Expand All @@ -269,7 +278,7 @@ else if (hasMixedGroupingSets
idAllocator.getNextId(),
selectExchangeScopeForPartitionedRemoteExchange(child.getNode(), false),
child.getNode(),
createPartitioning(node.getGroupingKeys()),
createPartitioning(partitioningRequirement),
node.getHashVariable()),
child.getProperties());
}
Expand Down Expand Up @@ -303,7 +312,7 @@ private Function<VariableReferenceExpression, Optional<VariableReferenceExpressi
public PlanWithProperties visitMarkDistinct(MarkDistinctNode node, PreferredProperties preferredProperties)
{
PreferredProperties preferredChildProperties = PreferredProperties.partitionedWithLocal(ImmutableSet.copyOf(node.getDistinctVariables()), grouped(node.getDistinctVariables()))
.mergeWithParent(preferredProperties);
.mergeWithParent(preferredProperties, true);
PlanWithProperties child = node.getSource().accept(this, preferredChildProperties);

if (child.getProperties().isSingleNode() ||
Expand Down Expand Up @@ -336,7 +345,7 @@ public PlanWithProperties visitWindow(WindowNode node, PreferredProperties prefe
PlanWithProperties child = planChild(
node,
PreferredProperties.partitionedWithLocal(ImmutableSet.copyOf(node.getPartitionBy()), desiredProperties)
.mergeWithParent(preferredProperties));
.mergeWithParent(preferredProperties, true));

if (!child.getProperties().isStreamPartitionedOn(node.getPartitionBy()) &&
!child.getProperties().isNodePartitionedOn(node.getPartitionBy())) {
Expand Down Expand Up @@ -378,7 +387,7 @@ public PlanWithProperties visitRowNumber(RowNumberNode node, PreferredProperties
PlanWithProperties child = planChild(
node,
PreferredProperties.partitionedWithLocal(ImmutableSet.copyOf(node.getPartitionBy()), grouped(node.getPartitionBy()))
.mergeWithParent(preferredProperties));
.mergeWithParent(preferredProperties, true));

// TODO: add config option/session property to force parallel plan if child is unpartitioned and window has a PARTITION BY clause
if (!child.getProperties().isStreamPartitionedOn(node.getPartitionBy())
Expand Down Expand Up @@ -410,7 +419,7 @@ public PlanWithProperties visitTopNRowNumber(TopNRowNumberNode node, PreferredPr
}
else {
preferredChildProperties = PreferredProperties.partitionedWithLocal(ImmutableSet.copyOf(node.getPartitionBy()), grouped(node.getPartitionBy()))
.mergeWithParent(preferredProperties);
.mergeWithParent(preferredProperties, true);
addExchange = partial -> partitionedExchange(
idAllocator.getNextId(),
selectExchangeScopeForPartitionedRemoteExchange(partial, false),
Expand Down Expand Up @@ -992,7 +1001,7 @@ public PlanWithProperties visitIndexJoin(IndexJoinNode node, PreferredProperties
List<LocalProperty<VariableReferenceExpression>> desiredLocalProperties = preferredProperties.getLocalProperties().isEmpty() ? grouped(joinColumns) : ImmutableList.of();

PlanWithProperties probeSource = node.getProbeSource().accept(this, PreferredProperties.partitionedWithLocal(ImmutableSet.copyOf(joinColumns), desiredLocalProperties)
.mergeWithParent(preferredProperties));
.mergeWithParent(preferredProperties, true));
ActualProperties probeProperties = probeSource.getProperties();

PlanWithProperties indexSource = node.getIndexSource().accept(this, PreferredProperties.any());
Expand Down Expand Up @@ -1333,7 +1342,7 @@ private ActualProperties derivePropertiesRecursively(PlanNode result)
return PropertyDerivations.derivePropertiesRecursively(result, metadata, session, types, parser);
}

private Partitioning createPartitioning(List<VariableReferenceExpression> partitioningColumns)
private Partitioning createPartitioning(Collection<VariableReferenceExpression> partitioningColumns)
{
// TODO: Use SystemTablesMetadata instead of introducing a special case
if (GlobalSystemConnector.NAME.equals(partitioningProviderCatalog)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ public List<LocalProperty<VariableReferenceExpression>> getLocalProperties()
return localProperties;
}

public PreferredProperties mergeWithParent(PreferredProperties parent)
public PreferredProperties mergeWithParent(PreferredProperties parent, boolean mergePartitionPreference)
{
List<LocalProperty<VariableReferenceExpression>> newLocal = ImmutableList.<LocalProperty<VariableReferenceExpression>>builder()
.addAll(localProperties)
Expand All @@ -143,7 +143,7 @@ public PreferredProperties mergeWithParent(PreferredProperties parent)
if (globalProperties.isPresent()) {
Global currentGlobal = globalProperties.get();
Global newGlobal = parent.getGlobalProperties()
.map(currentGlobal::mergeWithParent)
.map(global -> currentGlobal.mergeWithParent(global, mergePartitionPreference))
.orElse(currentGlobal);
builder.global(newGlobal);
}
Expand Down Expand Up @@ -249,15 +249,15 @@ public Optional<PartitioningProperties> getPartitioningProperties()
return partitioningProperties;
}

public Global mergeWithParent(Global parent)
public Global mergeWithParent(Global parent, boolean mergePartitionPreference)
{
if (distributed != parent.distributed) {
return this;
}
if (!partitioningProperties.isPresent()) {
return parent;
}
if (!parent.partitioningProperties.isPresent()) {
if (!parent.partitioningProperties.isPresent() || !mergePartitionPreference) {
return this;
}
return new Global(distributed, Optional.of(partitioningProperties.get().mergeWithParent(parent.partitioningProperties.get())));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@

import static com.facebook.airlift.configuration.testing.ConfigAssertions.assertFullMapping;
import static com.facebook.airlift.configuration.testing.ConfigAssertions.assertRecordedDefaults;
import static com.facebook.presto.sql.analyzer.FeaturesConfig.AggregationPartitioningMergingStrategy.LEGACY;
import static com.facebook.presto.sql.analyzer.FeaturesConfig.AggregationPartitioningMergingStrategy.TOP_DOWN;
import static com.facebook.presto.sql.analyzer.FeaturesConfig.JoinDistributionType.BROADCAST;
import static com.facebook.presto.sql.analyzer.FeaturesConfig.JoinDistributionType.PARTITIONED;
import static com.facebook.presto.sql.analyzer.FeaturesConfig.JoinReorderingStrategy.ELIMINATE_CROSS_JOINS;
Expand Down Expand Up @@ -73,6 +75,7 @@ public void testDefaults()
.setOptimizeHashGeneration(true)
.setPushTableWriteThroughUnion(true)
.setDictionaryAggregation(false)
.setAggregationPartitioningMergingStrategy(LEGACY)
.setLegacyArrayAgg(false)
.setGroupByUsesEqualTo(false)
.setLegacyMapSubscript(false)
Expand Down Expand Up @@ -168,6 +171,7 @@ public void testExplicitPropertyMappings()
.put("optimizer.push-table-write-through-union", "false")
.put("optimizer.dictionary-aggregation", "true")
.put("optimizer.push-aggregation-through-join", "false")
.put("optimizer.aggregation-partition-merging", "top_down")
.put("regex-library", "RE2J")
.put("re2j.dfa-states-limit", "42")
.put("re2j.dfa-retries", "42")
Expand Down Expand Up @@ -238,6 +242,7 @@ public void testExplicitPropertyMappings()
.setOptimizeMixedDistinctAggregations(true)
.setPushTableWriteThroughUnion(false)
.setDictionaryAggregation(true)
.setAggregationPartitioningMergingStrategy(TOP_DOWN)
.setPushAggregationThroughJoin(false)
.setLegacyArrayAgg(true)
.setGroupByUsesEqualTo(true)
Expand Down
Loading

0 comments on commit 82e90bf

Please sign in to comment.