Skip to content

Commit

Permalink
executor: add new agg function APPROX_COUNT_DISTINCT (#17175) (#18120)
Browse files Browse the repository at this point in the history
  • Loading branch information
ti-srebot authored Jun 19, 2020
1 parent 2f2c4e3 commit 6c2a572
Show file tree
Hide file tree
Showing 24 changed files with 722 additions and 37 deletions.
15 changes: 15 additions & 0 deletions executor/aggfuncs/aggfunc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,9 @@ func (s *testSuite) testMergePartialResult(c *C, p aggTest) {
finalFunc := aggfuncs.Build(s.ctx, finalDesc, 0)
finalPr := finalFunc.AllocPartialResult()
resultChk := chunk.NewChunkWithCapacity([]*types.FieldType{p.dataType}, 1)
if p.funcName == ast.AggFuncApproxCountDistinct {
resultChk = chunk.NewChunkWithCapacity([]*types.FieldType{types.NewFieldType(mysql.TypeString)}, 1)
}

// update partial result.
for row := iter.Begin(); row != iter.End(); row = iter.Next() {
Expand All @@ -159,6 +162,9 @@ func (s *testSuite) testMergePartialResult(c *C, p aggTest) {
p.messUpChunk(srcChk)
partialFunc.AppendFinalResult2Chunk(s.ctx, partialResult, resultChk)
dt := resultChk.GetRow(0).GetDatum(0, p.dataType)
if p.funcName == ast.AggFuncApproxCountDistinct {
dt = resultChk.GetRow(0).GetDatum(0, types.NewFieldType(mysql.TypeString))
}
result, err := dt.CompareDatum(s.ctx.GetSessionVars().StmtCtx, &p.results[0])
c.Assert(err, IsNil)
c.Assert(result, Equals, 0, Commentf("%v != %v", dt.String(), p.results[0]))
Expand All @@ -178,17 +184,26 @@ func (s *testSuite) testMergePartialResult(c *C, p aggTest) {
resultChk.Reset()
partialFunc.AppendFinalResult2Chunk(s.ctx, partialResult, resultChk)
dt = resultChk.GetRow(0).GetDatum(0, p.dataType)
if p.funcName == ast.AggFuncApproxCountDistinct {
dt = resultChk.GetRow(0).GetDatum(0, types.NewFieldType(mysql.TypeString))
}
result, err = dt.CompareDatum(s.ctx.GetSessionVars().StmtCtx, &p.results[1])
c.Assert(err, IsNil)
c.Assert(result, Equals, 0, Commentf("%v != %v", dt.String(), p.results[1]))
err = finalFunc.MergePartialResult(s.ctx, partialResult, finalPr)
c.Assert(err, IsNil)

if p.funcName == ast.AggFuncApproxCountDistinct {
resultChk = chunk.NewChunkWithCapacity([]*types.FieldType{types.NewFieldType(mysql.TypeLonglong)}, 1)
}
resultChk.Reset()
err = finalFunc.AppendFinalResult2Chunk(s.ctx, finalPr, resultChk)
c.Assert(err, IsNil)

dt = resultChk.GetRow(0).GetDatum(0, p.dataType)
if p.funcName == ast.AggFuncApproxCountDistinct {
dt = resultChk.GetRow(0).GetDatum(0, types.NewFieldType(mysql.TypeLonglong))
}
result, err = dt.CompareDatum(s.ctx.GetSessionVars().StmtCtx, &p.results[2])
c.Assert(err, IsNil)
c.Assert(result, Equals, 0, Commentf("%v != %v", dt.String(), p.results[2]))
Expand Down
6 changes: 6 additions & 0 deletions executor/aggfuncs/aggfuncs.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,12 @@ var (
_ AggFunc = (*countOriginalWithDistinct4String)(nil)
_ AggFunc = (*countOriginalWithDistinct)(nil)

// All the AggFunc implementations for "APPROX_COUNT_DISTINCT" are listed here.
_ AggFunc = (*approxCountDistinctOriginal)(nil)
_ AggFunc = (*approxCountDistinctPartial1)(nil)
_ AggFunc = (*approxCountDistinctPartial2)(nil)
_ AggFunc = (*approxCountDistinctFinal)(nil)

// All the AggFunc implementations for "FIRSTROW" are listed here.
_ AggFunc = (*firstRow4Decimal)(nil)
_ AggFunc = (*firstRow4Int)(nil)
Expand Down
35 changes: 35 additions & 0 deletions executor/aggfuncs/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@ func Build(ctx sessionctx.Context, aggFuncDesc *aggregation.AggFuncDesc, ordinal
return buildVarPop(aggFuncDesc, ordinal)
case ast.AggFuncJsonObjectAgg:
return buildJSONObjectAgg(aggFuncDesc, ordinal)
case ast.AggFuncApproxCountDistinct:
return buildApproxCountDistinct(aggFuncDesc, ordinal)
}
return nil
}
Expand Down Expand Up @@ -89,6 +91,39 @@ func BuildWindowFunctions(ctx sessionctx.Context, windowFuncDesc *aggregation.Ag
}
}

// buildApproxCountDistinct builds the AggFunc implementation for function
// "APPROX_COUNT_DISTINCT".
//
// The implementation is selected from the aggregation mode together with the
// return type of aggFuncDesc:
//
//   - A BIGINT (TypeLonglong) return type maps each mode to its own
//     implementation: Complete -> original, Partial1 -> partial1,
//     Partial2 -> partial2, Final -> final.
//   - A string return type signals that the output must itself remain a
//     partial result (per the original note: in a partition table, a union
//     has to compute partial results into partial results). In that case
//     Complete is treated like Partial1 and Final is treated like Partial2.
//
// It returns nil for any other return type or mode.
func buildApproxCountDistinct(aggFuncDesc *aggregation.AggFuncDesc, ordinal int) AggFunc {
	base := baseApproxCountDistinct{baseAggFunc{
		args:    aggFuncDesc.Args,
		ordinal: ordinal,
	}}

	retTp := aggFuncDesc.RetTp.Tp
	keepPartial := retTp == mysql.TypeString
	if retTp != mysql.TypeLonglong && !keepPartial {
		return nil
	}

	// Each layer wraps the previous one by value; only the layer that is
	// actually returned gets allocated.
	original := approxCountDistinctOriginal{base}
	partial1 := approxCountDistinctPartial1{original}
	partial2 := approxCountDistinctPartial2{partial1}

	switch aggFuncDesc.Mode {
	case aggregation.CompleteMode:
		if keepPartial {
			return &approxCountDistinctPartial1{original}
		}
		return &approxCountDistinctOriginal{base}
	case aggregation.Partial1Mode:
		return &approxCountDistinctPartial1{original}
	case aggregation.Partial2Mode:
		return &approxCountDistinctPartial2{partial1}
	case aggregation.FinalMode:
		if keepPartial {
			return &approxCountDistinctPartial2{partial1}
		}
		return &approxCountDistinctFinal{partial2}
	}

	return nil
}

// buildCount builds the AggFunc implementation for function "COUNT".
func buildCount(aggFuncDesc *aggregation.AggFuncDesc, ordinal int) AggFunc {
// If mode is DedupMode, we return nil for not implemented.
Expand Down
Loading

0 comments on commit 6c2a572

Please sign in to comment.