Skip to content

Commit

Permalink
executor: track partialResultMap in unparalleled aggreagte. (#22962)
Browse files Browse the repository at this point in the history
  • Loading branch information
wshwsh12 authored Mar 2, 2021
1 parent 2a5aa13 commit 3fcfefd
Showing 1 changed file with 15 additions and 2 deletions.
17 changes: 15 additions & 2 deletions executor/aggregate.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,7 @@ type HashAggExec struct {
PartialAggFuncs []aggfuncs.AggFunc
FinalAggFuncs []aggfuncs.AggFunc
partialResultMap aggPartialResultMapper
bInMap int64 // indicate there are 2^bInMap buckets in partialResultMap
groupSet set.StringSet
groupKeys []string
cursor4GroupKey int
Expand Down Expand Up @@ -283,6 +284,8 @@ func (e *HashAggExec) Open(ctx context.Context) error {
func (e *HashAggExec) initForUnparallelExec() {
e.groupSet = set.NewStringSet()
e.partialResultMap = make(aggPartialResultMapper)
e.bInMap = 0
e.memTracker.Consume(defBucketMemoryUsage * (1 << e.bInMap))
e.groupKeyBuffer = make([][]byte, 0, 8)
e.childResult = newFirstChunk(e.children[0])
e.memTracker.Consume(e.childResult.MemoryUsage())
Expand Down Expand Up @@ -878,6 +881,7 @@ func (e *HashAggExec) execute(ctx context.Context) (err error) {
return err
}

allMemDelta := int64(0)
for j := 0; j < e.childResult.NumRows(); j++ {
groupKey := string(e.groupKeyBuffer[j]) // do memory copy here, because e.groupKeyBuffer may be reused.
if !e.groupSet.Exist(groupKey) {
Expand All @@ -890,23 +894,32 @@ func (e *HashAggExec) execute(ctx context.Context) (err error) {
if err != nil {
return err
}
e.memTracker.Consume(memDelta)
allMemDelta += memDelta
}
}
e.memTracker.Consume(allMemDelta)
}
}

func (e *HashAggExec) getPartialResults(groupKey string) []aggfuncs.PartialResult {
partialResults, ok := e.partialResultMap[groupKey]
allMemDelta := int64(0)
if !ok {
partialResults = make([]aggfuncs.PartialResult, 0, len(e.PartialAggFuncs))
for _, af := range e.PartialAggFuncs {
partialResult, memDelta := af.AllocPartialResult()
partialResults = append(partialResults, partialResult)
e.memTracker.Consume(memDelta)
allMemDelta += memDelta
}
e.partialResultMap[groupKey] = partialResults
allMemDelta += int64(len(groupKey))
// Map will expand when count > bucketNum * loadFactor. The memory usage will doubled.
if len(e.partialResultMap) > (1<<e.bInMap)*loadFactorNum/loadFactorDen {
e.memTracker.Consume(defBucketMemoryUsage * (1 << e.bInMap))
e.bInMap++
}
}
e.memTracker.Consume(allMemDelta)
return partialResults
}

Expand Down

0 comments on commit 3fcfefd

Please sign in to comment.