From 03b6042922c60a2efb4fcf0834fae833f4f09174 Mon Sep 17 00:00:00 2001 From: Feng Liyuan Date: Wed, 6 May 2020 18:41:22 +0800 Subject: [PATCH] executor: support GROUP_CONCAT(ORDER BY) (#16591) --- cmd/explaintest/r/explain.result | 4 +- executor/aggfuncs/aggfunc_test.go | 79 ++++- executor/aggfuncs/builder.go | 24 +- executor/aggfuncs/func_group_concat.go | 330 +++++++++++++++++- executor/aggfuncs/func_group_concat_test.go | 19 +- executor/aggregate_test.go | 97 +++++ executor/benchmark_test.go | 9 +- executor/builder.go | 5 +- executor/executor_pkg_test.go | 6 +- executor/executor_required_rows_test.go | 13 +- executor/sort.go | 3 +- expression/aggregation/agg_to_pb.go | 3 + expression/aggregation/aggregation.go | 3 + expression/aggregation/descriptor.go | 15 + expression/aggregation/explain.go | 21 +- planner/cascades/enforcer_rules.go | 5 +- .../transformation_rules_suite_out.json | 2 +- planner/cascades/transformation_rules.go | 15 +- planner/core/exhaust_physical_plans.go | 2 +- planner/core/explain.go | 5 +- planner/core/find_best_task.go | 4 +- planner/core/logical_plan_builder.go | 99 ++++-- planner/core/logical_plan_test.go | 9 +- planner/core/logical_plans.go | 4 +- planner/core/pb_to_plan.go | 5 +- planner/core/physical_plan_test.go | 45 +++ planner/core/physical_plans.go | 7 +- planner/core/plan.go | 5 +- planner/core/planbuilder.go | 2 +- planner/core/property_cols_prune.go | 3 +- planner/core/resolve_indices.go | 6 + planner/core/rule_aggregation_push_down.go | 28 +- planner/core/rule_column_pruning.go | 53 +-- planner/core/rule_inject_extra_projection.go | 23 +- planner/core/rule_max_min_eliminate.go | 2 +- planner/core/rule_topn_push_down.go | 5 +- planner/core/task.go | 3 +- planner/core/testdata/plan_suite_in.json | 9 + planner/core/testdata/plan_suite_out.json | 59 ++++ planner/util/byitem.go | 45 +++ sessionctx/variable/varsutil.go | 13 +- 41 files changed, 948 insertions(+), 141 deletions(-) create mode 100644 planner/util/byitem.go diff --git a/cmd/explaintest/r/explain.result b/cmd/explaintest/r/explain.result index ed55753b0ac4e..34ba40144e66e 100644 --- a/cmd/explaintest/r/explain.result +++ b/cmd/explaintest/r/explain.result @@ -28,13 +28,13 @@ set session tidb_hashagg_partial_concurrency = 1; set session tidb_hashagg_final_concurrency = 1; explain select group_concat(a) from t group by id; id estRows task access object operator info -StreamAgg_8 8000.00 root group by:Column#6, funcs:group_concat(Column#5, ",")->Column#4 +StreamAgg_8 8000.00 root group by:Column#6, funcs:group_concat(Column#5 separator ",")->Column#4 └─Projection_18 10000.00 root cast(test.t.a, var_string(20))->Column#5, test.t.id └─TableReader_15 10000.00 root data:TableFullScan_14 └─TableFullScan_14 10000.00 cop[tikv] table:t keep order:true, stats:pseudo explain select group_concat(a, b) from t group by id; id estRows task access object operator info -StreamAgg_8 8000.00 root group by:Column#7, funcs:group_concat(Column#5, Column#6, ",")->Column#4 +StreamAgg_8 8000.00 root group by:Column#7, funcs:group_concat(Column#5, Column#6 separator ",")->Column#4 └─Projection_18 10000.00 root cast(test.t.a, var_string(20))->Column#5, cast(test.t.b, var_string(20))->Column#6, test.t.id └─TableReader_15 10000.00 root data:TableFullScan_14 └─TableFullScan_14 10000.00 cop[tikv] table:t keep order:true, stats:pseudo diff --git a/executor/aggfuncs/aggfunc_test.go b/executor/aggfuncs/aggfunc_test.go index 3a995be8f4e2b..4f2cb8d35b9db 100644 --- a/executor/aggfuncs/aggfunc_test.go +++ b/executor/aggfuncs/aggfunc_test.go @@ -25,6 +25,7 @@ import ( "github.com/pingcap/tidb/executor/aggfuncs" "github.com/pingcap/tidb/expression" "github.com/pingcap/tidb/expression/aggregation" + "github.com/pingcap/tidb/planner/util" "github.com/pingcap/tidb/sessionctx" "github.com/pingcap/tidb/types" "github.com/pingcap/tidb/types/json" @@ -68,6 +69,7 @@ type aggTest struct { dataGen func(i int) types.Datum funcName string results []types.Datum + orderBy bool } type multiArgsAggTest struct { @@ -77,6 +79,7 @@ type multiArgsAggTest struct { dataGens []func(i int) types.Datum funcName string results []types.Datum + orderBy bool } func (s *testSuite) testMergePartialResult(c *C, p aggTest) { @@ -93,6 +96,11 @@ func (s *testSuite) testMergePartialResult(c *C, p aggTest) { } desc, err := aggregation.NewAggFuncDesc(s.ctx, p.funcName, args, false) c.Assert(err, IsNil) + if p.orderBy { + desc.OrderByItems = []*util.ByItems{ + {Expr: args[0], Desc: true}, + } + } partialDesc, finalDesc := desc.Split([]int{0, 1}) // build partial func for partial phase. @@ -112,7 +120,7 @@ func (s *testSuite) testMergePartialResult(c *C, p aggTest) { dt := resultChk.GetRow(0).GetDatum(0, p.dataType) result, err := dt.CompareDatum(s.ctx.GetSessionVars().StmtCtx, &p.results[0]) c.Assert(err, IsNil) - c.Assert(result, Equals, 0) + c.Assert(result, Equals, 0, Commentf("%v != %v", dt.String(), p.results[0])) err = finalFunc.MergePartialResult(s.ctx, partialResult, finalPr) c.Assert(err, IsNil) @@ -128,7 +136,7 @@ func (s *testSuite) testMergePartialResult(c *C, p aggTest) { dt = resultChk.GetRow(0).GetDatum(0, p.dataType) result, err = dt.CompareDatum(s.ctx.GetSessionVars().StmtCtx, &p.results[1]) c.Assert(err, IsNil) - c.Assert(result, Equals, 0) + c.Assert(result, Equals, 0, Commentf("%v != %v", dt.String(), p.results[1])) err = finalFunc.MergePartialResult(s.ctx, partialResult, finalPr) c.Assert(err, IsNil) @@ -139,7 +147,7 @@ func (s *testSuite) testMergePartialResult(c *C, p aggTest) { dt = resultChk.GetRow(0).GetDatum(0, p.dataType) result, err = dt.CompareDatum(s.ctx.GetSessionVars().StmtCtx, &p.results[2]) c.Assert(err, IsNil) - c.Assert(result, Equals, 0) + c.Assert(result, Equals, 0, Commentf("%v != %v", dt.String(), p.results[2])) } func buildAggTester(funcName string, tp byte, numRows int, results ...interface{}) aggTest { @@ -176,6 +184,11 @@ func (s *testSuite) testMultiArgsMergePartialResult(c *C, p multiArgsAggTest) { desc, err := aggregation.NewAggFuncDesc(s.ctx, p.funcName, args, false) c.Assert(err, IsNil) + if p.orderBy { + desc.OrderByItems = []*util.ByItems{ + {Expr: args[0], Desc: true}, + } + } partialDesc, finalDesc := desc.Split([]int{0, 1}) // build partial func for partial phase. @@ -300,6 +313,11 @@ func (s *testSuite) testAggFunc(c *C, p aggTest) { } desc, err := aggregation.NewAggFuncDesc(s.ctx, p.funcName, args, false) c.Assert(err, IsNil) + if p.orderBy { + desc.OrderByItems = []*util.ByItems{ + {Expr: args[0], Desc: true}, + } + } finalFunc := aggfuncs.Build(s.ctx, desc, 0) finalPr := finalFunc.AllocPartialResult() resultChk := chunk.NewChunkWithCapacity([]*types.FieldType{desc.RetTp}, 1) @@ -312,7 +330,7 @@ func (s *testSuite) testAggFunc(c *C, p aggTest) { dt := resultChk.GetRow(0).GetDatum(0, desc.RetTp) result, err := dt.CompareDatum(s.ctx.GetSessionVars().StmtCtx, &p.results[1]) c.Assert(err, IsNil) - c.Assert(result, Equals, 0) + c.Assert(result, Equals, 0, Commentf("%v != %v", dt.String(), p.results[1])) // test the empty input resultChk.Reset() @@ -321,11 +339,16 @@ func (s *testSuite) testAggFunc(c *C, p aggTest) { dt = resultChk.GetRow(0).GetDatum(0, desc.RetTp) result, err = dt.CompareDatum(s.ctx.GetSessionVars().StmtCtx, &p.results[0]) c.Assert(err, IsNil) - c.Assert(result, Equals, 0) + c.Assert(result, Equals, 0, Commentf("%v != %v", dt.String(), p.results[0])) // test the agg func with distinct desc, err = aggregation.NewAggFuncDesc(s.ctx, p.funcName, args, true) c.Assert(err, IsNil) + if p.orderBy { + desc.OrderByItems = []*util.ByItems{ + {Expr: args[0], Desc: true}, + } + } finalFunc = aggfuncs.Build(s.ctx, desc, 0) finalPr = finalFunc.AllocPartialResult() @@ -341,7 +364,7 @@ func (s *testSuite) testAggFunc(c *C, p aggTest) { dt = resultChk.GetRow(0).GetDatum(0, desc.RetTp) result, err = dt.CompareDatum(s.ctx.GetSessionVars().StmtCtx, &p.results[1]) c.Assert(err, IsNil) - c.Assert(result, Equals, 0) + c.Assert(result, Equals, 0, Commentf("%v != %v", dt.String(), p.results[1])) // test the empty input resultChk.Reset() @@ -350,7 +373,7 @@ func (s *testSuite) testAggFunc(c *C, p aggTest) { dt = resultChk.GetRow(0).GetDatum(0, desc.RetTp) result, err = dt.CompareDatum(s.ctx.GetSessionVars().StmtCtx, &p.results[0]) c.Assert(err, IsNil) - c.Assert(result, Equals, 0) + c.Assert(result, Equals, 0, Commentf("%v != %v", dt.String(), p.results[0])) } func (s *testSuite) testMultiArgsAggFunc(c *C, p multiArgsAggTest) { @@ -367,9 +390,17 @@ func (s *testSuite) testMultiArgsAggFunc(c *C, p multiArgsAggTest) { for k := 0; k < len(p.dataTypes); k++ { args[k] = &expression.Column{RetType: p.dataTypes[k], Index: k} } + if p.funcName == ast.AggFuncGroupConcat { + args = append(args, &expression.Constant{Value: types.NewStringDatum(" "), RetType: types.NewFieldType(mysql.TypeString)}) + } desc, err := aggregation.NewAggFuncDesc(s.ctx, p.funcName, args, false) c.Assert(err, IsNil) + if p.orderBy { + desc.OrderByItems = []*util.ByItems{ + {Expr: args[0], Desc: true}, + } + } finalFunc := aggfuncs.Build(s.ctx, desc, 0) finalPr := finalFunc.AllocPartialResult() resultChk := chunk.NewChunkWithCapacity([]*types.FieldType{desc.RetTp}, 1) @@ -382,7 +413,7 @@ func (s *testSuite) testMultiArgsAggFunc(c *C, p multiArgsAggTest) { dt := resultChk.GetRow(0).GetDatum(0, desc.RetTp) result, err := dt.CompareDatum(s.ctx.GetSessionVars().StmtCtx, &p.results[1]) c.Assert(err, IsNil) - c.Assert(result, Equals, 0) + c.Assert(result, Equals, 0, Commentf("%v != %v", dt.String(), p.results[1])) // test the empty input resultChk.Reset() @@ -391,11 +422,16 @@ func (s *testSuite) testMultiArgsAggFunc(c *C, p multiArgsAggTest) { dt = resultChk.GetRow(0).GetDatum(0, desc.RetTp) result, err = dt.CompareDatum(s.ctx.GetSessionVars().StmtCtx, &p.results[0]) c.Assert(err, IsNil) - c.Assert(result, Equals, 0) + c.Assert(result, Equals, 0, Commentf("%v != %v", dt.String(), p.results[0])) // test the agg func with distinct desc, err = aggregation.NewAggFuncDesc(s.ctx, p.funcName, args, true) c.Assert(err, IsNil) + if p.orderBy { + desc.OrderByItems = []*util.ByItems{ + {Expr: args[0], Desc: true}, + } + } finalFunc = aggfuncs.Build(s.ctx, desc, 0) finalPr = finalFunc.AllocPartialResult() @@ -411,7 +447,7 @@ func (s *testSuite) testMultiArgsAggFunc(c *C, p multiArgsAggTest) { dt = resultChk.GetRow(0).GetDatum(0, desc.RetTp) result, err = dt.CompareDatum(s.ctx.GetSessionVars().StmtCtx, &p.results[1]) c.Assert(err, IsNil) - c.Assert(result, Equals, 0) + c.Assert(result, Equals, 0, Commentf("%v != %v", dt.String(), p.results[1])) // test the empty input resultChk.Reset() @@ -439,6 +475,11 @@ func (s *testSuite) benchmarkAggFunc(b *testing.B, p aggTest) { if err != nil { b.Fatal(err) } + if p.orderBy { + desc.OrderByItems = []*util.ByItems{ + {Expr: args[0], Desc: true}, + } + } finalFunc := aggfuncs.Build(s.ctx, desc, 0) resultChk := chunk.NewChunkWithCapacity([]*types.FieldType{desc.RetTp}, 1) iter := chunk.NewIterator4Chunk(srcChk) @@ -454,6 +495,11 @@ func (s *testSuite) benchmarkAggFunc(b *testing.B, p aggTest) { if err != nil { b.Fatal(err) } + if p.orderBy { + desc.OrderByItems = []*util.ByItems{ + {Expr: args[0], Desc: true}, + } + } finalFunc = aggfuncs.Build(s.ctx, desc, 0) resultChk.Reset() b.Run(fmt.Sprintf("%v(distinct)/%v", p.funcName, p.dataType), func(b *testing.B) { @@ -475,11 +521,19 @@ func (s *testSuite) benchmarkMultiArgsAggFunc(b *testing.B, p multiArgsAggTest) for k := 0; k < len(p.dataTypes); k++ { args[k] = &expression.Column{RetType: p.dataTypes[k], Index: k} } + if p.funcName == ast.AggFuncGroupConcat { + args = append(args, &expression.Constant{Value: types.NewStringDatum(" "), RetType: types.NewFieldType(mysql.TypeString)}) + } desc, err := aggregation.NewAggFuncDesc(s.ctx, p.funcName, args, false) if err != nil { b.Fatal(err) } + if p.orderBy { + desc.OrderByItems = []*util.ByItems{ + {Expr: args[0], Desc: true}, + } + } finalFunc := aggfuncs.Build(s.ctx, desc, 0) resultChk := chunk.NewChunkWithCapacity([]*types.FieldType{desc.RetTp}, 1) iter := chunk.NewIterator4Chunk(srcChk) @@ -495,6 +549,11 @@ func (s *testSuite) benchmarkMultiArgsAggFunc(b *testing.B, p multiArgsAggTest) if err != nil { b.Fatal(err) } + if p.orderBy { + desc.OrderByItems = []*util.ByItems{ + {Expr: args[0], Desc: true}, + } + } finalFunc = aggfuncs.Build(s.ctx, desc, 0) resultChk.Reset() b.Run(fmt.Sprintf("%v(distinct)/%v", p.funcName, p.dataTypes), func(b *testing.B) { diff --git a/executor/aggfuncs/builder.go b/executor/aggfuncs/builder.go index 4f09918e3eeeb..1eea035c0553b 100644 --- a/executor/aggfuncs/builder.go +++ b/executor/aggfuncs/builder.go @@ -320,10 +320,6 @@ func buildGroupConcat(ctx sessionctx.Context, aggFuncDesc *aggregation.AggFuncDe case aggregation.DedupMode: return nil default: - base := baseAggFunc{ - args: aggFuncDesc.Args[:len(aggFuncDesc.Args)-1], - ordinal: ordinal, - } // The last arg is promised to be a not-null string constant, so the error can be ignored. c, _ := aggFuncDesc.Args[len(aggFuncDesc.Args)-1].(*expression.Constant) sep, _, err := c.EvalString(nil, chunk.Row{}) @@ -342,10 +338,26 @@ func buildGroupConcat(ctx sessionctx.Context, aggFuncDesc *aggregation.AggFuncDe panic(fmt.Sprintf("Error happened when buildGroupConcat: %s", err.Error())) } var truncated int32 + base := baseGroupConcat4String{ + baseAggFunc: baseAggFunc{ + args: aggFuncDesc.Args[:len(aggFuncDesc.Args)-1], + ordinal: ordinal, + }, + byItems: aggFuncDesc.OrderByItems, + sep: sep, + maxLen: maxLen, + truncated: &truncated, + } if aggFuncDesc.HasDistinct { - return &groupConcatDistinct{baseGroupConcat4String{baseAggFunc: base, sep: sep, maxLen: maxLen, truncated: &truncated}} + if len(aggFuncDesc.OrderByItems) > 0 { + return &groupConcatDistinctOrder{base} + } + return &groupConcatDistinct{base} + } + if len(aggFuncDesc.OrderByItems) > 0 { + return &groupConcatOrder{base} } - return &groupConcat{baseGroupConcat4String{baseAggFunc: base, sep: sep, maxLen: maxLen, truncated: &truncated}} + return &groupConcat{base} } } diff --git a/executor/aggfuncs/func_group_concat.go b/executor/aggfuncs/func_group_concat.go index 636b2bb69006d..b0612292921f6 100644 --- a/executor/aggfuncs/func_group_concat.go +++ b/executor/aggfuncs/func_group_concat.go @@ -15,11 +15,16 @@ package aggfuncs import ( "bytes" + "container/heap" + "sort" "sync/atomic" - "github.com/cznic/mathutil" + "github.com/pingcap/parser/terror" + mysql "github.com/pingcap/tidb/errno" "github.com/pingcap/tidb/expression" + "github.com/pingcap/tidb/planner/util" "github.com/pingcap/tidb/sessionctx" + "github.com/pingcap/tidb/types" "github.com/pingcap/tidb/util/chunk" "github.com/pingcap/tidb/util/codec" "github.com/pingcap/tidb/util/hack" @@ -28,6 +33,7 @@ import ( type baseGroupConcat4String struct { baseAggFunc + byItems []*util.ByItems sep string maxLen uint64 @@ -47,19 +53,20 @@ func (e *baseGroupConcat4String) AppendFinalResult2Chunk(sctx sessionctx.Context return nil } +func (e *baseGroupConcat4String) handleTruncateError(sctx sessionctx.Context) (err error) { + if atomic.CompareAndSwapInt32(e.truncated, 0, 1) { + if !sctx.GetSessionVars().StmtCtx.TruncateAsWarning { + return expression.ErrCutValueGroupConcat.GenWithStackByArgs(e.args[0].String()) + } + sctx.GetSessionVars().StmtCtx.AppendWarning(expression.ErrCutValueGroupConcat.GenWithStackByArgs(e.args[0].String())) + } + return nil +} + func (e *baseGroupConcat4String) truncatePartialResultIfNeed(sctx sessionctx.Context, buffer *bytes.Buffer) (err error) { if e.maxLen > 0 && uint64(buffer.Len()) > e.maxLen { - i := mathutil.MaxInt - if uint64(i) > e.maxLen { - i = int(e.maxLen) - } - buffer.Truncate(i) - if atomic.CompareAndSwapInt32(e.truncated, 0, 1) { - if !sctx.GetSessionVars().StmtCtx.TruncateAsWarning { - return expression.ErrCutValueGroupConcat.GenWithStackByArgs(e.args[0].String()) - } - sctx.GetSessionVars().StmtCtx.AppendWarning(expression.ErrCutValueGroupConcat.GenWithStackByArgs(e.args[0].String())) - } + buffer.Truncate(int(e.maxLen)) + return e.handleTruncateError(sctx) } return nil } @@ -214,3 +221,302 @@ func (e *groupConcatDistinct) SetTruncated(t *int32) { func (e *groupConcatDistinct) GetTruncated() *int32 { return e.truncated } + +type sortRow struct { + buffer *bytes.Buffer + byItems []types.Datum +} + +type topNRows struct { + rows []sortRow + desc []bool + sctx sessionctx.Context + err error + + currSize uint64 + limitSize uint64 + sepSize uint64 +} + +func (h topNRows) Len() int { + return len(h.rows) +} + +func (h topNRows) Less(i, j int) bool { + n := len(h.rows[i].byItems) + for k := 0; k < n; k++ { + ret, err := h.rows[i].byItems[k].CompareDatum(h.sctx.GetSessionVars().StmtCtx, &h.rows[j].byItems[k]) + if err != nil { + h.err = err + return false + } + if h.desc[k] { + ret = -ret + } + if ret > 0 { + return true + } + if ret < 0 { + return false + } + } + return false +} + +func (h topNRows) Swap(i, j int) { + h.rows[i], h.rows[j] = h.rows[j], h.rows[i] +} + +func (h *topNRows) Push(x interface{}) { + h.rows = append(h.rows, x.(sortRow)) +} + +func (h *topNRows) Pop() interface{} { + n := len(h.rows) + x := h.rows[n-1] + h.rows = h.rows[:n-1] + return x +} + +func (h *topNRows) tryToAdd(row sortRow) (truncated bool) { + h.currSize += uint64(row.buffer.Len()) + if len(h.rows) > 0 { + h.currSize += h.sepSize + } + heap.Push(h, row) + if h.currSize <= h.limitSize { + return false + } + + for h.currSize > h.limitSize { + debt := h.currSize - h.limitSize + if uint64(h.rows[0].buffer.Len()) > debt { + h.currSize -= debt + h.rows[0].buffer.Truncate(h.rows[0].buffer.Len() - int(debt)) + } else { + h.currSize -= uint64(h.rows[0].buffer.Len()) + h.sepSize + heap.Pop(h) + } + } + return true +} + +func (h *topNRows) reset() { + h.rows = h.rows[:0] + h.err = nil + h.currSize = 0 +} + +func (h *topNRows) concat(sep string, truncated bool) string { + buffer := new(bytes.Buffer) + sort.Sort(sort.Reverse(h)) + for i, row := range h.rows { + if i != 0 { + buffer.WriteString(sep) + } + buffer.Write(row.buffer.Bytes()) + } + if truncated && uint64(buffer.Len()) < h.limitSize { + // append the last separator, because the last separator may be truncated in tryToAdd. + buffer.WriteString(sep) + buffer.Truncate(int(h.limitSize)) + } + return buffer.String() +} + +type partialResult4GroupConcatOrder struct { + topN *topNRows +} + +type groupConcatOrder struct { + baseGroupConcat4String +} + +func (e *groupConcatOrder) AppendFinalResult2Chunk(sctx sessionctx.Context, pr PartialResult, chk *chunk.Chunk) error { + p := (*partialResult4GroupConcatOrder)(pr) + if p.topN.Len() == 0 { + chk.AppendNull(e.ordinal) + return nil + } + chk.AppendString(e.ordinal, p.topN.concat(e.sep, *e.truncated == 1)) + return nil +} + +func (e *groupConcatOrder) AllocPartialResult() PartialResult { + desc := make([]bool, len(e.byItems)) + for i, byItem := range e.byItems { + desc[i] = byItem.Desc + } + p := &partialResult4GroupConcatOrder{ + topN: &topNRows{ + desc: desc, + currSize: 0, + limitSize: e.maxLen, + sepSize: uint64(len(e.sep)), + }, + } + return PartialResult(p) +} + +func (e *groupConcatOrder) ResetPartialResult(pr PartialResult) { + p := (*partialResult4GroupConcatOrder)(pr) + p.topN.reset() +} + +func (e *groupConcatOrder) UpdatePartialResult(sctx sessionctx.Context, rowsInGroup []chunk.Row, pr PartialResult) (err error) { + p := (*partialResult4GroupConcatOrder)(pr) + p.topN.sctx = sctx + v, isNull := "", false + for _, row := range rowsInGroup { + buffer := new(bytes.Buffer) + for _, arg := range e.args { + v, isNull, err = arg.EvalString(sctx, row) + if err != nil { + return err + } + if isNull { + break + } + buffer.WriteString(v) + } + if isNull { + continue + } + sortRow := sortRow{ + buffer: buffer, + byItems: make([]types.Datum, 0, len(e.byItems)), + } + for _, byItem := range e.byItems { + d, err := byItem.Expr.Eval(row) + if err != nil { + return err + } + sortRow.byItems = append(sortRow.byItems, d) + } + truncated := p.topN.tryToAdd(sortRow) + if p.topN.err != nil { + return p.topN.err + } + if truncated { + if err := e.handleTruncateError(sctx); err != nil { + return err + } + } + } + return nil +} + +func (e *groupConcatOrder) MergePartialResult(sctx sessionctx.Context, src, dst PartialResult) error { + // If order by exists, the parallel hash aggregation is forbidden in executorBuilder.buildHashAgg. + // So MergePartialResult will not be called. + return terror.ClassOptimizer.New(mysql.ErrInternal, mysql.MySQLErrName[mysql.ErrInternal]).GenWithStack("groupConcatOrder.MergePartialResult should not be called") +} + +// SetTruncated will be called in `executorBuilder#buildHashAgg` with duck-type. +func (e *groupConcatOrder) SetTruncated(t *int32) { + e.truncated = t +} + +// GetTruncated will be called in `executorBuilder#buildHashAgg` with duck-type. +func (e *groupConcatOrder) GetTruncated() *int32 { + return e.truncated +} + +type partialResult4GroupConcatOrderDistinct struct { + topN *topNRows + valSet set.StringSet + encodeBytesBuffer []byte +} + +type groupConcatDistinctOrder struct { + baseGroupConcat4String +} + +func (e *groupConcatDistinctOrder) AppendFinalResult2Chunk(sctx sessionctx.Context, pr PartialResult, chk *chunk.Chunk) error { + p := (*partialResult4GroupConcatOrderDistinct)(pr) + if p.topN.Len() == 0 { + chk.AppendNull(e.ordinal) + return nil + } + chk.AppendString(e.ordinal, p.topN.concat(e.sep, *e.truncated == 1)) + return nil +} + +func (e *groupConcatDistinctOrder) AllocPartialResult() PartialResult { + desc := make([]bool, len(e.byItems)) + for i, byItem := range e.byItems { + desc[i] = byItem.Desc + } + p := &partialResult4GroupConcatOrderDistinct{ + topN: &topNRows{ + desc: desc, + currSize: 0, + limitSize: e.maxLen, + sepSize: uint64(len(e.sep)), + }, + valSet: set.NewStringSet(), + } + return PartialResult(p) +} + +func (e *groupConcatDistinctOrder) ResetPartialResult(pr PartialResult) { + p := (*partialResult4GroupConcatOrderDistinct)(pr) + p.topN.reset() + p.valSet = set.NewStringSet() +} + +func (e *groupConcatDistinctOrder) UpdatePartialResult(sctx sessionctx.Context, rowsInGroup []chunk.Row, pr PartialResult) (err error) { + p := (*partialResult4GroupConcatOrderDistinct)(pr) + p.topN.sctx = sctx + v, isNull := "", false + for _, row := range rowsInGroup { + buffer := new(bytes.Buffer) + p.encodeBytesBuffer = p.encodeBytesBuffer[:0] + for _, arg := range e.args { + v, isNull, err = arg.EvalString(sctx, row) + if err != nil { + return err + } + if isNull { + break + } + p.encodeBytesBuffer = codec.EncodeBytes(p.encodeBytesBuffer, hack.Slice(v)) + buffer.WriteString(v) + } + if isNull { + continue + } + joinedVal := string(p.encodeBytesBuffer) + if p.valSet.Exist(joinedVal) { + continue + } + p.valSet.Insert(joinedVal) + sortRow := sortRow{ + buffer: buffer, + byItems: make([]types.Datum, 0, len(e.byItems)), + } + for _, byItem := range e.byItems { + d, err := byItem.Expr.Eval(row) + if err != nil { + return err + } + sortRow.byItems = append(sortRow.byItems, d) + } + truncated := p.topN.tryToAdd(sortRow) + if p.topN.err != nil { + return p.topN.err + } + if truncated { + if err := e.handleTruncateError(sctx); err != nil { + return err + } + } + } + return nil +} + +func (e *groupConcatDistinctOrder) MergePartialResult(sctx sessionctx.Context, src, dst PartialResult) error { + // If order by exists, the parallel hash aggregation is forbidden in executorBuilder.buildHashAgg. + // So MergePartialResult will not be called. + return terror.ClassOptimizer.New(mysql.ErrInternal, mysql.MySQLErrName[mysql.ErrInternal]).GenWithStack("groupConcatDistinctOrder.MergePartialResult should not be called") +} diff --git a/executor/aggfuncs/func_group_concat_test.go b/executor/aggfuncs/func_group_concat_test.go index 576ed8ffd2ad9..7e68e93cfed3b 100644 --- a/executor/aggfuncs/func_group_concat_test.go +++ b/executor/aggfuncs/func_group_concat_test.go @@ -14,9 +14,13 @@ package aggfuncs_test import ( + "fmt" + . "github.com/pingcap/check" "github.com/pingcap/parser/ast" "github.com/pingcap/parser/mysql" + "github.com/pingcap/tidb/sessionctx/variable" + "github.com/pingcap/tidb/types" ) func (s *testSuite) TestMergePartialResult4GroupConcat(c *C) { @@ -25,6 +29,19 @@ func (s *testSuite) TestMergePartialResult4GroupConcat(c *C) { } func (s *testSuite) TestGroupConcat(c *C) { - test := buildAggTester(ast.AggFuncGroupConcat, mysql.TypeString, 5, nil, "0 1 2 3 4", "0 1 2 3 4 2 3 4") + test := buildAggTester(ast.AggFuncGroupConcat, mysql.TypeString, 5, nil, "0 1 2 3 4") s.testAggFunc(c, test) + + test2 := buildMultiArgsAggTester(ast.AggFuncGroupConcat, []byte{mysql.TypeString, mysql.TypeString}, mysql.TypeString, 5, nil, "44 33 22 11 00") + test2.orderBy = true + s.testMultiArgsAggFunc(c, test2) + + defer variable.SetSessionSystemVar(s.ctx.GetSessionVars(), variable.GroupConcatMaxLen, types.NewStringDatum("1024")) + // minimum GroupConcatMaxLen is 4 + for i := 4; i <= 7; i++ { + variable.SetSessionSystemVar(s.ctx.GetSessionVars(), variable.GroupConcatMaxLen, types.NewStringDatum(fmt.Sprint(i))) + test2 = buildMultiArgsAggTester(ast.AggFuncGroupConcat, []byte{mysql.TypeString, mysql.TypeString}, mysql.TypeString, 5, nil, "44 33 22 11 00"[:i]) + test2.orderBy = true + s.testMultiArgsAggFunc(c, test2) + } } diff --git a/executor/aggregate_test.go b/executor/aggregate_test.go index 56eb6627e6c45..2364ddcd07ff9 100644 --- a/executor/aggregate_test.go +++ b/executor/aggregate_test.go @@ -14,6 +14,8 @@ package executor_test import ( + "fmt" + . "github.com/pingcap/check" "github.com/pingcap/errors" "github.com/pingcap/parser/terror" @@ -437,9 +439,11 @@ func (s *testSuiteAgg) TestAggPrune(c *C) { } func (s *testSuiteAgg) TestGroupConcatAggr(c *C) { + var err error // issue #5411 tk := testkit.NewTestKit(c, s.store) tk.MustExec("use test") + tk.MustExec("drop table if exists test;") tk.MustExec("create table test(id int, name int)") tk.MustExec("insert into test values(1, 10);") tk.MustExec("insert into test values(1, 20);") @@ -465,6 +469,99 @@ func (s *testSuiteAgg) TestGroupConcatAggr(c *C) { result = tk.MustQuery("select id, group_concat(name SEPARATOR '123') from test group by id order by id") result.Check(testkit.Rows("1 101232012330", "2 20", "3 200123500")) + tk.MustQuery("select group_concat(id ORDER BY name) from (select * from test order by id, name limit 2,2) t").Check(testkit.Rows("2,1")) + tk.MustQuery("select group_concat(id ORDER BY name desc) from (select * from test order by id, name limit 2,2) t").Check(testkit.Rows("1,2")) + tk.MustQuery("select group_concat(name ORDER BY id) from (select * from test order by id, name limit 2,2) t").Check(testkit.Rows("30,20")) + tk.MustQuery("select group_concat(name ORDER BY id desc) from (select * from test order by id, name limit 2,2) t").Check(testkit.Rows("20,30")) + + result = tk.MustQuery("select group_concat(name ORDER BY name desc SEPARATOR '++') from test;") + result.Check(testkit.Rows("500++200++30++20++20++10")) + + result = tk.MustQuery("select group_concat(id ORDER BY name desc, id asc SEPARATOR '--') from test;") + result.Check(testkit.Rows("3--3--1--1--2--1")) + + result = tk.MustQuery("select group_concat(name ORDER BY name desc SEPARATOR '++'), group_concat(id ORDER BY name desc, id asc SEPARATOR '--') from test;") + result.Check(testkit.Rows("500++200++30++20++20++10 3--3--1--1--2--1")) + + result = tk.MustQuery("select group_concat(distinct name order by name desc) from test;") + result.Check(testkit.Rows("500,200,30,20,10")) + + expected := "3--3--1--1--2--1" + for maxLen := 4; maxLen < len(expected); maxLen++ { + tk.MustExec(fmt.Sprintf("set session group_concat_max_len=%v", maxLen)) + result = tk.MustQuery("select group_concat(id ORDER BY name desc, id asc SEPARATOR '--') from test;") + result.Check(testkit.Rows(expected[:maxLen])) + c.Assert(tk.Se.GetSessionVars().StmtCtx.GetWarnings(), HasLen, 1) + } + expected = "1--2--1--1--3--3" + for maxLen := 4; maxLen < len(expected); maxLen++ { + tk.MustExec(fmt.Sprintf("set session group_concat_max_len=%v", maxLen)) + result = tk.MustQuery("select group_concat(id ORDER BY name asc, id desc SEPARATOR '--') from test;") + result.Check(testkit.Rows(expected[:maxLen])) + c.Assert(tk.Se.GetSessionVars().StmtCtx.GetWarnings(), HasLen, 1) + } + expected = "500,200,30,20,10" + for maxLen := 4; maxLen < len(expected); maxLen++ { + tk.MustExec(fmt.Sprintf("set session group_concat_max_len=%v", maxLen)) + result = tk.MustQuery("select group_concat(distinct name order by name desc) from test;") + result.Check(testkit.Rows(expected[:maxLen])) + c.Assert(tk.Se.GetSessionVars().StmtCtx.GetWarnings(), HasLen, 1) + } + + tk.MustExec(fmt.Sprintf("set session group_concat_max_len=%v", 1024)) + + // test varchar table + tk.MustExec("drop table if exists test2;") + tk.MustExec("create table test2(id varchar(20), name varchar(20));") + tk.MustExec("insert into test2 select * from test;") + + tk.MustQuery("select group_concat(id ORDER BY name) from (select * from test2 order by id, name limit 2,2) t").Check(testkit.Rows("2,1")) + tk.MustQuery("select group_concat(id ORDER BY name desc) from (select * from test2 order by id, name limit 2,2) t").Check(testkit.Rows("1,2")) + tk.MustQuery("select group_concat(name ORDER BY id) from (select * from test2 order by id, name limit 2,2) t").Check(testkit.Rows("30,20")) + tk.MustQuery("select group_concat(name ORDER BY id desc) from (select * from test2 order by id, name limit 2,2) t").Check(testkit.Rows("20,30")) + + result = tk.MustQuery("select group_concat(name ORDER BY name desc SEPARATOR '++'), group_concat(id ORDER BY name desc, id asc SEPARATOR '--') from test2;") + result.Check(testkit.Rows("500++30++200++20++20++10 3--1--3--1--2--1")) + + // test Position Expr + tk.MustQuery("select 1, 2, 3, 4, 5 , group_concat(name, id ORDER BY 1 desc, id SEPARATOR '++') from test;").Check(testkit.Rows("1 2 3 4 5 5003++2003++301++201++202++101")) + tk.MustQuery("select 1, 2, 3, 4, 5 , group_concat(name, id ORDER BY 2 desc, name SEPARATOR '++') from test;").Check(testkit.Rows("1 2 3 4 5 2003++5003++202++101++201++301")) + err = tk.ExecToErr("select 1, 2, 3, 4, 5 , group_concat(name, id ORDER BY 3 desc, name SEPARATOR '++') from test;") + c.Assert(err.Error(), Equals, "[planner:1054]Unknown column '3' in 'order clause'") + + // test Param Marker + tk.MustExec(`prepare s1 from "select 1, 2, 3, 4, 5 , group_concat(name, id ORDER BY floor(id/?) desc, name SEPARATOR '++') from test";`) + tk.MustExec("set @a=2;") + tk.MustQuery("execute s1 using @a;").Check(testkit.Rows("1 2 3 4 5 202++2003++5003++101++201++301")) + + tk.MustExec(`prepare s1 from "select 1, 2, 3, 4, 5 , group_concat(name, id ORDER BY ? desc, name SEPARATOR '++') from test";`) + tk.MustExec("set @a=2;") + tk.MustQuery("execute s1 using @a;").Check(testkit.Rows("1 2 3 4 5 2003++5003++202++101++201++301")) + tk.MustExec("set @a=3;") + err = tk.ExecToErr("execute s1 using @a;") + c.Assert(err.Error(), Equals, "[planner:1054]Unknown column '?' in 'order clause'") + tk.MustExec("set @a=3.0;") + tk.MustQuery("execute s1 using @a;").Check(testkit.Rows("1 2 3 4 5 101++202++201++301++2003++5003")) + + // test partition table + tk.MustExec("drop table if exists ptest;") + tk.MustExec("CREATE TABLE ptest (id int,name int) PARTITION BY RANGE ( id ) " + + "(PARTITION `p0` VALUES LESS THAN (2), PARTITION `p1` VALUES LESS THAN (11))") + tk.MustExec("insert into ptest select * from test;") + + for i := 0; i <= 1; i++ { + for j := 0; j <= 1; j++ { + tk.MustExec(fmt.Sprintf("set session tidb_opt_distinct_agg_push_down = %v", i)) + tk.MustExec(fmt.Sprintf("set session tidb_opt_agg_push_down = %v", j)) + + result = tk.MustQuery("select /*+ agg_to_cop */ group_concat(name ORDER BY name desc SEPARATOR '++'), group_concat(id ORDER BY name desc, id asc SEPARATOR '--') from ptest;") + result.Check(testkit.Rows("500++200++30++20++20++10 3--3--1--1--2--1")) + + result = tk.MustQuery("select /*+ agg_to_cop */ group_concat(distinct name order by name desc) from ptest;") + result.Check(testkit.Rows("500,200,30,20,10")) + } + } + // issue #9920 tk.MustQuery("select group_concat(123, null)").Check(testkit.Rows("")) } diff --git a/executor/benchmark_test.go b/executor/benchmark_test.go index 2bf90342b0b3d..562d7b167212d 100644 --- a/executor/benchmark_test.go +++ b/executor/benchmark_test.go @@ -30,6 +30,7 @@ import ( "github.com/pingcap/tidb/expression/aggregation" "github.com/pingcap/tidb/planner/core" "github.com/pingcap/tidb/planner/property" + "github.com/pingcap/tidb/planner/util" "github.com/pingcap/tidb/sessionctx" "github.com/pingcap/tidb/sessionctx/variable" "github.com/pingcap/tidb/types" @@ -469,9 +470,9 @@ func buildWindowExecutor(ctx sessionctx.Context, windowFunc string, funcs int, f var tail core.PhysicalPlan = win if !dataSourceSorted { - byItems := make([]*core.ByItems, 0, len(partitionBy)) + byItems := make([]*util.ByItems, 0, len(partitionBy)) for _, col := range partitionBy { - byItems = append(byItems, &core.ByItems{Expr: col, Desc: false}) + byItems = append(byItems, &util.ByItems{Expr: col, Desc: false}) } sort := &core.PhysicalSort{ByItems: byItems} sort.SetChildren(src) @@ -1583,11 +1584,11 @@ func benchmarkSortExec(b *testing.B, cas *sortCase) { dataSource := buildMockDataSource(opt) exec := &SortExec{ baseExecutor: newBaseExecutor(cas.ctx, dataSource.schema, stringutil.StringerStr("sort"), dataSource), - ByItems: make([]*core.ByItems, 0, len(cas.orderByIdx)), + ByItems: make([]*util.ByItems, 0, len(cas.orderByIdx)), schema: dataSource.schema, } for _, idx := range cas.orderByIdx { - exec.ByItems = append(exec.ByItems, &core.ByItems{Expr: cas.columns()[idx]}) + exec.ByItems = append(exec.ByItems, &util.ByItems{Expr: cas.columns()[idx]}) } b.ResetTimer() for i := 0; i < b.N; i++ { diff --git a/executor/builder.go b/executor/builder.go index 7fa4fdda48fdf..dcf0fe24a9f71 100644 --- a/executor/builder.go +++ b/executor/builder.go @@ -40,6 +40,7 @@ import ( "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/metrics" plannercore "github.com/pingcap/tidb/planner/core" + plannerutil "github.com/pingcap/tidb/planner/util" "github.com/pingcap/tidb/sessionctx" "github.com/pingcap/tidb/statistics" "github.com/pingcap/tidb/table" @@ -869,7 +870,7 @@ func (b *executorBuilder) buildTrace(v *plannercore.Trace) Executor { if t.format == plannercore.TraceFormatLog { return &SortExec{ baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ExplainID(), t), - ByItems: []*plannercore.ByItems{ + ByItems: []*plannerutil.ByItems{ {Expr: &expression.Column{ Index: 0, RetType: types.NewFieldType(mysql.TypeTimestamp), @@ -1196,7 +1197,7 @@ func (b *executorBuilder) buildHashAgg(v *plannercore.PhysicalHashAgg) Executor e.defaultVal = chunk.NewChunkWithCapacity(retTypes(e), 1) } for _, aggDesc := range v.AggFuncs { - if aggDesc.HasDistinct { + if aggDesc.HasDistinct || len(aggDesc.OrderByItems) > 0 { e.isUnparallelExec = true } } diff --git a/executor/executor_pkg_test.go b/executor/executor_pkg_test.go index ae0e71886cf48..e9bb78c2691f2 100644 --- a/executor/executor_pkg_test.go +++ b/executor/executor_pkg_test.go @@ -23,7 +23,7 @@ import ( "github.com/pingcap/parser/mysql" "github.com/pingcap/tidb/config" "github.com/pingcap/tidb/expression" - "github.com/pingcap/tidb/planner/core" + plannerutil "github.com/pingcap/tidb/planner/util" "github.com/pingcap/tidb/sessionctx/variable" "github.com/pingcap/tidb/types" "github.com/pingcap/tidb/util" @@ -276,11 +276,11 @@ func (s *testExecSerialSuite) TestSortSpillDisk(c *C) { dataSource := buildMockDataSource(opt) exec := &SortExec{ baseExecutor: newBaseExecutor(cas.ctx, dataSource.schema, stringutil.StringerStr("sort"), dataSource), - ByItems: make([]*core.ByItems, 0, len(cas.orderByIdx)), + ByItems: make([]*plannerutil.ByItems, 0, len(cas.orderByIdx)), schema: dataSource.schema, } for _, idx := range cas.orderByIdx { - exec.ByItems = append(exec.ByItems, &core.ByItems{Expr: cas.columns()[idx]}) + exec.ByItems = append(exec.ByItems, &plannerutil.ByItems{Expr: cas.columns()[idx]}) } tmpCtx := context.Background() chk := newFirstChunk(exec) diff --git a/executor/executor_required_rows_test.go b/executor/executor_required_rows_test.go index 56527396346a9..d6f8271efcbf5 100644 --- a/executor/executor_required_rows_test.go +++ b/executor/executor_required_rows_test.go @@ -27,6 +27,7 @@ import ( "github.com/pingcap/tidb/expression" "github.com/pingcap/tidb/expression/aggregation" plannercore "github.com/pingcap/tidb/planner/core" + "github.com/pingcap/tidb/planner/util" "github.com/pingcap/tidb/sessionctx" "github.com/pingcap/tidb/sessionctx/variable" "github.com/pingcap/tidb/types" @@ -255,10 +256,10 @@ func (s *testExecSuite) TestSortRequiredRows(c *C) { sctx := defaultCtx() ctx := context.Background() ds := newRequiredRowsDataSource(sctx, testCase.totalRows, testCase.expectedRowsDS) - byItems := make([]*plannercore.ByItems, 0, len(testCase.groupBy)) + byItems := make([]*util.ByItems, 0, len(testCase.groupBy)) for _, groupBy := range testCase.groupBy { col := ds.Schema().Columns[groupBy] - byItems = append(byItems, &plannercore.ByItems{Expr: col}) + byItems = append(byItems, &util.ByItems{Expr: col}) } exec := buildSortExec(sctx, byItems, ds) c.Assert(exec.Open(ctx), IsNil) @@ -273,7 +274,7 @@ func (s *testExecSuite) TestSortRequiredRows(c *C) { } } -func buildSortExec(sctx sessionctx.Context, byItems []*plannercore.ByItems, src Executor) Executor { +func buildSortExec(sctx sessionctx.Context, byItems []*util.ByItems, src Executor) Executor { sortExec := SortExec{ baseExecutor: newBaseExecutor(sctx, src.Schema(), nil, src), ByItems: byItems, @@ -362,10 +363,10 @@ func (s *testExecSuite) TestTopNRequiredRows(c *C) { sctx := defaultCtx() ctx := context.Background() ds := newRequiredRowsDataSource(sctx, testCase.totalRows, testCase.expectedRowsDS) - byItems := make([]*plannercore.ByItems, 0, len(testCase.groupBy)) + byItems := make([]*util.ByItems, 0, len(testCase.groupBy)) for _, groupBy := range testCase.groupBy { col := ds.Schema().Columns[groupBy] - byItems = append(byItems, &plannercore.ByItems{Expr: col}) + byItems = append(byItems, &util.ByItems{Expr: col}) } exec := buildTopNExec(sctx, testCase.topNOffset, testCase.topNCount, byItems, ds) c.Assert(exec.Open(ctx), IsNil) @@ -380,7 +381,7 @@ func (s *testExecSuite) TestTopNRequiredRows(c *C) { } } -func buildTopNExec(ctx sessionctx.Context, offset, count int, byItems []*plannercore.ByItems, src Executor) Executor { +func buildTopNExec(ctx sessionctx.Context, offset, count int, byItems []*util.ByItems, src Executor) Executor { sortExec := SortExec{ baseExecutor: newBaseExecutor(ctx, src.Schema(), nil, src), ByItems: byItems, diff --git a/executor/sort.go b/executor/sort.go index 8f5be20742055..ff67fa61a370f 100644 --- a/executor/sort.go +++ b/executor/sort.go @@ -22,6 +22,7 @@ import ( "github.com/pingcap/tidb/config" "github.com/pingcap/tidb/expression" plannercore "github.com/pingcap/tidb/planner/core" + "github.com/pingcap/tidb/planner/util" "github.com/pingcap/tidb/types" "github.com/pingcap/tidb/util/chunk" "github.com/pingcap/tidb/util/disk" @@ -35,7 +36,7 @@ var rowChunksLabel fmt.Stringer = stringutil.StringerStr("rowChunks") type SortExec struct { baseExecutor - ByItems []*plannercore.ByItems + ByItems []*util.ByItems Idx int fetched bool schema *expression.Schema diff --git a/expression/aggregation/agg_to_pb.go b/expression/aggregation/agg_to_pb.go index 1213e91c3adf4..f82982447190d 100644 --- a/expression/aggregation/agg_to_pb.go +++ b/expression/aggregation/agg_to_pb.go @@ -29,6 +29,9 @@ func AggFuncToPBExpr(sc *stmtctx.StatementContext, client kv.Client, aggFunc *Ag if aggFunc.HasDistinct { // do nothing and ignore aggFunc.HasDistinct } + if len(aggFunc.OrderByItems) > 0 { + return nil + } pc := expression.NewPBConverter(client, sc) var tp tipb.ExprType switch aggFunc.Name { diff --git a/expression/aggregation/aggregation.go b/expression/aggregation/aggregation.go index 5a47f6ac01777..12cca5873f5bb 100644 --- a/expression/aggregation/aggregation.go +++ b/expression/aggregation/aggregation.go @@ -191,6 +191,9 @@ func IsAllFirstRow(aggFuncs []*AggFuncDesc) bool { // CheckAggPushDown checks whether an agg function can be pushed to storage. func CheckAggPushDown(aggFunc *AggFuncDesc, storeType kv.StoreType) bool { + if len(aggFunc.OrderByItems) > 0 { + return false + } ret := true switch storeType { case kv.TiFlash: diff --git a/expression/aggregation/descriptor.go b/expression/aggregation/descriptor.go index 58b2e98502a86..c3aab50bd66cc 100644 --- a/expression/aggregation/descriptor.go +++ b/expression/aggregation/descriptor.go @@ -22,6 +22,7 @@ import ( "github.com/pingcap/parser/ast" "github.com/pingcap/parser/mysql" "github.com/pingcap/tidb/expression" + "github.com/pingcap/tidb/planner/util" "github.com/pingcap/tidb/sessionctx" "github.com/pingcap/tidb/sessionctx/variable" "github.com/pingcap/tidb/types" @@ -34,6 +35,8 @@ type AggFuncDesc struct { Mode AggFunctionMode // HasDistinct represents whether the aggregation function contains distinct attribute. HasDistinct bool + // OrderByItems represents the order by clause used in GROUP_CONCAT + OrderByItems []*util.ByItems } // NewAggFuncDesc creates an aggregation function signature descriptor. @@ -67,6 +70,14 @@ func (a *AggFuncDesc) Equal(ctx sessionctx.Context, other *AggFuncDesc) bool { if a.HasDistinct != other.HasDistinct { return false } + if len(a.OrderByItems) != len(other.OrderByItems) { + return false + } + for i := range a.OrderByItems { + if !a.OrderByItems[i].Equal(ctx, other.OrderByItems[i]) { + return false + } + } return a.baseFuncDesc.equal(ctx, &other.baseFuncDesc) } @@ -74,6 +85,10 @@ func (a *AggFuncDesc) Equal(ctx sessionctx.Context, other *AggFuncDesc) bool { func (a *AggFuncDesc) Clone() *AggFuncDesc { clone := *a clone.baseFuncDesc = *a.baseFuncDesc.clone() + clone.OrderByItems = make([]*util.ByItems, len(a.OrderByItems)) + for i, byItem := range a.OrderByItems { + clone.OrderByItems[i] = byItem.Clone() + } return &clone } diff --git a/expression/aggregation/explain.go b/expression/aggregation/explain.go index 0a6a01a4ed8b7..b001a21c23d1e 100644 --- a/expression/aggregation/explain.go +++ b/expression/aggregation/explain.go @@ -16,6 +16,8 @@ package aggregation import ( "bytes" "fmt" + + "github.com/pingcap/parser/ast" ) // ExplainAggFunc generates explain information for a aggregation function. @@ -26,10 +28,25 @@ func ExplainAggFunc(agg *AggFuncDesc) string { buffer.WriteString("distinct ") } for i, arg := range agg.Args { - buffer.WriteString(arg.ExplainInfo()) - if i+1 < len(agg.Args) { + if agg.Name == ast.AggFuncGroupConcat && i == len(agg.Args)-1 { + if len(agg.OrderByItems) > 0 { + buffer.WriteString(" order by ") + for i, item := range agg.OrderByItems { + order := "asc" + if item.Desc { + order = "desc" + } + fmt.Fprintf(&buffer, "%s %s", item.Expr.ExplainInfo(), order) + if i+1 < len(agg.OrderByItems) { + buffer.WriteString(", ") + } + } + } + buffer.WriteString(" separator ") + } else if i != 0 { buffer.WriteString(", ") } + buffer.WriteString(arg.ExplainInfo()) } buffer.WriteString(")") return buffer.String() diff --git a/planner/cascades/enforcer_rules.go b/planner/cascades/enforcer_rules.go index 8c96d955d0b94..183169ffa3913 100644 --- a/planner/cascades/enforcer_rules.go +++ b/planner/cascades/enforcer_rules.go @@ -20,6 +20,7 @@ import ( "github.com/pingcap/tidb/planner/implementation" "github.com/pingcap/tidb/planner/memo" "github.com/pingcap/tidb/planner/property" + "github.com/pingcap/tidb/planner/util" ) // Enforcer defines the interface for enforcer rules. @@ -62,10 +63,10 @@ func (e *OrderEnforcer) NewProperty(prop *property.PhysicalProperty) (newProp *p func (e *OrderEnforcer) OnEnforce(reqProp *property.PhysicalProperty, child memo.Implementation) (impl memo.Implementation) { childPlan := child.GetPlan() sort := plannercore.PhysicalSort{ - ByItems: make([]*plannercore.ByItems, 0, len(reqProp.Items)), + ByItems: make([]*util.ByItems, 0, len(reqProp.Items)), }.Init(childPlan.SCtx(), childPlan.Stats(), childPlan.SelectBlockOffset(), &property.PhysicalProperty{ExpectedCnt: math.MaxFloat64}) for _, item := range reqProp.Items { - item := &plannercore.ByItems{ + item := &util.ByItems{ Expr: item.Col, Desc: item.Desc, } diff --git a/planner/cascades/testdata/transformation_rules_suite_out.json b/planner/cascades/testdata/transformation_rules_suite_out.json index f0d8475bfb911..ba8bbcae8e221 100644 --- a/planner/cascades/testdata/transformation_rules_suite_out.json +++ b/planner/cascades/testdata/transformation_rules_suite_out.json @@ -2325,7 +2325,7 @@ "Group#0 Schema:[Column#13,Column#14,Column#15,test.t.f,Column#16,Column#17,Column#18,Column#19,Column#20,Column#21]", " Projection_3 input:[Group#1], Column#13, Column#14, Column#15, test.t.f, Column#16, Column#17, Column#18, Column#19, Column#20, Column#21", "Group#1 Schema:[Column#13,Column#14,Column#15,Column#16,Column#17,Column#18,Column#19,Column#20,Column#21,test.t.f]", - " Aggregation_2 input:[Group#2], group by:test.t.a, funcs:count(test.t.b), sum(test.t.b), avg(test.t.b), max(test.t.c), min(test.t.c), bit_and(test.t.c), bit_or(test.t.d), bit_xor(test.t.g), group_concat(test.t.b, test.t.c, test.t.d, test.t.f, \",\"), firstrow(test.t.f)", + " Aggregation_2 input:[Group#2], group by:test.t.a, funcs:count(test.t.b), sum(test.t.b), avg(test.t.b), max(test.t.c), min(test.t.c), bit_and(test.t.c), bit_or(test.t.d), bit_xor(test.t.g), group_concat(test.t.b, test.t.c, test.t.d, test.t.f separator \",\"), firstrow(test.t.f)", "Group#2 Schema:[test.t.a,test.t.b,test.t.c,test.t.d,test.t.f,test.t.g]", " DataSource_1 table:t" ] diff --git a/planner/cascades/transformation_rules.go b/planner/cascades/transformation_rules.go index 9296bd9c468d0..9ea7b8c4234b2 100644 --- a/planner/cascades/transformation_rules.go +++ b/planner/cascades/transformation_rules.go @@ -23,6 +23,7 @@ import ( "github.com/pingcap/tidb/kv" plannercore "github.com/pingcap/tidb/planner/core" "github.com/pingcap/tidb/planner/memo" + "github.com/pingcap/tidb/planner/util" "github.com/pingcap/tidb/sessionctx" "github.com/pingcap/tidb/types" "github.com/pingcap/tidb/util/ranger" @@ -1146,7 +1147,7 @@ func pushTopNDownOuterJoinToChild(topN *plannercore.LogicalTopN, outerGroup *mem newTopN := plannercore.LogicalTopN{ Count: topN.Count + topN.Offset, - ByItems: make([]*plannercore.ByItems, len(topN.ByItems)), + ByItems: make([]*util.ByItems, len(topN.ByItems)), }.Init(topN.SCtx(), topN.SelectBlockOffset()) for i := range topN.ByItems { @@ -1225,9 +1226,9 @@ func (r *PushTopNDownProjection) OnTransform(old *memo.ExprIter) (newExprs []*me Count: topN.Count, }.Init(topN.SCtx(), topN.SelectBlockOffset()) - newTopN.ByItems = make([]*plannercore.ByItems, 0, len(topN.ByItems)) + newTopN.ByItems = make([]*util.ByItems, 0, len(topN.ByItems)) for _, by := range topN.ByItems { - newTopN.ByItems = append(newTopN.ByItems, &plannercore.ByItems{ + newTopN.ByItems = append(newTopN.ByItems, &util.ByItems{ Expr: expression.ColumnSubstitute(by.Expr, old.Children[0].Group.Prop.Schema, proj.Exprs), Desc: by.Desc, }) @@ -1538,8 +1539,8 @@ func (r *EliminateSingleMaxMin) OnTransform(old *memo.ExprIter) (newExprs []*mem // Add top(1) operators. // For max function, the sort order should be desc. desc := f.Name == ast.AggFuncMax - var byItems []*plannercore.ByItems - byItems = append(byItems, &plannercore.ByItems{ + var byItems []*util.ByItems + byItems = append(byItems, &util.ByItems{ Expr: f.Args[0], Desc: desc, }) @@ -2180,7 +2181,7 @@ func (r *InjectProjectionBelowTopN) OnTransform(old *memo.ExprIter) (newExprs [] bottomProjExprs = append(bottomProjExprs, col) bottomProjSchema = append(bottomProjSchema, col) } - newByItems := make([]*plannercore.ByItems, 0, len(topN.ByItems)) + newByItems := make([]*util.ByItems, 0, len(topN.ByItems)) for _, item := range topN.ByItems { itemExpr := item.Expr if _, isScalarFunc := itemExpr.(*expression.ScalarFunction); !isScalarFunc { @@ -2193,7 +2194,7 @@ func (r *InjectProjectionBelowTopN) OnTransform(old *memo.ExprIter) (newExprs [] RetType: itemExpr.GetType(), } bottomProjSchema = append(bottomProjSchema, newCol) - newByItems = append(newByItems, &plannercore.ByItems{Expr: newCol, Desc: item.Desc}) + newByItems = append(newByItems, &util.ByItems{Expr: newCol, Desc: item.Desc}) } bottomProj := plannercore.LogicalProjection{ Exprs: bottomProjExprs, diff --git a/planner/core/exhaust_physical_plans.go b/planner/core/exhaust_physical_plans.go index 137df9fa30118..57aaad642af17 100644 --- a/planner/core/exhaust_physical_plans.go +++ b/planner/core/exhaust_physical_plans.go @@ -1516,7 +1516,7 @@ func (lt *LogicalTopN) getPhysLimits() []PhysicalPlan { } // MatchItems checks if this prop's columns can match by items totally. -func MatchItems(p *property.PhysicalProperty, items []*ByItems) bool { +func MatchItems(p *property.PhysicalProperty, items []*util.ByItems) bool { if len(items) < len(p.Items) { return false } diff --git a/planner/core/explain.go b/planner/core/explain.go index 126817e8db9ad..71f5a0b3fbbd9 100644 --- a/planner/core/explain.go +++ b/planner/core/explain.go @@ -22,6 +22,7 @@ import ( "github.com/pingcap/parser/ast" "github.com/pingcap/tidb/expression" "github.com/pingcap/tidb/expression/aggregation" + "github.com/pingcap/tidb/planner/util" "github.com/pingcap/tidb/statistics" "github.com/pingcap/tidb/util/stringutil" ) @@ -674,7 +675,7 @@ func (p *LogicalUnionScan) ExplainInfo() string { return buffer.String() } -func explainByItems(buffer *bytes.Buffer, byItems []*ByItems) *bytes.Buffer { +func explainByItems(buffer *bytes.Buffer, byItems []*util.ByItems) *bytes.Buffer { for i, item := range byItems { order := "asc" if item.Desc { @@ -688,7 +689,7 @@ func explainByItems(buffer *bytes.Buffer, byItems []*ByItems) *bytes.Buffer { return buffer } -func explainNormalizedByItems(buffer *bytes.Buffer, byItems []*ByItems) *bytes.Buffer { +func explainNormalizedByItems(buffer *bytes.Buffer, byItems []*util.ByItems) *bytes.Buffer { for i, item := range byItems { order := "asc" if item.Desc { diff --git a/planner/core/find_best_task.go b/planner/core/find_best_task.go index 578f226871122..a928ffb5db921 100644 --- a/planner/core/find_best_task.go +++ b/planner/core/find_best_task.go @@ -68,7 +68,7 @@ var invalidTask = &rootTask{cst: math.MaxFloat64} // GetPropByOrderByItems will check if this sort property can be pushed or not. In order to simplify the problem, we only // consider the case that all expression are columns. -func GetPropByOrderByItems(items []*ByItems) (*property.PhysicalProperty, bool) { +func GetPropByOrderByItems(items []*util.ByItems) (*property.PhysicalProperty, bool) { propItems := make([]property.Item, 0, len(items)) for _, item := range items { col, ok := item.Expr.(*expression.Column) @@ -82,7 +82,7 @@ func GetPropByOrderByItems(items []*ByItems) (*property.PhysicalProperty, bool) // GetPropByOrderByItemsContainScalarFunc will check if this sort property can be pushed or not. In order to simplify the // problem, we only consider the case that all expression are columns or some special scalar functions. -func GetPropByOrderByItemsContainScalarFunc(items []*ByItems) (*property.PhysicalProperty, bool, bool) { +func GetPropByOrderByItemsContainScalarFunc(items []*util.ByItems) (*property.PhysicalProperty, bool, bool) { propItems := make([]property.Item, 0, len(items)) onlyColumn := true for _, item := range items { diff --git a/planner/core/logical_plan_builder.go b/planner/core/logical_plan_builder.go index 6abb0a30df9e2..6892d0b60e966 100644 --- a/planner/core/logical_plan_builder.go +++ b/planner/core/logical_plan_builder.go @@ -20,6 +20,7 @@ import ( "math/bits" "reflect" "sort" + "strconv" "strings" "time" "unicode" @@ -39,6 +40,7 @@ import ( "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/metrics" "github.com/pingcap/tidb/planner/property" + "github.com/pingcap/tidb/planner/util" "github.com/pingcap/tidb/privilege" "github.com/pingcap/tidb/sessionctx" "github.com/pingcap/tidb/statistics" @@ -109,6 +111,54 @@ func (la *LogicalAggregation) collectGroupByColumns() { } } +// aggOrderByResolver is currently resolving expressions of order by clause +// in aggregate function GROUP_CONCAT. +type aggOrderByResolver struct { + ctx sessionctx.Context + err error + args []ast.ExprNode + exprDepth int // exprDepth is the depth of current expression in expression tree. +} + +func (a *aggOrderByResolver) Enter(inNode ast.Node) (ast.Node, bool) { + a.exprDepth++ + switch n := inNode.(type) { + case *driver.ParamMarkerExpr: + if a.exprDepth == 1 { + _, isNull, isExpectedType := getUintFromNode(a.ctx, n) + // For constant uint expression in top level, it should be treated as position expression. + if !isNull && isExpectedType { + return expression.ConstructPositionExpr(n), true + } + } + } + return inNode, false +} + +func (a *aggOrderByResolver) Leave(inNode ast.Node) (ast.Node, bool) { + switch v := inNode.(type) { + case *ast.PositionExpr: + pos, isNull, err := expression.PosFromPositionExpr(a.ctx, v) + if err != nil { + a.err = err + } + if err != nil || isNull { + return inNode, false + } + if pos < 1 || pos > len(a.args) { + errPos := strconv.Itoa(pos) + if v.P != nil { + errPos = "?" + } + a.err = ErrUnknownColumn.FastGenByArgs(errPos, "order clause") + return inNode, false + } + ret := a.args[pos-1] + return ret, true + } + return inNode, true +} + func (b *PlanBuilder) buildAggregation(ctx context.Context, p LogicalPlan, aggFuncList []*ast.AggregateFuncExpr, gbyItems []expression.Expression) (LogicalPlan, map[int]int, error) { b.optFlag |= flagBuildKeyInfo b.optFlag |= flagPushDownAgg @@ -144,6 +194,27 @@ func (b *PlanBuilder) buildAggregation(ctx context.Context, p LogicalPlan, aggFu if err != nil { return nil, nil, err } + if aggFunc.Order != nil { + trueArgs := aggFunc.Args[:len(aggFunc.Args)-1] // the last argument is SEPARATOR, remote it. + resolver := &aggOrderByResolver{ + ctx: b.ctx, + args: trueArgs, + } + for _, byItem := range aggFunc.Order.Items { + resolver.exprDepth = 0 + resolver.err = nil + retExpr, _ := byItem.Expr.Accept(resolver) + if resolver.err != nil { + return nil, nil, errors.Trace(resolver.err) + } + newByItem, np, err := b.rewrite(ctx, retExpr.(ast.ExprNode), p, nil, true) + if err != nil { + return nil, nil, err + } + p = np + newFunc.OrderByItems = append(newFunc.OrderByItems, &util.ByItems{Expr: newByItem, Desc: byItem.Desc}) + } + } combined := false for j, oldFunc := range plan4Agg.AggFuncs { if oldFunc.Equal(b.ctx, newFunc) { @@ -1164,30 +1235,6 @@ func (b *PlanBuilder) buildUnionAll(ctx context.Context, subPlan []LogicalPlan) return u } -// ByItems wraps a "by" item. -type ByItems struct { - Expr expression.Expression - Desc bool -} - -// String implements fmt.Stringer interface. -func (by *ByItems) String() string { - if by.Desc { - return fmt.Sprintf("%s true", by.Expr) - } - return by.Expr.String() -} - -// Clone makes a copy of ByItems. -func (by *ByItems) Clone() *ByItems { - return &ByItems{Expr: by.Expr.Clone(), Desc: by.Desc} -} - -// Equal checks whether two ByItems are equal. -func (by *ByItems) Equal(ctx sessionctx.Context, other *ByItems) bool { - return by.Expr.Equal(ctx, other.Expr) && by.Desc == other.Desc -} - // itemTransformer transforms ParamMarkerExpr to PositionExpr in the context of ByItem type itemTransformer struct { } @@ -1212,7 +1259,7 @@ func (b *PlanBuilder) buildSort(ctx context.Context, p LogicalPlan, byItems []*a b.curClause = orderByClause } sort := LogicalSort{}.Init(b.ctx, b.getSelectOffset()) - exprs := make([]*ByItems, 0, len(byItems)) + exprs := make([]*util.ByItems, 0, len(byItems)) transformer := &itemTransformer{} for _, item := range byItems { newExpr, _ := item.Expr.Accept(transformer) @@ -1223,7 +1270,7 @@ func (b *PlanBuilder) buildSort(ctx context.Context, p LogicalPlan, byItems []*a } p = np - exprs = append(exprs, &ByItems{Expr: it, Desc: item.Desc}) + exprs = append(exprs, &util.ByItems{Expr: it, Desc: item.Desc}) } sort.ByItems = exprs sort.SetChildren(p) diff --git a/planner/core/logical_plan_test.go b/planner/core/logical_plan_test.go index 7c0d9dafc0af8..b0955ced8061c 100644 --- a/planner/core/logical_plan_test.go +++ b/planner/core/logical_plan_test.go @@ -30,6 +30,7 @@ import ( "github.com/pingcap/tidb/expression" "github.com/pingcap/tidb/infoschema" "github.com/pingcap/tidb/planner/property" + "github.com/pingcap/tidb/planner/util" "github.com/pingcap/tidb/sessionctx" "github.com/pingcap/tidb/sessionctx/variable" "github.com/pingcap/tidb/util/hint" @@ -1381,7 +1382,7 @@ func (s *testPlanSuite) optimize(ctx context.Context, sql string) (PhysicalPlan, return p.(PhysicalPlan), stmt, err } -func byItemsToProperty(byItems []*ByItems) *property.PhysicalProperty { +func byItemsToProperty(byItems []*util.ByItems) *property.PhysicalProperty { pp := &property.PhysicalProperty{} for _, item := range byItems { pp.Items = append(pp.Items, property.Item{Col: item.Expr.(*expression.Column), Desc: item.Desc}) @@ -1463,7 +1464,7 @@ func (s *testPlanSuite) TestSkylinePruning(c *C) { _, err = lp.recursiveDeriveStats() c.Assert(err, IsNil, comment) var ds *DataSource - var byItems []*ByItems + var byItems []*util.ByItems for ds == nil { switch v := lp.(type) { case *DataSource: @@ -1472,12 +1473,12 @@ func (s *testPlanSuite) TestSkylinePruning(c *C) { byItems = v.ByItems lp = lp.Children()[0] case *LogicalProjection: - newItems := make([]*ByItems, 0, len(byItems)) + newItems := make([]*util.ByItems, 0, len(byItems)) for _, col := range byItems { idx := v.schema.ColumnIndex(col.Expr.(*expression.Column)) switch expr := v.Exprs[idx].(type) { case *expression.Column: - newItems = append(newItems, &ByItems{Expr: expr, Desc: col.Desc}) + newItems = append(newItems, &util.ByItems{Expr: expr, Desc: col.Desc}) } } byItems = newItems diff --git a/planner/core/logical_plans.go b/planner/core/logical_plans.go index 09e5fce7fbefd..236bbf1a682ec 100644 --- a/planner/core/logical_plans.go +++ b/planner/core/logical_plans.go @@ -884,7 +884,7 @@ type LogicalUnionAll struct { type LogicalSort struct { baseLogicalPlan - ByItems []*ByItems + ByItems []*util.ByItems } // ExtractCorrelatedCols implements LogicalPlan interface. @@ -900,7 +900,7 @@ func (ls *LogicalSort) ExtractCorrelatedCols() []*expression.CorrelatedColumn { type LogicalTopN struct { baseLogicalPlan - ByItems []*ByItems + ByItems []*util.ByItems Offset uint64 Count uint64 } diff --git a/planner/core/pb_to_plan.go b/planner/core/pb_to_plan.go index 04b13d3b7b743..5ab1e76062b6f 100644 --- a/planner/core/pb_to_plan.go +++ b/planner/core/pb_to_plan.go @@ -21,6 +21,7 @@ import ( "github.com/pingcap/tidb/expression" "github.com/pingcap/tidb/expression/aggregation" "github.com/pingcap/tidb/infoschema" + "github.com/pingcap/tidb/planner/util" "github.com/pingcap/tidb/sessionctx" "github.com/pingcap/tidb/types" "github.com/pingcap/tipb/go-tipb" @@ -137,13 +138,13 @@ func (b *PBPlanBuilder) pbToSelection(e *tipb.Executor) (PhysicalPlan, error) { func (b *PBPlanBuilder) pbToTopN(e *tipb.Executor) (PhysicalPlan, error) { topN := e.TopN sc := b.sctx.GetSessionVars().StmtCtx - byItems := make([]*ByItems, 0, len(topN.OrderBy)) + byItems := make([]*util.ByItems, 0, len(topN.OrderBy)) for _, item := range topN.OrderBy { expr, err := expression.PBToExpr(item.Expr, b.tps, sc) if err != nil { return nil, errors.Trace(err) } - byItems = append(byItems, &ByItems{Expr: expr, Desc: item.Desc}) + byItems = append(byItems, &util.ByItems{Expr: expr, Desc: item.Desc}) } p := PhysicalTopN{ ByItems: byItems, diff --git a/planner/core/physical_plan_test.go b/planner/core/physical_plan_test.go index 13ab04b563796..d28681262225d 100644 --- a/planner/core/physical_plan_test.go +++ b/planner/core/physical_plan_test.go @@ -948,6 +948,51 @@ func (s *testPlanSuite) doTestPushdownDistinct(c *C, vars, input []string, outpu } } +func (s *testPlanSuite) TestGroupConcatOrderby(c *C) { + var ( + input []string + output []struct { + SQL string + Plan []string + Result []string + } + ) + s.testData.GetTestCases(c, &input, &output) + store, dom, err := newStoreWithBootstrap() + c.Assert(err, IsNil) + defer func() { + dom.Close() + store.Close() + }() + tk := testkit.NewTestKit(c, store) + tk.MustExec("use test") + tk.MustExec("drop table if exists test;") + tk.MustExec("create table test(id int, name int)") + tk.MustExec("insert into test values(1, 10);") + tk.MustExec("insert into test values(1, 20);") + tk.MustExec("insert into test values(1, 30);") + tk.MustExec("insert into test values(2, 20);") + tk.MustExec("insert into test values(3, 200);") + tk.MustExec("insert into test values(3, 500);") + + tk.MustExec("drop table if exists ptest;") + tk.MustExec("CREATE TABLE ptest (id int,name int) PARTITION BY RANGE ( id ) " + + "(PARTITION `p0` VALUES LESS THAN (2), PARTITION `p1` VALUES LESS THAN (11))") + tk.MustExec("insert into ptest select * from test;") + tk.MustExec(fmt.Sprintf("set session tidb_opt_distinct_agg_push_down = %v", 1)) + tk.MustExec(fmt.Sprintf("set session tidb_opt_agg_push_down = %v", 1)) + + for i, ts := range input { + s.testData.OnRecord(func() { + output[i].SQL = ts + output[i].Plan = s.testData.ConvertRowsToStrings(tk.MustQuery("explain " + ts).Rows()) + output[i].Result = s.testData.ConvertRowsToStrings(tk.MustQuery(ts).Sort().Rows()) + }) + tk.MustQuery("explain " + ts).Check(testkit.Rows(output[i].Plan...)) + tk.MustQuery(ts).Check(testkit.Rows(output[i].Result...)) + } +} + func (s *testPlanSuite) TestHintAlias(c *C) { defer testleak.AfterTest(c)() store, dom, err := newStoreWithBootstrap() diff --git a/planner/core/physical_plans.go b/planner/core/physical_plans.go index 6a68a036ff5f2..00697ddd628e7 100644 --- a/planner/core/physical_plans.go +++ b/planner/core/physical_plans.go @@ -22,6 +22,7 @@ import ( "github.com/pingcap/tidb/expression/aggregation" "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/planner/property" + "github.com/pingcap/tidb/planner/util" "github.com/pingcap/tidb/sessionctx" "github.com/pingcap/tidb/sessionctx/stmtctx" "github.com/pingcap/tidb/statistics" @@ -297,7 +298,7 @@ type PhysicalProjection struct { type PhysicalTopN struct { basePhysicalPlan - ByItems []*ByItems + ByItems []*util.ByItems Offset uint64 Count uint64 } @@ -491,7 +492,7 @@ type PhysicalStreamAgg struct { type PhysicalSort struct { basePhysicalPlan - ByItems []*ByItems + ByItems []*util.ByItems } // NominalSort asks sort properties for its child. It is a fake operator that will not @@ -502,7 +503,7 @@ type NominalSort struct { // These two fields are used to switch ScalarFunctions to Constants. For these // NominalSorts, we need to converted to Projections check if the ScalarFunctions // are out of bounds. (issue #11653) - ByItems []*ByItems + ByItems []*util.ByItems OnlyColumn bool } diff --git a/planner/core/plan.go b/planner/core/plan.go index 441027ee5f7c4..b4a70d6dc7767 100644 --- a/planner/core/plan.go +++ b/planner/core/plan.go @@ -21,6 +21,7 @@ import ( "github.com/cznic/mathutil" "github.com/pingcap/tidb/expression" "github.com/pingcap/tidb/planner/property" + "github.com/pingcap/tidb/planner/util" "github.com/pingcap/tidb/sessionctx" "github.com/pingcap/tidb/types" "github.com/pingcap/tidb/util/stringutil" @@ -69,9 +70,9 @@ func enforceProperty(p *property.PhysicalProperty, tsk task, ctx sessionctx.Cont } tsk = finishCopTask(ctx, tsk) sortReqProp := &property.PhysicalProperty{TaskTp: property.RootTaskType, Items: p.Items, ExpectedCnt: math.MaxFloat64} - sort := PhysicalSort{ByItems: make([]*ByItems, 0, len(p.Items))}.Init(ctx, tsk.plan().statsInfo(), tsk.plan().SelectBlockOffset(), sortReqProp) + sort := PhysicalSort{ByItems: make([]*util.ByItems, 0, len(p.Items))}.Init(ctx, tsk.plan().statsInfo(), tsk.plan().SelectBlockOffset(), sortReqProp) for _, col := range p.Items { - sort.ByItems = append(sort.ByItems, &ByItems{col.Col, col.Desc}) + sort.ByItems = append(sort.ByItems, &util.ByItems{Expr: col.Col, Desc: col.Desc}) } return sort.attach2Task(tsk) } diff --git a/planner/core/planbuilder.go b/planner/core/planbuilder.go index 61d917c7f69cb..f1016959e762e 100644 --- a/planner/core/planbuilder.go +++ b/planner/core/planbuilder.go @@ -1742,7 +1742,7 @@ func (b *PlanBuilder) buildShow(ctx context.Context, show *ast.ShowStmt) (Plan, b.curClause = orderByClause orderByCol := np.Schema().Columns[0].Clone().(*expression.Column) sort := LogicalSort{ - ByItems: []*ByItems{{Expr: orderByCol}}, + ByItems: []*util.ByItems{{Expr: orderByCol}}, }.Init(b.ctx, b.getSelectOffset()) sort.SetChildren(np) np = sort diff --git a/planner/core/property_cols_prune.go b/planner/core/property_cols_prune.go index 98779c6c1e30d..adabea29fc15e 100644 --- a/planner/core/property_cols_prune.go +++ b/planner/core/property_cols_prune.go @@ -15,6 +15,7 @@ package core import ( "github.com/pingcap/tidb/expression" + "github.com/pingcap/tidb/planner/util" ) // preparePossibleProperties traverses the plan tree by a post-order method, @@ -114,7 +115,7 @@ func (p *LogicalTopN) PreparePossibleProperties(schema *expression.Schema, child return [][]*expression.Column{propCols} } -func getPossiblePropertyFromByItems(items []*ByItems) []*expression.Column { +func getPossiblePropertyFromByItems(items []*util.ByItems) []*expression.Column { cols := make([]*expression.Column, 0, len(items)) for _, item := range items { if col, ok := item.Expr.(*expression.Column); ok { diff --git a/planner/core/resolve_indices.go b/planner/core/resolve_indices.go index f17e7582cb444..c9e67a95a3cba 100644 --- a/planner/core/resolve_indices.go +++ b/planner/core/resolve_indices.go @@ -343,6 +343,12 @@ func (p *basePhysicalAgg) ResolveIndices() (err error) { return err } } + for _, byItem := range aggFun.OrderByItems { + byItem.Expr, err = byItem.Expr.ResolveIndices(p.children[0].Schema()) + if err != nil { + return err + } + } } for i, item := range p.GroupByItems { p.GroupByItems[i], err = item.ResolveIndices(p.children[0].Schema()) diff --git a/planner/core/rule_aggregation_push_down.go b/planner/core/rule_aggregation_push_down.go index 2a8a5109e9891..179a2d311f414 100644 --- a/planner/core/rule_aggregation_push_down.go +++ b/planner/core/rule_aggregation_push_down.go @@ -33,7 +33,10 @@ type aggregationPushDownSolver struct { // It's easy to see that max, min, first row is decomposable, no matter whether it's distinct, but sum(distinct) and // count(distinct) is not. // Currently we don't support avg and concat. -func (a *aggregationPushDownSolver) isDecomposable(fun *aggregation.AggFuncDesc) bool { +func (a *aggregationPushDownSolver) isDecomposableWithJoin(fun *aggregation.AggFuncDesc) bool { + if len(fun.OrderByItems) > 0 { + return false + } switch fun.Name { case ast.AggFuncAvg, ast.AggFuncGroupConcat, ast.AggFuncVarPop, ast.AggFuncJsonObjectAgg: // TODO: Support avg push down. @@ -47,6 +50,22 @@ func (a *aggregationPushDownSolver) isDecomposable(fun *aggregation.AggFuncDesc) } } +func (a *aggregationPushDownSolver) isDecomposableWithUnion(fun *aggregation.AggFuncDesc) bool { + if len(fun.OrderByItems) > 0 { + return false + } + switch fun.Name { + case ast.AggFuncGroupConcat, ast.AggFuncVarPop, ast.AggFuncJsonObjectAgg: + return false + case ast.AggFuncMax, ast.AggFuncMin, ast.AggFuncFirstRow: + return true + case ast.AggFuncSum, ast.AggFuncCount, ast.AggFuncAvg: + return true + default: + return false + } +} + // getAggFuncChildIdx gets which children it belongs to, 0 stands for left, 1 stands for right, -1 stands for both. func (a *aggregationPushDownSolver) getAggFuncChildIdx(aggFunc *aggregation.AggFuncDesc, schema *expression.Schema) int { fromLeft, fromRight := false, false @@ -74,7 +93,7 @@ func (a *aggregationPushDownSolver) collectAggFuncs(agg *LogicalAggregation, joi valid = true leftChild := join.children[0] for _, aggFunc := range agg.AggFuncs { - if !a.isDecomposable(aggFunc) { + if !a.isDecomposableWithJoin(aggFunc) { return false, nil, nil } index := a.getAggFuncChildIdx(aggFunc, leftChild.Schema()) @@ -403,6 +422,11 @@ func (a *aggregationPushDownSolver) aggPushDown(p LogicalPlan) (_ LogicalPlan, e projChild := proj.children[0] agg.SetChildren(projChild) } else if union, ok1 := child.(*LogicalUnionAll); ok1 { + for _, aggFunc := range agg.AggFuncs { + if !a.isDecomposableWithUnion(aggFunc) { + return p, nil + } + } pushedAgg := a.splitPartialAgg(agg) newChildren := make([]LogicalPlan, 0, len(union.children)) for _, child := range union.children { diff --git a/planner/core/rule_column_pruning.go b/planner/core/rule_column_pruning.go index 2ec433bdffa62..f29d59031b126 100644 --- a/planner/core/rule_column_pruning.go +++ b/planner/core/rule_column_pruning.go @@ -21,6 +21,7 @@ import ( "github.com/pingcap/parser/mysql" "github.com/pingcap/tidb/expression" "github.com/pingcap/tidb/expression/aggregation" + "github.com/pingcap/tidb/planner/util" "github.com/pingcap/tidb/types" ) @@ -97,6 +98,10 @@ func (la *LogicalAggregation) PruneColumns(parentUsedCols []*expression.Column) var selfUsedCols []*expression.Column for _, aggrFunc := range la.AggFuncs { selfUsedCols = expression.ExtractColumnsFromExpressions(selfUsedCols, aggrFunc.Args, nil) + + var cols []*expression.Column + aggrFunc.OrderByItems, cols = pruneByItems(aggrFunc.OrderByItems) + selfUsedCols = append(selfUsedCols, cols...) } if len(la.AggFuncs) == 0 { // If all the aggregate functions are pruned, we should add an aggregate function to keep the correctness. @@ -130,24 +135,32 @@ func (la *LogicalAggregation) PruneColumns(parentUsedCols []*expression.Column) return child.PruneColumns(selfUsedCols) } -// PruneColumns implements LogicalPlan interface. -// If any expression can view as a constant in execution stage, such as correlated column, constant, -// we do prune them. Note that we can't prune the expressions contain non-deterministic functions, such as rand(). -func (ls *LogicalSort) PruneColumns(parentUsedCols []*expression.Column) error { - child := ls.children[0] - for i := len(ls.ByItems) - 1; i >= 0; i-- { - cols := expression.ExtractColumns(ls.ByItems[i].Expr) +func pruneByItems(old []*util.ByItems) (new []*util.ByItems, parentUsedCols []*expression.Column) { + new = make([]*util.ByItems, 0, len(old)) + for _, byItem := range old { + cols := expression.ExtractColumns(byItem.Expr) if len(cols) == 0 { - if !expression.IsRuntimeConstExpr(ls.ByItems[i].Expr) { - continue + if !expression.IsRuntimeConstExpr(byItem.Expr) { + new = append(new, byItem) } - ls.ByItems = append(ls.ByItems[:i], ls.ByItems[i+1:]...) - } else if ls.ByItems[i].Expr.GetType().Tp == mysql.TypeNull { - ls.ByItems = append(ls.ByItems[:i], ls.ByItems[i+1:]...) + } else if byItem.Expr.GetType().Tp == mysql.TypeNull { + // do nothing, should be filtered } else { parentUsedCols = append(parentUsedCols, cols...) + new = append(new, byItem) } } + return +} + +// PruneColumns implements LogicalPlan interface. +// If any expression can view as a constant in execution stage, such as correlated column, constant, +// we do prune them. Note that we can't prune the expressions contain non-deterministic functions, such as rand(). +func (ls *LogicalSort) PruneColumns(parentUsedCols []*expression.Column) error { + child := ls.children[0] + var cols []*expression.Column + ls.ByItems, cols = pruneByItems(ls.ByItems) + parentUsedCols = append(parentUsedCols, cols...) return child.PruneColumns(parentUsedCols) } @@ -156,19 +169,9 @@ func (ls *LogicalSort) PruneColumns(parentUsedCols []*expression.Column) error { // we do prune them. Note that we can't prune the expressions contain non-deterministic functions, such as rand(). func (lt *LogicalTopN) PruneColumns(parentUsedCols []*expression.Column) error { child := lt.children[0] - for i := len(lt.ByItems) - 1; i >= 0; i-- { - cols := expression.ExtractColumns(lt.ByItems[i].Expr) - if len(cols) == 0 { - if expression.IsMutableEffectsExpr(lt.ByItems[i].Expr) { - continue - } - lt.ByItems = append(lt.ByItems[:i], lt.ByItems[i+1:]...) - } else if lt.ByItems[i].Expr.GetType().Tp == mysql.TypeNull { - lt.ByItems = append(lt.ByItems[:i], lt.ByItems[i+1:]...) - } else { - parentUsedCols = append(parentUsedCols, cols...) - } - } + var cols []*expression.Column + lt.ByItems, cols = pruneByItems(lt.ByItems) + parentUsedCols = append(parentUsedCols, cols...) return child.PruneColumns(parentUsedCols) } diff --git a/planner/core/rule_inject_extra_projection.go b/planner/core/rule_inject_extra_projection.go index b13278750d72e..bee2e77b63e15 100644 --- a/planner/core/rule_inject_extra_projection.go +++ b/planner/core/rule_inject_extra_projection.go @@ -16,6 +16,7 @@ package core import ( "github.com/pingcap/tidb/expression" "github.com/pingcap/tidb/expression/aggregation" + "github.com/pingcap/tidb/planner/util" "github.com/pingcap/tidb/sessionctx" ) @@ -78,6 +79,10 @@ func InjectProjBelowAgg(aggPlan PhysicalPlan, aggFuncs []*aggregation.AggFuncDes _, isScalarFunc := arg.(*expression.ScalarFunction) hasScalarFunc = hasScalarFunc || isScalarFunc } + for _, byItem := range aggFuncs[i].OrderByItems { + _, isScalarFunc := byItem.Expr.(*expression.ScalarFunction) + hasScalarFunc = hasScalarFunc || isScalarFunc + } } for i := 0; !hasScalarFunc && i < len(groupByItems); i++ { _, isScalarFunc := groupByItems[i].(*expression.ScalarFunction) @@ -106,6 +111,20 @@ func InjectProjBelowAgg(aggPlan PhysicalPlan, aggFuncs []*aggregation.AggFuncDes f.Args[i] = newArg cursor++ } + for _, byItem := range f.OrderByItems { + if _, isCnst := byItem.Expr.(*expression.Constant); isCnst { + continue + } + projExprs = append(projExprs, byItem.Expr) + newArg := &expression.Column{ + UniqueID: aggPlan.SCtx().GetSessionVars().AllocPlanColumnID(), + RetType: byItem.Expr.GetType(), + Index: cursor, + } + projSchemaCols = append(projSchemaCols, newArg) + byItem.Expr = newArg + cursor++ + } } for i, item := range groupByItems { @@ -143,7 +162,7 @@ func InjectProjBelowAgg(aggPlan PhysicalPlan, aggFuncs []*aggregation.AggFuncDes // PhysicalTopN, some extra columns will be added into the schema of the // Projection, thus we need to add another Projection upon them to prune the // redundant columns. -func InjectProjBelowSort(p PhysicalPlan, orderByItems []*ByItems) PhysicalPlan { +func InjectProjBelowSort(p PhysicalPlan, orderByItems []*util.ByItems) PhysicalPlan { hasScalarFunc, numOrderByItems := false, len(orderByItems) for i := 0; !hasScalarFunc && i < numOrderByItems; i++ { _, isScalarFunc := orderByItems[i].Expr.(*expression.ScalarFunction) @@ -209,7 +228,7 @@ func InjectProjBelowSort(p PhysicalPlan, orderByItems []*ByItems) PhysicalPlan { // TurnNominalSortIntoProj will turn nominal sort into two projections. This is to check if the scalar functions will // overflow. -func TurnNominalSortIntoProj(p PhysicalPlan, onlyColumn bool, orderByItems []*ByItems) PhysicalPlan { +func TurnNominalSortIntoProj(p PhysicalPlan, onlyColumn bool, orderByItems []*util.ByItems) PhysicalPlan { if onlyColumn { return p.Children()[0] } diff --git a/planner/core/rule_max_min_eliminate.go b/planner/core/rule_max_min_eliminate.go index 8639a1e1df872..710d2ed301e97 100644 --- a/planner/core/rule_max_min_eliminate.go +++ b/planner/core/rule_max_min_eliminate.go @@ -175,7 +175,7 @@ func (a *maxMinEliminator) eliminateSingleMaxMin(agg *LogicalAggregation) *Logic desc := f.Name == ast.AggFuncMax // Compose Sort operator. sort := LogicalSort{}.Init(ctx, agg.blockOffset) - sort.ByItems = append(sort.ByItems, &ByItems{f.Args[0], desc}) + sort.ByItems = append(sort.ByItems, &util.ByItems{Expr: f.Args[0], Desc: desc}) sort.SetChildren(child) child = sort } diff --git a/planner/core/rule_topn_push_down.go b/planner/core/rule_topn_push_down.go index 53663713ef17a..851ff90c0516e 100644 --- a/planner/core/rule_topn_push_down.go +++ b/planner/core/rule_topn_push_down.go @@ -18,6 +18,7 @@ import ( "github.com/cznic/mathutil" "github.com/pingcap/tidb/expression" + "github.com/pingcap/tidb/planner/util" ) // pushDownTopNOptimizer pushes down the topN or limit. In the future we will remove the limit from `requiredProperty` in CBO phase. @@ -95,7 +96,7 @@ func (p *LogicalUnionAll) pushDownTopN(topN *LogicalTopN) LogicalPlan { if topN != nil { newTopN = LogicalTopN{Count: topN.Count + topN.Offset}.Init(p.ctx, topN.blockOffset) for _, by := range topN.ByItems { - newTopN.ByItems = append(newTopN.ByItems, &ByItems{by.Expr, by.Desc}) + newTopN.ByItems = append(newTopN.ByItems, &util.ByItems{Expr: by.Expr, Desc: by.Desc}) } } p.children[i] = child.pushDownTopN(newTopN) @@ -153,7 +154,7 @@ func (p *LogicalJoin) pushDownTopNToChild(topN *LogicalTopN, idx int) LogicalPla newTopN := LogicalTopN{ Count: topN.Count + topN.Offset, - ByItems: make([]*ByItems, len(topN.ByItems)), + ByItems: make([]*util.ByItems, len(topN.ByItems)), }.Init(topN.ctx, topN.blockOffset) for i := range topN.ByItems { newTopN.ByItems[i] = topN.ByItems[i].Clone() diff --git a/planner/core/task.go b/planner/core/task.go index f5a4bfe3b1c19..6f5ec293c326b 100644 --- a/planner/core/task.go +++ b/planner/core/task.go @@ -24,6 +24,7 @@ import ( "github.com/pingcap/tidb/expression/aggregation" "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/planner/property" + "github.com/pingcap/tidb/planner/util" "github.com/pingcap/tidb/sessionctx" "github.com/pingcap/tidb/statistics" "github.com/pingcap/tidb/types" @@ -866,7 +867,7 @@ func (p *NominalSort) attach2Task(tasks ...task) task { } func (p *PhysicalTopN) getPushedDownTopN(childPlan PhysicalPlan) *PhysicalTopN { - newByItems := make([]*ByItems, 0, len(p.ByItems)) + newByItems := make([]*util.ByItems, 0, len(p.ByItems)) for _, expr := range p.ByItems { newByItems = append(newByItems, expr.Clone()) } diff --git a/planner/core/testdata/plan_suite_in.json b/planner/core/testdata/plan_suite_in.json index 7607ad9e5b394..284832e91d601 100644 --- a/planner/core/testdata/plan_suite_in.json +++ b/planner/core/testdata/plan_suite_in.json @@ -573,6 +573,15 @@ "select /*+ HASH_AGG(), AGG_TO_COP() */ count(distinct a) from (select * from ta union all select * from tb) t;" ] }, + { + "name": "TestGroupConcatOrderby", + "cases": [ + "select /*+ agg_to_cop */ group_concat(name ORDER BY name desc SEPARATOR '++'), group_concat(id ORDER BY name desc, id asc SEPARATOR '--') from test;", + "select /*+ agg_to_cop */ group_concat(name ORDER BY name desc SEPARATOR '++'), group_concat(id ORDER BY name desc, id asc SEPARATOR '--') from ptest;", + "select /*+ agg_to_cop */ group_concat(distinct name order by name desc) from test;", + "select /*+ agg_to_cop */ group_concat(distinct name order by name desc) from ptest;" + ] + }, { "name": "TestDAGPlanBuilderWindow", "cases":[ diff --git a/planner/core/testdata/plan_suite_out.json b/planner/core/testdata/plan_suite_out.json index 0523ce2d4594c..aa7e8a2706e96 100644 --- a/planner/core/testdata/plan_suite_out.json +++ b/planner/core/testdata/plan_suite_out.json @@ -1741,6 +1741,65 @@ } ] }, + { + "Name": "TestGroupConcatOrderby", + "Cases": [ + { + "SQL": "select /*+ agg_to_cop */ group_concat(name ORDER BY name desc SEPARATOR '++'), group_concat(id ORDER BY name desc, id asc SEPARATOR '--') from test;", + "Plan": [ + "HashAgg_5 1.00 root funcs:group_concat(Column#6 order by Column#7 desc separator \"++\")->Column#4, funcs:group_concat(Column#8 order by Column#9 desc, Column#10 asc separator \"--\")->Column#5", + "└─Projection_18 10000.00 root cast(test.test.name, var_string(20))->Column#6, test.test.name, cast(test.test.id, var_string(20))->Column#8, test.test.name, test.test.id", + " └─TableReader_11 10000.00 root data:TableFullScan_10", + " └─TableFullScan_10 10000.00 cop[tikv] table:test keep order:false, stats:pseudo" + ], + "Result": [ + "500++200++30++20++20++10 3--3--1--1--2--1" + ] + }, + { + "SQL": "select /*+ agg_to_cop */ group_concat(name ORDER BY name desc SEPARATOR '++'), group_concat(id ORDER BY name desc, id asc SEPARATOR '--') from ptest;", + "Plan": [ + "HashAgg_10 1.00 root funcs:group_concat(Column#6 order by Column#7 desc separator \"++\")->Column#4, funcs:group_concat(Column#8 order by Column#9 desc, Column#10 asc separator \"--\")->Column#5", + "└─Projection_23 20000.00 root cast(test.ptest.name, var_string(20))->Column#6, test.ptest.name, cast(test.ptest.id, var_string(20))->Column#8, test.ptest.name, test.ptest.id", + " └─Union_13 20000.00 root ", + " ├─TableReader_15 10000.00 root data:TableFullScan_14", + " │ └─TableFullScan_14 10000.00 cop[tikv] table:ptest, partition:p0 keep order:false, stats:pseudo", + " └─TableReader_17 10000.00 root data:TableFullScan_16", + " └─TableFullScan_16 10000.00 cop[tikv] table:ptest, partition:p1 keep order:false, stats:pseudo" + ], + "Result": [ + "500++200++30++20++20++10 3--3--1--1--2--1" + ] + }, + { + "SQL": "select /*+ agg_to_cop */ group_concat(distinct name order by name desc) from test;", + "Plan": [ + "HashAgg_5 1.00 root funcs:group_concat(distinct Column#5 order by Column#6 desc separator \",\")->Column#4", + "└─Projection_9 10000.00 root cast(test.test.name, var_string(20))->Column#5, test.test.name", + " └─TableReader_8 10000.00 root data:TableFullScan_7", + " └─TableFullScan_7 10000.00 cop[tikv] table:test keep order:false, stats:pseudo" + ], + "Result": [ + "500,200,30,20,10" + ] + }, + { + "SQL": "select /*+ agg_to_cop */ group_concat(distinct name order by name desc) from ptest;", + "Plan": [ + "StreamAgg_9 1.00 root funcs:group_concat(distinct Column#5 order by Column#6 desc separator \",\")->Column#4", + "└─Projection_20 20000.00 root cast(test.ptest.name, var_string(20))->Column#5, test.ptest.name", + " └─Union_15 20000.00 root ", + " ├─TableReader_17 10000.00 root data:TableFullScan_16", + " │ └─TableFullScan_16 10000.00 cop[tikv] table:ptest, partition:p0 keep order:false, stats:pseudo", + " └─TableReader_19 10000.00 root data:TableFullScan_18", + " └─TableFullScan_18 10000.00 cop[tikv] table:ptest, partition:p1 keep order:false, stats:pseudo" + ], + "Result": [ + "500,200,30,20,10" + ] + } + ] + }, { "Name": "TestDAGPlanBuilderWindow", "Cases": [ diff --git a/planner/util/byitem.go b/planner/util/byitem.go new file mode 100644 index 0000000000000..550bb93572cbe --- /dev/null +++ b/planner/util/byitem.go @@ -0,0 +1,45 @@ +// Copyright 2020 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package util + +import ( + "fmt" + + "github.com/pingcap/tidb/expression" + "github.com/pingcap/tidb/sessionctx" +) + +// ByItems wraps a "by" item. +type ByItems struct { + Expr expression.Expression + Desc bool +} + +// String implements fmt.Stringer interface. +func (by *ByItems) String() string { + if by.Desc { + return fmt.Sprintf("%s true", by.Expr) + } + return by.Expr.String() +} + +// Clone makes a copy of ByItems. +func (by *ByItems) Clone() *ByItems { + return &ByItems{Expr: by.Expr.Clone(), Desc: by.Desc} +} + +// Equal checks whether two ByItems are equal. +func (by *ByItems) Equal(ctx sessionctx.Context, other *ByItems) bool { + return by.Expr.Equal(ctx, other.Expr) && by.Desc == other.Desc +} diff --git a/sessionctx/variable/varsutil.go b/sessionctx/variable/varsutil.go index 313208a65e8f2..13df56f4e0e47 100644 --- a/sessionctx/variable/varsutil.go +++ b/sessionctx/variable/varsutil.go @@ -23,6 +23,7 @@ import ( "sync/atomic" "time" + "github.com/cznic/mathutil" "github.com/pingcap/errors" "github.com/pingcap/parser/ast" "github.com/pingcap/parser/mysql" @@ -322,9 +323,15 @@ func ValidateSetSystemVar(vars *SessionVars, name string, value string, scope Sc } return value, ErrWrongValueForVar.GenWithStackByArgs(name, value) case GroupConcatMaxLen: - // The reasonable range of 'group_concat_max_len' is 4~18446744073709551615(64-bit platforms) - // See https://dev.mysql.com/doc/refman/8.0/en/server-system-variables.html#sysvar_group_concat_max_len for details - return checkUInt64SystemVar(name, value, 4, math.MaxUint64, vars) + // https://dev.mysql.com/doc/refman/8.0/en/server-system-variables.html#sysvar_group_concat_max_len + // Minimum Value 4 + // Maximum Value (64-bit platforms) 18446744073709551615 + // Maximum Value (32-bit platforms) 4294967295 + maxLen := uint64(math.MaxUint64) + if mathutil.IntBits == 32 { + maxLen = uint64(math.MaxUint32) + } + return checkUInt64SystemVar(name, value, 4, maxLen, vars) case InteractiveTimeout: return checkUInt64SystemVar(name, value, 1, secondsPerYear, vars) case InnodbCommitConcurrency: