Skip to content

Commit

Permalink
planner: stop pushing Agg down through Projection if substitution fail (
Browse files Browse the repository at this point in the history
  • Loading branch information
ti-chi-bot authored Feb 20, 2024
1 parent 15790a2 commit 56249fd
Show file tree
Hide file tree
Showing 4 changed files with 32 additions and 7 deletions.
1 change: 1 addition & 0 deletions pkg/expression/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -409,6 +409,7 @@ func SetExprColumnInOperand(expr Expression) Expression {

// ColumnSubstitute substitutes the columns in filter to expressions in select fields.
// e.g. select * from (select b as a from t) k where a < 10 => select * from (select b as a from t where b < 10) k.
// TODO: remove this function and only use ColumnSubstituteImpl since this function swallows the error, which seems unsafe.
func ColumnSubstitute(expr Expression, schema *Schema, newExprs []Expression) Expression {
_, _, resExpr := ColumnSubstituteImpl(expr, schema, newExprs, false)
return resExpr
Expand Down
2 changes: 1 addition & 1 deletion pkg/planner/core/casetest/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ go_test(
],
data = glob(["testdata/**"]),
flaky = True,
shard_count = 20,
shard_count = 21,
deps = [
"//pkg/domain",
"//pkg/parser",
Expand Down
9 changes: 9 additions & 0 deletions pkg/planner/core/casetest/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -404,6 +404,15 @@ func TestTiFlashFineGrainedShuffle(t *testing.T) {
}
}

func TestIssue50926(t *testing.T) {
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
tk.MustExec("create table t (a int)")
tk.MustExec("create or replace definer='root'@'localhost' view v (a,b) AS select 1 as a, json_object('k', '0') as b from t")
tk.MustQuery("select sum(json_extract(b, '$.path')) from v group by a").Check(testkit.Rows()) // no error
}

func TestFixControl45132(t *testing.T) {
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)
Expand Down
27 changes: 21 additions & 6 deletions pkg/planner/core/rule_aggregation_push_down.go
Original file line number Diff line number Diff line change
Expand Up @@ -554,7 +554,12 @@ func (a *aggregationPushDownSolver) aggPushDown(p LogicalPlan, opt *logicalOptim
noSideEffects := true
newGbyItems := make([]expression.Expression, 0, len(agg.GroupByItems))
for _, gbyItem := range agg.GroupByItems {
newGbyItems = append(newGbyItems, expression.ColumnSubstitute(gbyItem, proj.schema, proj.Exprs))
_, failed, groupBy := expression.ColumnSubstituteImpl(gbyItem, proj.schema, proj.Exprs, true)
if failed {
noSideEffects = false
break
}
newGbyItems = append(newGbyItems, groupBy)
if ExprsHasSideEffects(newGbyItems) {
noSideEffects = false
break
Expand All @@ -569,7 +574,12 @@ func (a *aggregationPushDownSolver) aggPushDown(p LogicalPlan, opt *logicalOptim
oldAggFuncsArgs = append(oldAggFuncsArgs, aggFunc.Args)
newArgs := make([]expression.Expression, 0, len(aggFunc.Args))
for _, arg := range aggFunc.Args {
newArgs = append(newArgs, expression.ColumnSubstitute(arg, proj.schema, proj.Exprs))
_, failed, newArg := expression.ColumnSubstituteImpl(arg, proj.schema, proj.Exprs, true)
if failed {
noSideEffects = false
break
}
newArgs = append(newArgs, newArg)
}
if ExprsHasSideEffects(newArgs) {
noSideEffects = false
Expand All @@ -581,7 +591,12 @@ func (a *aggregationPushDownSolver) aggPushDown(p LogicalPlan, opt *logicalOptim
oldAggOrderItems = append(oldAggOrderItems, aggFunc.OrderByItems)
newOrderByItems := make([]expression.Expression, 0, len(aggFunc.OrderByItems))
for _, oby := range aggFunc.OrderByItems {
newOrderByItems = append(newOrderByItems, expression.ColumnSubstitute(oby.Expr, proj.schema, proj.Exprs))
_, failed, byItem := expression.ColumnSubstituteImpl(oby.Expr, proj.schema, proj.Exprs, true)
if failed {
noSideEffects = false
break
}
newOrderByItems = append(newOrderByItems, byItem)
}
if ExprsHasSideEffects(newOrderByItems) {
noSideEffects = false
Expand All @@ -600,6 +615,9 @@ func (a *aggregationPushDownSolver) aggPushDown(p LogicalPlan, opt *logicalOptim
}
}
for i, funcsArgs := range oldAggFuncsArgs {
if !noSideEffects {
break
}
for j := range funcsArgs {
if oldAggFuncsArgs[i][j].GetType().EvalType() != newAggFuncsArgs[i][j].GetType().EvalType() {
noSideEffects = false
Expand All @@ -616,9 +634,6 @@ func (a *aggregationPushDownSolver) aggPushDown(p LogicalPlan, opt *logicalOptim
break
}
}
if !noSideEffects {
break
}
}
if noSideEffects {
agg.GroupByItems = newGbyItems
Expand Down

0 comments on commit 56249fd

Please sign in to comment.