Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

executor: support GROUP_CONCAT(ORDER BY) (#16591) #16990

Merged
merged 3 commits into from
May 13, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions cmd/explaintest/r/explain.result
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,13 @@ set session tidb_hashagg_partial_concurrency = 1;
set session tidb_hashagg_final_concurrency = 1;
explain select group_concat(a) from t group by id;
id estRows task access object operator info
StreamAgg_8 8000.00 root group by:Column#6, funcs:group_concat(Column#5, ",")->Column#4
StreamAgg_8 8000.00 root group by:Column#6, funcs:group_concat(Column#5 separator ",")->Column#4
└─Projection_18 10000.00 root cast(test.t.a, var_string(20))->Column#5, test.t.id
└─TableReader_15 10000.00 root data:TableFullScan_14
└─TableFullScan_14 10000.00 cop[tikv] table:t keep order:true, stats:pseudo
explain select group_concat(a, b) from t group by id;
id estRows task access object operator info
StreamAgg_8 8000.00 root group by:Column#7, funcs:group_concat(Column#5, Column#6, ",")->Column#4
StreamAgg_8 8000.00 root group by:Column#7, funcs:group_concat(Column#5, Column#6 separator ",")->Column#4
└─Projection_18 10000.00 root cast(test.t.a, var_string(20))->Column#5, cast(test.t.b, var_string(20))->Column#6, test.t.id
└─TableReader_15 10000.00 root data:TableFullScan_14
└─TableFullScan_14 10000.00 cop[tikv] table:t keep order:true, stats:pseudo
Expand Down
79 changes: 69 additions & 10 deletions executor/aggfuncs/aggfunc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/pingcap/tidb/executor/aggfuncs"
"github.com/pingcap/tidb/expression"
"github.com/pingcap/tidb/expression/aggregation"
"github.com/pingcap/tidb/planner/util"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/types/json"
Expand Down Expand Up @@ -68,6 +69,7 @@ type aggTest struct {
dataGen func(i int) types.Datum
funcName string
results []types.Datum
orderBy bool
}

type multiArgsAggTest struct {
Expand All @@ -77,6 +79,7 @@ type multiArgsAggTest struct {
dataGens []func(i int) types.Datum
funcName string
results []types.Datum
orderBy bool
}

func (s *testSuite) testMergePartialResult(c *C, p aggTest) {
Expand All @@ -93,6 +96,11 @@ func (s *testSuite) testMergePartialResult(c *C, p aggTest) {
}
desc, err := aggregation.NewAggFuncDesc(s.ctx, p.funcName, args, false)
c.Assert(err, IsNil)
if p.orderBy {
desc.OrderByItems = []*util.ByItems{
{Expr: args[0], Desc: true},
}
}
partialDesc, finalDesc := desc.Split([]int{0, 1})

// build partial func for partial phase.
Expand All @@ -112,7 +120,7 @@ func (s *testSuite) testMergePartialResult(c *C, p aggTest) {
dt := resultChk.GetRow(0).GetDatum(0, p.dataType)
result, err := dt.CompareDatum(s.ctx.GetSessionVars().StmtCtx, &p.results[0])
c.Assert(err, IsNil)
c.Assert(result, Equals, 0)
c.Assert(result, Equals, 0, Commentf("%v != %v", dt.String(), p.results[0]))

err = finalFunc.MergePartialResult(s.ctx, partialResult, finalPr)
c.Assert(err, IsNil)
Expand All @@ -128,7 +136,7 @@ func (s *testSuite) testMergePartialResult(c *C, p aggTest) {
dt = resultChk.GetRow(0).GetDatum(0, p.dataType)
result, err = dt.CompareDatum(s.ctx.GetSessionVars().StmtCtx, &p.results[1])
c.Assert(err, IsNil)
c.Assert(result, Equals, 0)
c.Assert(result, Equals, 0, Commentf("%v != %v", dt.String(), p.results[1]))
err = finalFunc.MergePartialResult(s.ctx, partialResult, finalPr)
c.Assert(err, IsNil)

Expand All @@ -139,7 +147,7 @@ func (s *testSuite) testMergePartialResult(c *C, p aggTest) {
dt = resultChk.GetRow(0).GetDatum(0, p.dataType)
result, err = dt.CompareDatum(s.ctx.GetSessionVars().StmtCtx, &p.results[2])
c.Assert(err, IsNil)
c.Assert(result, Equals, 0)
c.Assert(result, Equals, 0, Commentf("%v != %v", dt.String(), p.results[2]))
}

func buildAggTester(funcName string, tp byte, numRows int, results ...interface{}) aggTest {
Expand Down Expand Up @@ -176,6 +184,11 @@ func (s *testSuite) testMultiArgsMergePartialResult(c *C, p multiArgsAggTest) {

desc, err := aggregation.NewAggFuncDesc(s.ctx, p.funcName, args, false)
c.Assert(err, IsNil)
if p.orderBy {
desc.OrderByItems = []*util.ByItems{
{Expr: args[0], Desc: true},
}
}
partialDesc, finalDesc := desc.Split([]int{0, 1})

// build partial func for partial phase.
Expand Down Expand Up @@ -300,6 +313,11 @@ func (s *testSuite) testAggFunc(c *C, p aggTest) {
}
desc, err := aggregation.NewAggFuncDesc(s.ctx, p.funcName, args, false)
c.Assert(err, IsNil)
if p.orderBy {
desc.OrderByItems = []*util.ByItems{
{Expr: args[0], Desc: true},
}
}
finalFunc := aggfuncs.Build(s.ctx, desc, 0)
finalPr := finalFunc.AllocPartialResult()
resultChk := chunk.NewChunkWithCapacity([]*types.FieldType{desc.RetTp}, 1)
Expand All @@ -312,7 +330,7 @@ func (s *testSuite) testAggFunc(c *C, p aggTest) {
dt := resultChk.GetRow(0).GetDatum(0, desc.RetTp)
result, err := dt.CompareDatum(s.ctx.GetSessionVars().StmtCtx, &p.results[1])
c.Assert(err, IsNil)
c.Assert(result, Equals, 0)
c.Assert(result, Equals, 0, Commentf("%v != %v", dt.String(), p.results[1]))

// test the empty input
resultChk.Reset()
Expand All @@ -321,11 +339,16 @@ func (s *testSuite) testAggFunc(c *C, p aggTest) {
dt = resultChk.GetRow(0).GetDatum(0, desc.RetTp)
result, err = dt.CompareDatum(s.ctx.GetSessionVars().StmtCtx, &p.results[0])
c.Assert(err, IsNil)
c.Assert(result, Equals, 0)
c.Assert(result, Equals, 0, Commentf("%v != %v", dt.String(), p.results[0]))

// test the agg func with distinct
desc, err = aggregation.NewAggFuncDesc(s.ctx, p.funcName, args, true)
c.Assert(err, IsNil)
if p.orderBy {
desc.OrderByItems = []*util.ByItems{
{Expr: args[0], Desc: true},
}
}
finalFunc = aggfuncs.Build(s.ctx, desc, 0)
finalPr = finalFunc.AllocPartialResult()

Expand All @@ -341,7 +364,7 @@ func (s *testSuite) testAggFunc(c *C, p aggTest) {
dt = resultChk.GetRow(0).GetDatum(0, desc.RetTp)
result, err = dt.CompareDatum(s.ctx.GetSessionVars().StmtCtx, &p.results[1])
c.Assert(err, IsNil)
c.Assert(result, Equals, 0)
c.Assert(result, Equals, 0, Commentf("%v != %v", dt.String(), p.results[1]))

// test the empty input
resultChk.Reset()
Expand All @@ -350,7 +373,7 @@ func (s *testSuite) testAggFunc(c *C, p aggTest) {
dt = resultChk.GetRow(0).GetDatum(0, desc.RetTp)
result, err = dt.CompareDatum(s.ctx.GetSessionVars().StmtCtx, &p.results[0])
c.Assert(err, IsNil)
c.Assert(result, Equals, 0)
c.Assert(result, Equals, 0, Commentf("%v != %v", dt.String(), p.results[0]))
}

func (s *testSuite) testMultiArgsAggFunc(c *C, p multiArgsAggTest) {
Expand All @@ -367,9 +390,17 @@ func (s *testSuite) testMultiArgsAggFunc(c *C, p multiArgsAggTest) {
for k := 0; k < len(p.dataTypes); k++ {
args[k] = &expression.Column{RetType: p.dataTypes[k], Index: k}
}
if p.funcName == ast.AggFuncGroupConcat {
args = append(args, &expression.Constant{Value: types.NewStringDatum(" "), RetType: types.NewFieldType(mysql.TypeString)})
}

desc, err := aggregation.NewAggFuncDesc(s.ctx, p.funcName, args, false)
c.Assert(err, IsNil)
if p.orderBy {
desc.OrderByItems = []*util.ByItems{
{Expr: args[0], Desc: true},
}
}
finalFunc := aggfuncs.Build(s.ctx, desc, 0)
finalPr := finalFunc.AllocPartialResult()
resultChk := chunk.NewChunkWithCapacity([]*types.FieldType{desc.RetTp}, 1)
Expand All @@ -382,7 +413,7 @@ func (s *testSuite) testMultiArgsAggFunc(c *C, p multiArgsAggTest) {
dt := resultChk.GetRow(0).GetDatum(0, desc.RetTp)
result, err := dt.CompareDatum(s.ctx.GetSessionVars().StmtCtx, &p.results[1])
c.Assert(err, IsNil)
c.Assert(result, Equals, 0)
c.Assert(result, Equals, 0, Commentf("%v != %v", dt.String(), p.results[1]))

// test the empty input
resultChk.Reset()
Expand All @@ -391,11 +422,16 @@ func (s *testSuite) testMultiArgsAggFunc(c *C, p multiArgsAggTest) {
dt = resultChk.GetRow(0).GetDatum(0, desc.RetTp)
result, err = dt.CompareDatum(s.ctx.GetSessionVars().StmtCtx, &p.results[0])
c.Assert(err, IsNil)
c.Assert(result, Equals, 0)
c.Assert(result, Equals, 0, Commentf("%v != %v", dt.String(), p.results[0]))

// test the agg func with distinct
desc, err = aggregation.NewAggFuncDesc(s.ctx, p.funcName, args, true)
c.Assert(err, IsNil)
if p.orderBy {
desc.OrderByItems = []*util.ByItems{
{Expr: args[0], Desc: true},
}
}
finalFunc = aggfuncs.Build(s.ctx, desc, 0)
finalPr = finalFunc.AllocPartialResult()

Expand All @@ -411,7 +447,7 @@ func (s *testSuite) testMultiArgsAggFunc(c *C, p multiArgsAggTest) {
dt = resultChk.GetRow(0).GetDatum(0, desc.RetTp)
result, err = dt.CompareDatum(s.ctx.GetSessionVars().StmtCtx, &p.results[1])
c.Assert(err, IsNil)
c.Assert(result, Equals, 0)
c.Assert(result, Equals, 0, Commentf("%v != %v", dt.String(), p.results[1]))

// test the empty input
resultChk.Reset()
Expand Down Expand Up @@ -439,6 +475,11 @@ func (s *testSuite) benchmarkAggFunc(b *testing.B, p aggTest) {
if err != nil {
b.Fatal(err)
}
if p.orderBy {
desc.OrderByItems = []*util.ByItems{
{Expr: args[0], Desc: true},
}
}
finalFunc := aggfuncs.Build(s.ctx, desc, 0)
resultChk := chunk.NewChunkWithCapacity([]*types.FieldType{desc.RetTp}, 1)
iter := chunk.NewIterator4Chunk(srcChk)
Expand All @@ -454,6 +495,11 @@ func (s *testSuite) benchmarkAggFunc(b *testing.B, p aggTest) {
if err != nil {
b.Fatal(err)
}
if p.orderBy {
desc.OrderByItems = []*util.ByItems{
{Expr: args[0], Desc: true},
}
}
finalFunc = aggfuncs.Build(s.ctx, desc, 0)
resultChk.Reset()
b.Run(fmt.Sprintf("%v(distinct)/%v", p.funcName, p.dataType), func(b *testing.B) {
Expand All @@ -475,11 +521,19 @@ func (s *testSuite) benchmarkMultiArgsAggFunc(b *testing.B, p multiArgsAggTest)
for k := 0; k < len(p.dataTypes); k++ {
args[k] = &expression.Column{RetType: p.dataTypes[k], Index: k}
}
if p.funcName == ast.AggFuncGroupConcat {
args = append(args, &expression.Constant{Value: types.NewStringDatum(" "), RetType: types.NewFieldType(mysql.TypeString)})
}

desc, err := aggregation.NewAggFuncDesc(s.ctx, p.funcName, args, false)
if err != nil {
b.Fatal(err)
}
if p.orderBy {
desc.OrderByItems = []*util.ByItems{
{Expr: args[0], Desc: true},
}
}
finalFunc := aggfuncs.Build(s.ctx, desc, 0)
resultChk := chunk.NewChunkWithCapacity([]*types.FieldType{desc.RetTp}, 1)
iter := chunk.NewIterator4Chunk(srcChk)
Expand All @@ -495,6 +549,11 @@ func (s *testSuite) benchmarkMultiArgsAggFunc(b *testing.B, p multiArgsAggTest)
if err != nil {
b.Fatal(err)
}
if p.orderBy {
desc.OrderByItems = []*util.ByItems{
{Expr: args[0], Desc: true},
}
}
finalFunc = aggfuncs.Build(s.ctx, desc, 0)
resultChk.Reset()
b.Run(fmt.Sprintf("%v(distinct)/%v", p.funcName, p.dataTypes), func(b *testing.B) {
Expand Down
24 changes: 18 additions & 6 deletions executor/aggfuncs/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -320,10 +320,6 @@ func buildGroupConcat(ctx sessionctx.Context, aggFuncDesc *aggregation.AggFuncDe
case aggregation.DedupMode:
return nil
default:
base := baseAggFunc{
args: aggFuncDesc.Args[:len(aggFuncDesc.Args)-1],
ordinal: ordinal,
}
// The last arg is promised to be a not-null string constant, so the error can be ignored.
c, _ := aggFuncDesc.Args[len(aggFuncDesc.Args)-1].(*expression.Constant)
sep, _, err := c.EvalString(nil, chunk.Row{})
Expand All @@ -342,10 +338,26 @@ func buildGroupConcat(ctx sessionctx.Context, aggFuncDesc *aggregation.AggFuncDe
panic(fmt.Sprintf("Error happened when buildGroupConcat: %s", err.Error()))
}
var truncated int32
base := baseGroupConcat4String{
baseAggFunc: baseAggFunc{
args: aggFuncDesc.Args[:len(aggFuncDesc.Args)-1],
ordinal: ordinal,
},
byItems: aggFuncDesc.OrderByItems,
sep: sep,
maxLen: maxLen,
truncated: &truncated,
}
if aggFuncDesc.HasDistinct {
return &groupConcatDistinct{baseGroupConcat4String{baseAggFunc: base, sep: sep, maxLen: maxLen, truncated: &truncated}}
if len(aggFuncDesc.OrderByItems) > 0 {
return &groupConcatDistinctOrder{base}
}
return &groupConcatDistinct{base}
}
if len(aggFuncDesc.OrderByItems) > 0 {
return &groupConcatOrder{base}
}
return &groupConcat{baseGroupConcat4String{baseAggFunc: base, sep: sep, maxLen: maxLen, truncated: &truncated}}
return &groupConcat{base}
}
}

Expand Down
Loading