Skip to content

Commit

Permalink
executor: Improve the performance of aggFuncSum by using sliding wi…
Browse files Browse the repository at this point in the history
…ndow (#14887)
  • Loading branch information
mmyj authored Mar 23, 2020
1 parent 3906d5a commit bbac2b2
Show file tree
Hide file tree
Showing 9 changed files with 298 additions and 107 deletions.
9 changes: 6 additions & 3 deletions executor/aggfuncs/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ func Build(ctx sessionctx.Context, aggFuncDesc *aggregation.AggFuncDesc, ordinal
case ast.AggFuncCount:
return buildCount(aggFuncDesc, ordinal)
case ast.AggFuncSum:
return buildSum(aggFuncDesc, ordinal)
return buildSum(ctx, aggFuncDesc, ordinal)
case ast.AggFuncAvg:
return buildAvg(aggFuncDesc, ordinal)
case ast.AggFuncFirstRow:
Expand Down Expand Up @@ -134,7 +134,7 @@ func buildCount(aggFuncDesc *aggregation.AggFuncDesc, ordinal int) AggFunc {
}

// buildSum builds the AggFunc implementation for function "SUM".
func buildSum(aggFuncDesc *aggregation.AggFuncDesc, ordinal int) AggFunc {
func buildSum(ctx sessionctx.Context, aggFuncDesc *aggregation.AggFuncDesc, ordinal int) AggFunc {
base := baseSumAggFunc{
baseAggFunc: baseAggFunc{
args: aggFuncDesc.Args,
Expand All @@ -155,7 +155,10 @@ func buildSum(aggFuncDesc *aggregation.AggFuncDesc, ordinal int) AggFunc {
if aggFuncDesc.HasDistinct {
return &sum4DistinctFloat64{base}
}
return &sum4Float64{base}
if ctx.GetSessionVars().WindowingUseHighPrecision {
return &sum4Float64HighPrecision{baseSum4Float64{base}}
}
return &sum4Float64{baseSum4Float64{base}}
}
}
}
Expand Down
132 changes: 103 additions & 29 deletions executor/aggfuncs/func_sum.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,56 +22,57 @@ import (
)

type partialResult4SumFloat64 struct {
val float64
isNull bool
val float64
notNullRowCount int64
}

type partialResult4SumDecimal struct {
val types.MyDecimal
isNull bool
val types.MyDecimal
notNullRowCount int64
}

type partialResult4SumDistinctFloat64 struct {
partialResult4SumFloat64
val float64
isNull bool
valSet set.Float64Set
}

type partialResult4SumDistinctDecimal struct {
partialResult4SumDecimal
val types.MyDecimal
isNull bool
valSet set.StringSet
}

type baseSumAggFunc struct {
baseAggFunc
}

type sum4Float64 struct {
type baseSum4Float64 struct {
baseSumAggFunc
}

func (e *sum4Float64) AllocPartialResult() PartialResult {
func (e *baseSum4Float64) AllocPartialResult() PartialResult {
p := new(partialResult4SumFloat64)
p.isNull = true
return PartialResult(p)
}

func (e *sum4Float64) ResetPartialResult(pr PartialResult) {
func (e *baseSum4Float64) ResetPartialResult(pr PartialResult) {
p := (*partialResult4SumFloat64)(pr)
p.val = 0
p.isNull = true
p.notNullRowCount = 0
}

func (e *sum4Float64) AppendFinalResult2Chunk(sctx sessionctx.Context, pr PartialResult, chk *chunk.Chunk) error {
func (e *baseSum4Float64) AppendFinalResult2Chunk(sctx sessionctx.Context, pr PartialResult, chk *chunk.Chunk) error {
p := (*partialResult4SumFloat64)(pr)
if p.isNull {
if p.notNullRowCount == 0 {
chk.AppendNull(e.ordinal)
return nil
}
chk.AppendFloat64(e.ordinal, p.val)
return nil
}

func (e *sum4Float64) UpdatePartialResult(sctx sessionctx.Context, rowsInGroup []chunk.Row, pr PartialResult) error {
func (e *baseSum4Float64) UpdatePartialResult(sctx sessionctx.Context, rowsInGroup []chunk.Row, pr PartialResult) error {
p := (*partialResult4SumFloat64)(pr)
for _, row := range rowsInGroup {
input, isNull, err := e.args[0].EvalReal(sctx, row)
Expand All @@ -81,44 +82,74 @@ func (e *sum4Float64) UpdatePartialResult(sctx sessionctx.Context, rowsInGroup [
if isNull {
continue
}
if p.isNull {
p.val = input
p.isNull = false
continue
}
p.val += input
p.notNullRowCount++
}
return nil
}

func (e *sum4Float64) MergePartialResult(sctx sessionctx.Context, src, dst PartialResult) error {
func (e *baseSum4Float64) MergePartialResult(sctx sessionctx.Context, src, dst PartialResult) error {
p1, p2 := (*partialResult4SumFloat64)(src), (*partialResult4SumFloat64)(dst)
if p1.isNull {
if p1.notNullRowCount == 0 {
return nil
}
p2.val += p1.val
p2.isNull = false
p2.notNullRowCount += p1.notNullRowCount
return nil
}

type sum4Float64 struct {
baseSum4Float64
}

func (e *sum4Float64) Slide(sctx sessionctx.Context, rows []chunk.Row, lastStart, lastEnd uint64, shiftStart, shiftEnd uint64, pr PartialResult) error {
p := (*partialResult4SumFloat64)(pr)
for i := uint64(0); i < shiftEnd; i++ {
input, isNull, err := e.args[0].EvalReal(sctx, rows[lastEnd+i])
if err != nil {
return err
}
if isNull {
continue
}
p.val += input
p.notNullRowCount++
}
for i := uint64(0); i < shiftStart; i++ {
input, isNull, err := e.args[0].EvalReal(sctx, rows[lastStart+i])
if err != nil {
return err
}
if isNull {
continue
}
p.val -= input
p.notNullRowCount--
}
return nil
}

type sum4Float64HighPrecision struct {
baseSum4Float64
}

type sum4Decimal struct {
baseSumAggFunc
}

func (e *sum4Decimal) AllocPartialResult() PartialResult {
p := new(partialResult4SumDecimal)
p.isNull = true
return PartialResult(p)
}

func (e *sum4Decimal) ResetPartialResult(pr PartialResult) {
p := (*partialResult4SumDecimal)(pr)
p.isNull = true
p.notNullRowCount = 0
}

func (e *sum4Decimal) AppendFinalResult2Chunk(sctx sessionctx.Context, pr PartialResult, chk *chunk.Chunk) error {
p := (*partialResult4SumDecimal)(pr)
if p.isNull {
if p.notNullRowCount == 0 {
chk.AppendNull(e.ordinal)
return nil
}
Expand All @@ -136,9 +167,9 @@ func (e *sum4Decimal) UpdatePartialResult(sctx sessionctx.Context, rowsInGroup [
if isNull {
continue
}
if p.isNull {
if p.notNullRowCount == 0 {
p.val = *input
p.isNull = false
p.notNullRowCount = 1
continue
}

Expand All @@ -148,13 +179,56 @@ func (e *sum4Decimal) UpdatePartialResult(sctx sessionctx.Context, rowsInGroup [
return err
}
p.val = *newSum
p.notNullRowCount++
}
return nil
}

func (e *sum4Decimal) Slide(sctx sessionctx.Context, rows []chunk.Row, lastStart, lastEnd uint64, shiftStart, shiftEnd uint64, pr PartialResult) error {
p := (*partialResult4SumDecimal)(pr)
for i := uint64(0); i < shiftEnd; i++ {
input, isNull, err := e.args[0].EvalDecimal(sctx, rows[lastEnd+i])
if err != nil {
return err
}
if isNull {
continue
}
if p.notNullRowCount == 0 {
p.val = *input
p.notNullRowCount = 1
continue
}
newSum := new(types.MyDecimal)
err = types.DecimalAdd(&p.val, input, newSum)
if err != nil {
return err
}
p.val = *newSum
p.notNullRowCount++
}
for i := uint64(0); i < shiftStart; i++ {
input, isNull, err := e.args[0].EvalDecimal(sctx, rows[lastStart+i])
if err != nil {
return err
}
if isNull {
continue
}
newSum := new(types.MyDecimal)
err = types.DecimalSub(&p.val, input, newSum)
if err != nil {
return err
}
p.val = *newSum
p.notNullRowCount--
}
return nil
}

func (e *sum4Decimal) MergePartialResult(sctx sessionctx.Context, src, dst PartialResult) error {
p1, p2 := (*partialResult4SumDecimal)(src), (*partialResult4SumDecimal)(dst)
if p1.isNull {
if p1.notNullRowCount == 0 {
return nil
}
newSum := new(types.MyDecimal)
Expand All @@ -163,7 +237,7 @@ func (e *sum4Decimal) MergePartialResult(sctx sessionctx.Context, src, dst Parti
return err
}
p2.val = *newSum
p2.isNull = false
p2.notNullRowCount += p1.notNullRowCount
return nil
}

Expand Down
Loading

0 comments on commit bbac2b2

Please sign in to comment.