executor: support GROUP_CONCAT(ORDER BY) (#16591)
SunRunAway authored May 6, 2020
1 parent 2daee41 commit 7ebcc20
Showing 41 changed files with 948 additions and 141 deletions.
4 changes: 2 additions & 2 deletions cmd/explaintest/r/explain.result
@@ -28,13 +28,13 @@ set session tidb_hashagg_partial_concurrency = 1;
set session tidb_hashagg_final_concurrency = 1;
explain select group_concat(a) from t group by id;
id estRows task access object operator info
StreamAgg_8 8000.00 root group by:Column#6, funcs:group_concat(Column#5, ",")->Column#4
StreamAgg_8 8000.00 root group by:Column#6, funcs:group_concat(Column#5 separator ",")->Column#4
└─Projection_18 10000.00 root cast(test.t.a, var_string(20))->Column#5, test.t.id
└─TableReader_15 10000.00 root data:TableFullScan_14
└─TableFullScan_14 10000.00 cop[tikv] table:t keep order:true, stats:pseudo
explain select group_concat(a, b) from t group by id;
id estRows task access object operator info
StreamAgg_8 8000.00 root group by:Column#7, funcs:group_concat(Column#5, Column#6, ",")->Column#4
StreamAgg_8 8000.00 root group by:Column#7, funcs:group_concat(Column#5, Column#6 separator ",")->Column#4
└─Projection_18 10000.00 root cast(test.t.a, var_string(20))->Column#5, cast(test.t.b, var_string(20))->Column#6, test.t.id
└─TableReader_15 10000.00 root data:TableFullScan_14
└─TableFullScan_14 10000.00 cop[tikv] table:t keep order:true, stats:pseudo
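The two updated plan lines above now print the separator with the `separator` keyword instead of listing it as one more argument. As a rough, hypothetical Go sketch (not TiDB's actual explain code), a formatting helper along these lines would produce the same strings:

```go
package main

import (
	"fmt"
	"strings"
)

// buildGroupConcatInfo is a hypothetical helper that mimics how the updated
// operator info renders a group_concat call: the separator is printed after
// the `separator` keyword rather than as a trailing argument.
func buildGroupConcatInfo(args []string, sep string) string {
	return fmt.Sprintf("group_concat(%s separator %q)", strings.Join(args, ", "), sep)
}

func main() {
	fmt.Println(buildGroupConcatInfo([]string{"Column#5"}, ","))             // group_concat(Column#5 separator ",")
	fmt.Println(buildGroupConcatInfo([]string{"Column#5", "Column#6"}, ",")) // group_concat(Column#5, Column#6 separator ",")
}
```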
79 changes: 69 additions & 10 deletions executor/aggfuncs/aggfunc_test.go
@@ -25,6 +25,7 @@ import (
"github.com/pingcap/tidb/executor/aggfuncs"
"github.com/pingcap/tidb/expression"
"github.com/pingcap/tidb/expression/aggregation"
"github.com/pingcap/tidb/planner/util"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/types/json"
@@ -68,6 +69,7 @@ type aggTest struct {
dataGen func(i int) types.Datum
funcName string
results []types.Datum
orderBy bool
}

type multiArgsAggTest struct {
@@ -77,6 +79,7 @@ type multiArgsAggTest struct {
dataGens []func(i int) types.Datum
funcName string
results []types.Datum
orderBy bool
}

func (s *testSuite) testMergePartialResult(c *C, p aggTest) {
@@ -93,6 +96,11 @@ func (s *testSuite) testMergePartialResult(c *C, p aggTest) {
}
desc, err := aggregation.NewAggFuncDesc(s.ctx, p.funcName, args, false)
c.Assert(err, IsNil)
if p.orderBy {
desc.OrderByItems = []*util.ByItems{
{Expr: args[0], Desc: true},
}
}
partialDesc, finalDesc := desc.Split([]int{0, 1})

// build partial func for partial phase.
@@ -112,7 +120,7 @@ func (s *testSuite) testMergePartialResult(c *C, p aggTest) {
dt := resultChk.GetRow(0).GetDatum(0, p.dataType)
result, err := dt.CompareDatum(s.ctx.GetSessionVars().StmtCtx, &p.results[0])
c.Assert(err, IsNil)
c.Assert(result, Equals, 0)
c.Assert(result, Equals, 0, Commentf("%v != %v", dt.String(), p.results[0]))

err = finalFunc.MergePartialResult(s.ctx, partialResult, finalPr)
c.Assert(err, IsNil)
@@ -128,7 +136,7 @@ func (s *testSuite) testMergePartialResult(c *C, p aggTest) {
dt = resultChk.GetRow(0).GetDatum(0, p.dataType)
result, err = dt.CompareDatum(s.ctx.GetSessionVars().StmtCtx, &p.results[1])
c.Assert(err, IsNil)
c.Assert(result, Equals, 0)
c.Assert(result, Equals, 0, Commentf("%v != %v", dt.String(), p.results[1]))
err = finalFunc.MergePartialResult(s.ctx, partialResult, finalPr)
c.Assert(err, IsNil)

@@ -139,7 +147,7 @@ func (s *testSuite) testMergePartialResult(c *C, p aggTest) {
dt = resultChk.GetRow(0).GetDatum(0, p.dataType)
result, err = dt.CompareDatum(s.ctx.GetSessionVars().StmtCtx, &p.results[2])
c.Assert(err, IsNil)
c.Assert(result, Equals, 0)
c.Assert(result, Equals, 0, Commentf("%v != %v", dt.String(), p.results[2]))
}

func buildAggTester(funcName string, tp byte, numRows int, results ...interface{}) aggTest {
@@ -176,6 +184,11 @@ func (s *testSuite) testMultiArgsMergePartialResult(c *C, p multiArgsAggTest) {

desc, err := aggregation.NewAggFuncDesc(s.ctx, p.funcName, args, false)
c.Assert(err, IsNil)
if p.orderBy {
desc.OrderByItems = []*util.ByItems{
{Expr: args[0], Desc: true},
}
}
partialDesc, finalDesc := desc.Split([]int{0, 1})

// build partial func for partial phase.
@@ -300,6 +313,11 @@ func (s *testSuite) testAggFunc(c *C, p aggTest) {
}
desc, err := aggregation.NewAggFuncDesc(s.ctx, p.funcName, args, false)
c.Assert(err, IsNil)
if p.orderBy {
desc.OrderByItems = []*util.ByItems{
{Expr: args[0], Desc: true},
}
}
finalFunc := aggfuncs.Build(s.ctx, desc, 0)
finalPr := finalFunc.AllocPartialResult()
resultChk := chunk.NewChunkWithCapacity([]*types.FieldType{desc.RetTp}, 1)
@@ -312,7 +330,7 @@ func (s *testSuite) testAggFunc(c *C, p aggTest) {
dt := resultChk.GetRow(0).GetDatum(0, desc.RetTp)
result, err := dt.CompareDatum(s.ctx.GetSessionVars().StmtCtx, &p.results[1])
c.Assert(err, IsNil)
c.Assert(result, Equals, 0)
c.Assert(result, Equals, 0, Commentf("%v != %v", dt.String(), p.results[1]))

// test the empty input
resultChk.Reset()
@@ -321,11 +339,16 @@ func (s *testSuite) testAggFunc(c *C, p aggTest) {
dt = resultChk.GetRow(0).GetDatum(0, desc.RetTp)
result, err = dt.CompareDatum(s.ctx.GetSessionVars().StmtCtx, &p.results[0])
c.Assert(err, IsNil)
c.Assert(result, Equals, 0)
c.Assert(result, Equals, 0, Commentf("%v != %v", dt.String(), p.results[0]))

// test the agg func with distinct
desc, err = aggregation.NewAggFuncDesc(s.ctx, p.funcName, args, true)
c.Assert(err, IsNil)
if p.orderBy {
desc.OrderByItems = []*util.ByItems{
{Expr: args[0], Desc: true},
}
}
finalFunc = aggfuncs.Build(s.ctx, desc, 0)
finalPr = finalFunc.AllocPartialResult()

@@ -341,7 +364,7 @@ func (s *testSuite) testAggFunc(c *C, p aggTest) {
dt = resultChk.GetRow(0).GetDatum(0, desc.RetTp)
result, err = dt.CompareDatum(s.ctx.GetSessionVars().StmtCtx, &p.results[1])
c.Assert(err, IsNil)
c.Assert(result, Equals, 0)
c.Assert(result, Equals, 0, Commentf("%v != %v", dt.String(), p.results[1]))

// test the empty input
resultChk.Reset()
Expand All @@ -350,7 +373,7 @@ func (s *testSuite) testAggFunc(c *C, p aggTest) {
dt = resultChk.GetRow(0).GetDatum(0, desc.RetTp)
result, err = dt.CompareDatum(s.ctx.GetSessionVars().StmtCtx, &p.results[0])
c.Assert(err, IsNil)
c.Assert(result, Equals, 0)
c.Assert(result, Equals, 0, Commentf("%v != %v", dt.String(), p.results[0]))
}

func (s *testSuite) testMultiArgsAggFunc(c *C, p multiArgsAggTest) {
Expand All @@ -367,9 +390,17 @@ func (s *testSuite) testMultiArgsAggFunc(c *C, p multiArgsAggTest) {
for k := 0; k < len(p.dataTypes); k++ {
args[k] = &expression.Column{RetType: p.dataTypes[k], Index: k}
}
if p.funcName == ast.AggFuncGroupConcat {
args = append(args, &expression.Constant{Value: types.NewStringDatum(" "), RetType: types.NewFieldType(mysql.TypeString)})
}

desc, err := aggregation.NewAggFuncDesc(s.ctx, p.funcName, args, false)
c.Assert(err, IsNil)
if p.orderBy {
desc.OrderByItems = []*util.ByItems{
{Expr: args[0], Desc: true},
}
}
finalFunc := aggfuncs.Build(s.ctx, desc, 0)
finalPr := finalFunc.AllocPartialResult()
resultChk := chunk.NewChunkWithCapacity([]*types.FieldType{desc.RetTp}, 1)
@@ -382,7 +413,7 @@ func (s *testSuite) testMultiArgsAggFunc(c *C, p multiArgsAggTest) {
dt := resultChk.GetRow(0).GetDatum(0, desc.RetTp)
result, err := dt.CompareDatum(s.ctx.GetSessionVars().StmtCtx, &p.results[1])
c.Assert(err, IsNil)
c.Assert(result, Equals, 0)
c.Assert(result, Equals, 0, Commentf("%v != %v", dt.String(), p.results[1]))

// test the empty input
resultChk.Reset()
@@ -391,11 +422,16 @@ func (s *testSuite) testMultiArgsAggFunc(c *C, p multiArgsAggTest) {
dt = resultChk.GetRow(0).GetDatum(0, desc.RetTp)
result, err = dt.CompareDatum(s.ctx.GetSessionVars().StmtCtx, &p.results[0])
c.Assert(err, IsNil)
c.Assert(result, Equals, 0)
c.Assert(result, Equals, 0, Commentf("%v != %v", dt.String(), p.results[0]))

// test the agg func with distinct
desc, err = aggregation.NewAggFuncDesc(s.ctx, p.funcName, args, true)
c.Assert(err, IsNil)
if p.orderBy {
desc.OrderByItems = []*util.ByItems{
{Expr: args[0], Desc: true},
}
}
finalFunc = aggfuncs.Build(s.ctx, desc, 0)
finalPr = finalFunc.AllocPartialResult()

@@ -411,7 +447,7 @@ func (s *testSuite) testMultiArgsAggFunc(c *C, p multiArgsAggTest) {
dt = resultChk.GetRow(0).GetDatum(0, desc.RetTp)
result, err = dt.CompareDatum(s.ctx.GetSessionVars().StmtCtx, &p.results[1])
c.Assert(err, IsNil)
c.Assert(result, Equals, 0)
c.Assert(result, Equals, 0, Commentf("%v != %v", dt.String(), p.results[1]))

// test the empty input
resultChk.Reset()
@@ -439,6 +475,11 @@ func (s *testSuite) benchmarkAggFunc(b *testing.B, p aggTest) {
if err != nil {
b.Fatal(err)
}
if p.orderBy {
desc.OrderByItems = []*util.ByItems{
{Expr: args[0], Desc: true},
}
}
finalFunc := aggfuncs.Build(s.ctx, desc, 0)
resultChk := chunk.NewChunkWithCapacity([]*types.FieldType{desc.RetTp}, 1)
iter := chunk.NewIterator4Chunk(srcChk)
@@ -454,6 +495,11 @@ func (s *testSuite) benchmarkAggFunc(b *testing.B, p aggTest) {
if err != nil {
b.Fatal(err)
}
if p.orderBy {
desc.OrderByItems = []*util.ByItems{
{Expr: args[0], Desc: true},
}
}
finalFunc = aggfuncs.Build(s.ctx, desc, 0)
resultChk.Reset()
b.Run(fmt.Sprintf("%v(distinct)/%v", p.funcName, p.dataType), func(b *testing.B) {
@@ -475,11 +521,19 @@ func (s *testSuite) benchmarkMultiArgsAggFunc(b *testing.B, p multiArgsAggTest)
for k := 0; k < len(p.dataTypes); k++ {
args[k] = &expression.Column{RetType: p.dataTypes[k], Index: k}
}
if p.funcName == ast.AggFuncGroupConcat {
args = append(args, &expression.Constant{Value: types.NewStringDatum(" "), RetType: types.NewFieldType(mysql.TypeString)})
}

desc, err := aggregation.NewAggFuncDesc(s.ctx, p.funcName, args, false)
if err != nil {
b.Fatal(err)
}
if p.orderBy {
desc.OrderByItems = []*util.ByItems{
{Expr: args[0], Desc: true},
}
}
finalFunc := aggfuncs.Build(s.ctx, desc, 0)
resultChk := chunk.NewChunkWithCapacity([]*types.FieldType{desc.RetTp}, 1)
iter := chunk.NewIterator4Chunk(srcChk)
@@ -495,6 +549,11 @@ func (s *testSuite) benchmarkMultiArgsAggFunc(b *testing.B, p multiArgsAggTest)
if err != nil {
b.Fatal(err)
}
if p.orderBy {
desc.OrderByItems = []*util.ByItems{
{Expr: args[0], Desc: true},
}
}
finalFunc = aggfuncs.Build(s.ctx, desc, 0)
resultChk.Reset()
b.Run(fmt.Sprintf("%v(distinct)/%v", p.funcName, p.dataTypes), func(b *testing.B) {
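The new `orderBy` flag in these test helpers attaches an ORDER BY item (descending on the first argument) to the aggregate description before the function is built. The following is a self-contained sketch of the behaviour being exercised, under the simplifying assumptions of plain string values, lexical ordering, and a hypothetical `groupConcatOrdered` helper; it is not TiDB's implementation:

```go
package main

import (
	"fmt"
	"sort"
	"strings"
)

// groupConcatOrdered illustrates GROUP_CONCAT(... ORDER BY ... SEPARATOR sep):
// values are sorted by the ORDER BY key before being joined with the
// separator, and the result is truncated once it exceeds maxLen, mirroring
// the group_concat_max_len limit handled by the real builder.
func groupConcatOrdered(values []string, desc bool, sep string, maxLen int) (result string, truncated bool) {
	sorted := append([]string(nil), values...)
	sort.Slice(sorted, func(i, j int) bool {
		if desc {
			return sorted[i] > sorted[j]
		}
		return sorted[i] < sorted[j]
	})
	result = strings.Join(sorted, sep)
	if len(result) > maxLen {
		return result[:maxLen], true
	}
	return result, false
}

func main() {
	// Desc: true on the first argument, as in the test helpers above.
	s, _ := groupConcatOrdered([]string{"0", "1", "2", "3", "4"}, true, " ", 1024)
	fmt.Println(s) // 4 3 2 1 0
}
```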
24 changes: 18 additions & 6 deletions executor/aggfuncs/builder.go
@@ -323,10 +323,6 @@ func buildGroupConcat(ctx sessionctx.Context, aggFuncDesc *aggregation.AggFuncDe
case aggregation.DedupMode:
return nil
default:
base := baseAggFunc{
args: aggFuncDesc.Args[:len(aggFuncDesc.Args)-1],
ordinal: ordinal,
}
// The last arg is promised to be a not-null string constant, so the error can be ignored.
c, _ := aggFuncDesc.Args[len(aggFuncDesc.Args)-1].(*expression.Constant)
sep, _, err := c.EvalString(nil, chunk.Row{})
@@ -345,10 +341,26 @@ func buildGroupConcat(ctx sessionctx.Context, aggFuncDesc *aggregation.AggFuncDe
panic(fmt.Sprintf("Error happened when buildGroupConcat: %s", err.Error()))
}
var truncated int32
base := baseGroupConcat4String{
baseAggFunc: baseAggFunc{
args: aggFuncDesc.Args[:len(aggFuncDesc.Args)-1],
ordinal: ordinal,
},
byItems: aggFuncDesc.OrderByItems,
sep: sep,
maxLen: maxLen,
truncated: &truncated,
}
if aggFuncDesc.HasDistinct {
return &groupConcatDistinct{baseGroupConcat4String{baseAggFunc: base, sep: sep, maxLen: maxLen, truncated: &truncated}}
if len(aggFuncDesc.OrderByItems) > 0 {
return &groupConcatDistinctOrder{base}
}
return &groupConcatDistinct{base}
}
if len(aggFuncDesc.OrderByItems) > 0 {
return &groupConcatOrder{base}
}
return &groupConcat{baseGroupConcat4String{baseAggFunc: base, sep: sep, maxLen: maxLen, truncated: &truncated}}
return &groupConcat{base}
}
}

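The rewritten builder now constructs the shared `baseGroupConcat4String` state once (args, separator, max length, truncation flag) and then selects among four implementations based on DISTINCT and ORDER BY. A minimal sketch of that dispatch, using hypothetical stand-in types rather than TiDB's real descriptors; only the implementation names mirror the diff:

```go
package main

import "fmt"

// byItem and aggFuncDesc are simplified stand-ins for the fields that
// buildGroupConcat inspects; the real aggregation.AggFuncDesc carries far
// more state.
type byItem struct{ desc bool }

type aggFuncDesc struct {
	hasDistinct  bool
	orderByItems []byItem
}

// pickGroupConcatImpl returns which group_concat variant would be built for
// a given combination of DISTINCT and ORDER BY.
func pickGroupConcatImpl(d aggFuncDesc) string {
	switch {
	case d.hasDistinct && len(d.orderByItems) > 0:
		return "groupConcatDistinctOrder"
	case d.hasDistinct:
		return "groupConcatDistinct"
	case len(d.orderByItems) > 0:
		return "groupConcatOrder"
	default:
		return "groupConcat"
	}
}

func main() {
	fmt.Println(pickGroupConcatImpl(aggFuncDesc{hasDistinct: true, orderByItems: []byItem{{desc: true}}})) // groupConcatDistinctOrder
	fmt.Println(pickGroupConcatImpl(aggFuncDesc{}))                                                        // groupConcat
}
```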