Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add disjunctive predicate for filtered aggregates #11425

Merged
merged 1 commit into from
Sep 20, 2018

Conversation

atris
Copy link

@atris atris commented Sep 5, 2018

Filtered aggregates do not make an explicit predicate of the filter clause
that is present. This limits the ability of the optimizer to potentially
push down the predicate to table scan node. This commit adds a filter node
with a disjunctive predicate formed from all the filter clauses present in
the aggregation node's aggregates

Copy link
Contributor

@kokosing kokosing left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

looks good % minor comments

public void rewriteAddFilterWithMultipleFilters()
{
assertPlan(
"SELECT sum(totalprice) FILTER(WHERE totalprice > 0), sum(custkey) FILTER(WHERE custkey > 0) FROM orders",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add also a test with:

SELECT sum(totalprice) FILTER(WHERE totalprice > 0), sum(custkey) FROM orders

@Test
public void rewriteAddFilter()
{
assertPlan(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can you please also assert the results of these test queries?

context.getIdAllocator().getNextId(),
aggregation.getSource(),
newAssignments.build()),
projectNode,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

inline

import static com.facebook.presto.sql.planner.assertions.PlanMatchPattern.project;
import static com.facebook.presto.sql.planner.assertions.PlanMatchPattern.tableScan;

public class TestAddPredicateBelowFilteredAggregation
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please rename to TestImplementFilteredAggregations

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TestImplementFilteredAggregations exists today, these tests are specifically for filter clause pushdown

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No one will find this class. And filter clause pushdown is a part of ImplementFilteredAggregations.

import static com.facebook.presto.sql.planner.assertions.PlanMatchPattern.tableScan;

public class TestAddPredicateBelowFilteredAggregation
extends BasePlanTest
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The best would be if you could extend from BaseRuleTest

@kokosing
Copy link
Contributor

kokosing commented Sep 6, 2018

Notice that this optimization works only for default aggregations.
Consider the case: SELECT count(a) FILTER (WHERE b > 0) FROM t GROUP BY c, in case of none of the rows matches the condition then we still need to output row per each group c with null output for aggregate function count(a)

@atris atris force-pushed the filter_push_expressions branch from 7765965 to 9275d97 Compare September 7, 2018 10:50
@@ -20,6 +20,8 @@
import com.facebook.presto.sql.planner.plan.AggregationNode;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add to the commit message information about rule application space, that this only work only for default aggregation.

import static com.facebook.presto.sql.planner.assertions.PlanMatchPattern.project;
import static com.facebook.presto.sql.planner.assertions.PlanMatchPattern.tableScan;

public class TestAddPredicateBelowFilteredAggregation
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No one will find this class. And filter clause pushdown is a part of ImplementFilteredAggregations.

@@ -92,7 +97,17 @@ public Result apply(AggregationNode aggregation, Captures captures, Context cont
verify(!mask.isPresent(), "Expected aggregation without mask symbols, see Rule pattern");
newAssignments.put(symbol, filter);
mask = Optional.of(symbol);

filterExpressions.add(filter);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Instead of modifying this loop, I would introduce another method that would extract the predicate that you could pushdown or BooleanLiteral.TRUE if it is not possible extract such a predicate.

context.getIdAllocator().getNextId(),
aggregationNode.getSource(),
assignments),
combineDisjuncts(filterExpressions));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What if combineDisjuncts(filterExpressions) evaluates to True or False?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use combineDisjunctsWithDefault(filterExpressions, TRUE)

Assignments assignments,
boolean doNotOptimizeForFilterPredicates)
{
if (doNotOptimizeForFilterPredicates) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I do not think this flag is needed


filterExpressions.add(filter);

if (aggregation.hasNonEmptyGroupingSet()) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If would change it to:

if (aggregation.hasNonEmptyGroupingSet()) {
    filterExpressions.add(TRUE);
else {
    filterExpressions.add(filter);
}

boolean doNotOptimizeForFilterPredicates)
{
if (doNotOptimizeForFilterPredicates) {
return new ProjectNode(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

extract this as variable and reuse below

@atris atris force-pushed the filter_push_expressions branch from 9275d97 to 3ef0ddd Compare September 10, 2018 11:35
Copy link
Contributor

@kokosing kokosing left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Almost there

@@ -92,7 +100,11 @@ public Result apply(AggregationNode aggregation, Captures captures, Context cont
verify(!mask.isPresent(), "Expected aggregation without mask symbols, see Rule pattern");
newAssignments.put(symbol, filter);
mask = Optional.of(symbol);

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

undo, such empty line can cause the build failure

@@ -78,6 +84,8 @@ public Result apply(AggregationNode aggregation, Captures captures, Context cont
{
Assignments.Builder newAssignments = Assignments.builder();
ImmutableMap.Builder<Symbol, Aggregation> aggregations = ImmutableMap.builder();
ImmutableList.Builder<Expression> filterExpressions = ImmutableList.builder();
boolean doNotOptimizeForFilterPredicate = false;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

remove this flag

aggregations.build(),
aggregation.getGroupingSets(),
ImmutableList.of(),
aggregation.getStep(),
aggregation.getHashSymbol(),
aggregation.getGroupIdSymbol()));
}

private Expression getFilterPredicateToAdd(AggregationNode node, FunctionCall aggregate)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

s/getFilterPredicateToAdd/getFilterPredicate

context.getIdAllocator().getNextId(),
aggregation.getSource(),
newAssignments.build()),
combineDisjunctsWithDefault(filterExpressions.build(), TRUE_LITERAL)),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

let's do not add FilterNode when combineDisjunctsWithDefault(filterExpressions.build(), TRUE_LITERAL) evaluates to TRUE and please add returnValuesNode when evaluates to FALSE. Make sure you covered this cases with unit tests (checking the query output and plan).

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As discussed offline, I will add tests only in this PR and let the upper optimiser rules handle this.

@atris atris force-pushed the filter_push_expressions branch 2 times, most recently from 358fec6 to ea6c0e5 Compare September 11, 2018 17:45
}

@Test
public void testDoNotRewriteIfNonFilteredAggregateIsPresent()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is a wrong test name, as rule always rewrite filtered aggregations, but here it does not push down predicates

"sum_13", functionCall("sum", ImmutableList.of("custkey"))),
project(
tableScan("orders", ImmutableMap.of("totalprice", "totalprice",
"custkey", "custkey"))))))));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if you split line with arguments, please put each argument in separate line.

This is the last time I comment this, next time I will write just eh.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry about that

}

@Test
public void testRewriteLiteralFilterToValuesNode()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

update test name, say something about predicate push donw

"SELECT sum(totalprice) FILTER(WHERE FALSE) FROM orders",
anyTree(
aggregation(ImmutableMap.of("sum_6", functionCall("sum", ImmutableList.of("totalprice"))),
project(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

assert that predicate was pushed down

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since the expected plan tests for the presence of ValuesNode with same projections as original source, shouldn't that rewrite be a check that the FILTER with constant predicate was pushed down?

anyTree(
aggregation(ImmutableMap.of("sum_2", functionCall("sum", ImmutableList.of("sum_13")),
"sum", functionCall("sum", ImmutableList.of("sum_12"))),
exchange(aggregation(ImmutableMap.of("sum_12", functionCall("sum", ImmutableList.of("totalprice")),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I do not see any assert that you check if FilterNode has TRUE predicate


@Test
public void testDoNotRewriteIfNonFilteredAggregateIsPresent()
{
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

add test like SELECT sum(totalprice) FILTER(WHERE totalprice > 0) FROM orders ORDER BY x

@atris atris force-pushed the filter_push_expressions branch from ea6c0e5 to ac0ce9b Compare September 12, 2018 10:18
public void testAddPredicateForFilterClauses()
{
assertions.assertQuery(
"SELECT sum(x) FILTER(WHERE x > 0) FROM (VALUES 1, 1, 0, 2, 3, 3) t(x)",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please test also SELECT sum(x) FILTER(WHERE x > 0) FROM (VALUES 1, 1, 0, 2, 3, 3) t(x) GROUP BY x

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would ask , what does t(x) mean ? I have never seen this kind of way of expression. Could you tell me more ?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

t(x) is a pseudo representation of a table with a single column x and rows 1, 1, 0, 2, 3, 3

"sum_15", functionCall("sum", ImmutableList.of("custkey"))),
project(
filter(
"(\"totalprice\" > 0E0 OR \"custkey\" > BIGINT '0')",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why 0E0 and not DOUBLE '0'?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The test does not pass with DOUBLE '0'

aggregation(
ImmutableMap.of("sum_14", functionCall("sum", ImmutableList.of("totalprice")),
"sum_15", functionCall("sum", ImmutableList.of("custkey"))),
project(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would put all the above as anyTree


assertPlan(
"SELECT sum(totalprice) FILTER(WHERE TRUE) FROM orders",
anyTree(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

instead you can use something like: com.facebook.presto.sql.planner.TestLogicalPlanner#assertPlanContainsNoApplyOrAnyJoin to see if there is no filter node

anyTree(
aggregation(
ImmutableMap.of("sum_6", functionCall("sum", ImmutableList.of("totalprice"))),
project(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would wrap all the pattern above as anyTree

{
assertPlan(
"SELECT sum(totalprice) FILTER(WHERE totalprice > 0), sum(custkey) FROM orders",
anyTree(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

instead you can use something like: com.facebook.presto.sql.planner.TestLogicalPlanner#assertPlanContainsNoApplyOrAnyJoin to see if there is no filter node

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why you haven't used assertPlanContainsNoFilter here?

public void testDoNotPushdownPredicateIfNonFilteredAggregateIsPresent()
{
assertPlan(
"SELECT sum(totalprice) FILTER(WHERE totalprice > 0), sum(custkey) FROM orders",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Test also a plan for SELECT sum(totalprice) FILTER(WHERE totalprice > 0) FROM orders GROUP BY totalprice

@atris atris force-pushed the filter_push_expressions branch from ac0ce9b to 8595d00 Compare September 13, 2018 08:30
Copy link
Contributor

@kokosing kokosing left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

% minor things


assertPlanContainsNoFilter("SELECT sum(totalprice) FILTER(WHERE TRUE) FROM orders");

assertPlanContainsNoFilter("SELECT sum(x) FILTER(WHERE x > 0) FROM (VALUES 1, 1, 0, 2, 3, 3) t(x) GROUP BY x");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this does not push down any filter predicate, so it does not belong to testPushDownConstantFilterPredicate, same with assertion below

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point. I have added a new test, testNoFilterAddedForConstantValueFilters and added the two tests.

{
assertPlan(
"SELECT sum(totalprice) FILTER(WHERE totalprice > 0), sum(custkey) FROM orders",
anyTree(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why you haven't used assertPlanContainsNoFilter here?

@atris
Copy link
Author

atris commented Sep 14, 2018

@kokosing I wanted the plan to be more readable for the case when one aggregate has a filter and other does not, might be just me though. I have changed it now.

@kokosing
Copy link
Contributor

I wanted the plan to be more readable for the case when one aggregate has a filter and other does not

I see, but plan pattern are expensive to maintain, so we if there is a way to avoid without losing coverage them let's avoid them.

@atris atris force-pushed the filter_push_expressions branch 2 times, most recently from 53a6503 to c7837c2 Compare September 14, 2018 07:48
kokosing
kokosing previously approved these changes Sep 14, 2018
Copy link
Contributor

@kokosing kokosing left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

minor comments

CC: @sopel39 would you like to take a look?

}

@Test
public void testNoFilterAddedForConstantValueFilters()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

s/testNoFilterAddedForConstantValueFilters/testNoFilterAddedFoNonDefaultAggregation

@Test
public void testPushDownConstantFilterPredicate()
{
assertPlanContainsNoFilter("SELECT sum(totalprice) FILTER(WHERE FALSE) FROM orders");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

add a comment that filter node was optimized

@@ -34,6 +46,22 @@ public void teardown()
assertions = null;
}

@Test
public void testAddPredicateForFilterClauses()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

s/testNoFilterAddedForConstantValueFilters/testFilterdAggregationPredicatePushdown

Copy link
Contributor

@sopel39 sopel39 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Update ImplementFilteredAggregations javadoc (to include filter)

@@ -78,6 +81,7 @@ public Result apply(AggregationNode aggregation, Captures captures, Context cont
{
Assignments.Builder newAssignments = Assignments.builder();
ImmutableMap.Builder<Symbol, Aggregation> aggregations = ImmutableMap.builder();
ImmutableList.Builder<Expression> filterExpressions = ImmutableList.builder();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Collect mask symbols instead:

ImmutableList.Builder<Expression> maskSymbols = ...

@@ -93,6 +97,9 @@ public Result apply(AggregationNode aggregation, Captures captures, Context cont
newAssignments.put(symbol, filter);
mask = Optional.of(symbol);
}

filterExpressions.add(getFilterPredicate(aggregation, call));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

remove

aggregations.build(),
aggregation.getGroupingSets(),
ImmutableList.of(),
aggregation.getStep(),
aggregation.getHashSymbol(),
aggregation.getGroupIdSymbol()));
}

private Expression getFilterPredicate(AggregationNode node, FunctionCall aggregate)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

remove

@@ -93,6 +97,9 @@ public Result apply(AggregationNode aggregation, Captures captures, Context cont
newAssignments.put(symbol, filter);
mask = Optional.of(symbol);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add mask symbol to the list:

maskSymbols.add(symbol.toSymbolReference());

@@ -105,15 +112,34 @@ public Result apply(AggregationNode aggregation, Captures captures, Context cont
return Result.ofPlanNode(
new AggregationNode(
context.getIdAllocator().getNextId(),
new ProjectNode(
new FilterNode(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

add FilterNode above ProjectNode using maskSymbols:

Expression predicate = TRUE_LITERAL;
if (!node.hasNonEmptyGroupingSet()) {
    predicate = combineDisjunctsWithDefault(maskSymbols.build(), TRUE_LITERAL));
}
new FilterNode(
  ...,
  predicate,
...

This way we won't do redundant computations of expressions

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This also needs to consider if any aggregate without a FILTER clause is present, made the change and fixed

@sopel39
Copy link
Contributor

sopel39 commented Sep 17, 2018

I'm actually thinking that producing OR predicate (on different columns) might not be the best approach. Such predicate cannot be pushed to connectors and it can stuck high in the plan. It might also cover most of the data.

It doesn't make sense to have FilterNode just below AggregationNode as it doesn't bring benefit and might actually decrease performance. Keep in mind that global partial aggregations do not require dedicated data partitioning and therefore can be pushed low in the plan (even in source stages).

Therefore I think we should do an additional step when producing predicate. We should first try to extract tuple domain from the large OR expression (built using actual filter expressions) using DomainTranslator#fromPredicate and then convert produced TupleDomain back to predicate using com.facebook.presto.sql.planner.DomainTranslator#toPredicate only if such TupleDomain is not TupleDomain.all().
Such predicate would be much more pushable conjunction. I think its safer approach. What do you think?

CC: @martint

It would also be good to have examples of queries that execute much faster because of this PR.

@kokosing
Copy link
Contributor

It doesn't make sense to have FilterNode just below AggregationNode as it doesn't bring benefit and might actually decrease performance.

It would be nice to know the overhead it introduces.

Keep in mind that global partial aggregations do not require dedicated data partitioning and therefore can be pushed low in the plan (even in source stages).

This PR only addresses global aggregation.

Therefore I think we should do an additional step when producing predicate. We should first try to extract tuple domain from the large OR expression (built using actual filter expressions) using DomainTranslator#fromPredicate and then convert produced TupleDomain back to predicate using com.facebook.presto.sql.planner.DomainTranslator#toPredicate only if such TupleDomain is not TupleDomain.all().

It does not guarantee anything as you can calculate domain on intermediate (not source) symbols. I think easier would to extract symbols from each disjunct and push filter node only when all disjuncts are using single same symbol, otherwise it will only produce tuple domain all.

@sopel39
Copy link
Contributor

sopel39 commented Sep 18, 2018

After offline discussion we decided that we can go ahead with current implementation. The rationale is that even if such large OR might not be pushed down to connector it can still improve IO by filtering lazy pages even if partial aggregation is in source stage.

It would be good though to provide some query (could be based on TPCH) that would show performance improvement (should be fairly easy to produce either with or without partitions).

If we find this optimization to be problematic in the future we can always limit it's use to single filtered aggregation.

Thanks @atris ! Could you still apply my review comments (#11425 (review))?

@atris atris force-pushed the filter_push_expressions branch from c7837c2 to 06c38a1 Compare September 18, 2018 12:19
Filtered aggregates do not make an explicit predicate of the filter
clause which leads to optimizer not being able to push down predicate
to source whenever possible. Note that the pushdown is possible only
for cases when the aggregation is default and no grouping sets are
present
@atris atris force-pushed the filter_push_expressions branch from 06c38a1 to bf5457d Compare September 18, 2018 15:11
Copy link
Contributor

@sopel39 sopel39 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you provide query example that gets speedup?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

6 participants