Skip to content

Commit

Permalink
statistics: improve memory for mergeGlobalStatsTopNByConcurrency (#45993
Browse files Browse the repository at this point in the history
)

close #45727
  • Loading branch information
hawkingrei authored Aug 14, 2023
1 parent 60588b9 commit e9f4e31
Show file tree
Hide file tree
Showing 3 changed files with 13 additions and 29 deletions.
5 changes: 1 addition & 4 deletions statistics/cmsketch_bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,10 +123,7 @@ func benchmarkMergeGlobalStatsTopNByConcurrencyWithHists(partitions int, b *test
h.Buckets = append(h.Buckets, statistics.Bucket{Repeat: 10, Count: 40})
hists = append(hists, h)
}
wrapper := &statistics.StatsWrapper{
AllTopN: topNs,
AllHg: hists,
}
wrapper := statistics.NewStatsWrapper(hists, topNs)
const mergeConcurrency = 4
batchSize := len(wrapper.AllTopN) / mergeConcurrency
if batchSize < 1 {
Expand Down
19 changes: 2 additions & 17 deletions statistics/handle/handle.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
package handle

import (
"bytes"
"context"
"encoding/json"
"fmt"
Expand Down Expand Up @@ -948,19 +947,15 @@ func MergeGlobalStatsTopNByConcurrency(mergeConcurrency, mergeBatchSize int, wra

// handle Error
hasErr := false
errMsg := make([]string, 0)
for resp := range respCh {
if resp.Err != nil {
hasErr = true
errMsg = append(errMsg, resp.Err.Error())
}
resps = append(resps, resp)
}
if hasErr {
errMsg := make([]string, 0)
for _, resp := range resps {
if resp.Err != nil {
errMsg = append(errMsg, resp.Err.Error())
}
}
return nil, nil, nil, errors.New(strings.Join(errMsg, ","))
}

Expand All @@ -972,16 +967,6 @@ func MergeGlobalStatsTopNByConcurrency(mergeConcurrency, mergeBatchSize int, wra
sorted = append(sorted, resp.TopN.TopN...)
}
leftTopn = append(leftTopn, resp.PopedTopn...)
for i, removeTopn := range resp.RemoveVals {
// Remove the value from the Hists.
if len(removeTopn) > 0 {
tmp := removeTopn
slices.SortFunc(tmp, func(i, j statistics.TopNMeta) int {
return bytes.Compare(i.Encoded, j.Encoded)
})
wrapper.AllHg[i].RemoveVals(tmp)
}
}
}

globalTopN, popedTopn := statistics.GetMergedTopNFromSortedSlice(sorted, n)
Expand Down
18 changes: 10 additions & 8 deletions statistics/merge_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package statistics

import (
"sync"
"sync/atomic"
"time"

Expand Down Expand Up @@ -44,6 +45,8 @@ type topnStatsMergeWorker struct {
respCh chan<- *TopnStatsMergeResponse
// the stats in the wrapper should only be read during the worker
statsWrapper *StatsWrapper
// shardMutex is used to protect `statsWrapper.AllHg`
shardMutex []sync.Mutex
}

// NewTopnStatsMergeWorker returns topn merge worker
Expand All @@ -57,6 +60,7 @@ func NewTopnStatsMergeWorker(
respCh: respCh,
}
worker.statsWrapper = wrapper
worker.shardMutex = make([]sync.Mutex, len(wrapper.AllHg))
worker.killed = killed
return worker
}
Expand All @@ -77,10 +81,9 @@ func NewTopnStatsMergeTask(start, end int) *TopnStatsMergeTask {

// TopnStatsMergeResponse indicates topn merge worker response
type TopnStatsMergeResponse struct {
Err error
TopN *TopN
PopedTopn []TopNMeta
RemoveVals [][]TopNMeta
Err error
TopN *TopN
PopedTopn []TopNMeta
}

// Run runs topn merge like statistics.MergePartTopN2GlobalTopN
Expand All @@ -99,7 +102,6 @@ func (worker *topnStatsMergeWorker) Run(timeZone *time.Location, isIndex bool,
return
}
partNum := len(allTopNs)
removeVals := make([][]TopNMeta, partNum)
// Different TopN structures may hold the same value, we have to merge them.
counter := make(map[hack.MutableString]float64)
// datumMap is used to store the mapping from the string type to datum type.
Expand Down Expand Up @@ -168,13 +170,13 @@ func (worker *topnStatsMergeWorker) Run(timeZone *time.Location, isIndex bool,
if count != 0 {
counter[encodedVal] += count
// Remove the value corresponding to encodedVal from the histogram.
removeVals[j] = append(removeVals[j], TopNMeta{Encoded: datum.GetBytes(), Count: uint64(count)})
worker.shardMutex[j].Lock()
worker.statsWrapper.AllHg[j].BinarySearchRemoveVal(TopNMeta{Encoded: datum.GetBytes(), Count: uint64(count)})
worker.shardMutex[j].Unlock()
}
}
}
}
// record remove values
resp.RemoveVals = removeVals

numTop := len(counter)
if numTop == 0 {
Expand Down

0 comments on commit e9f4e31

Please sign in to comment.