Skip to content

Commit

Permalink
executor: remove redundant memory pre-allocations in parallel sort ex…
Browse files Browse the repository at this point in the history
…ecutor (#54073)

close #54070
  • Loading branch information
xzhangxian1008 authored Jun 24, 2024
1 parent fd2b5e9 commit bec113a
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 18 deletions.
43 changes: 26 additions & 17 deletions pkg/executor/sortexec/parallel_sort_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,8 @@ type parallelSortWorker struct {
localSortedRows []*chunk.Iterator4Slice
sortedRowsIter *chunk.Iterator4Slice
maxSortedRowsLimit int
batchRows []chunk.Row
chunkIters []*chunk.Iterator4Chunk
rowNumInChunkIters int
merger *multiWayMerger
}

Expand All @@ -62,7 +63,6 @@ func newParallelSortWorker(
sortedRowsIter *chunk.Iterator4Slice,
maxChunkSize int,
spillHelper *parallelSortSpillHelper) *parallelSortWorker {
maxSortedRowsLimit := maxChunkSize * 30
return &parallelSortWorker{
workerIDForTest: workerIDForTest,
lessRowFunc: lessRowFunc,
Expand All @@ -73,14 +73,12 @@ func newParallelSortWorker(
timesOfRowCompare: 0,
memTracker: memTracker,
sortedRowsIter: sortedRowsIter,
maxSortedRowsLimit: maxSortedRowsLimit,
maxSortedRowsLimit: maxChunkSize * 30,
spillHelper: spillHelper,
batchRows: make([]chunk.Row, 0, maxSortedRowsLimit),
}
}

func (p *parallelSortWorker) reset() {
p.batchRows = nil
p.localSortedRows = nil
p.sortedRowsIter = nil
p.merger = nil
Expand Down Expand Up @@ -126,28 +124,39 @@ func (p *parallelSortWorker) multiWayMergeLocalSortedRows() ([]chunk.Row, error)
return resultSortedRows, nil
}

func (p *parallelSortWorker) convertChunksToRows() []chunk.Row {
rows := make([]chunk.Row, 0, p.rowNumInChunkIters)
for _, iter := range p.chunkIters {
row := iter.Begin()
for !row.IsEmpty() {
rows = append(rows, row)
row = iter.Next()
}
}
p.chunkIters = p.chunkIters[:0]
p.rowNumInChunkIters = 0
return rows
}

func (p *parallelSortWorker) sortBatchRows() {
slices.SortFunc(p.batchRows, p.keyColumnsLess)
p.localSortedRows = append(p.localSortedRows, chunk.NewIterator4Slice(p.batchRows))
p.batchRows = make([]chunk.Row, 0, p.maxSortedRowsLimit)
rows := p.convertChunksToRows()
slices.SortFunc(rows, p.keyColumnsLess)
p.localSortedRows = append(p.localSortedRows, chunk.NewIterator4Slice(rows))
}

func (p *parallelSortWorker) sortLocalRows() ([]chunk.Row, error) {
// Handle Remaining batchRows whose row number is not over the `maxSortedRowsLimit`
if len(p.batchRows) > 0 {
if p.rowNumInChunkIters > 0 {
p.sortBatchRows()
}

return p.multiWayMergeLocalSortedRows()
}

func (p *parallelSortWorker) addChunkToBatchRows(chk *chunk.Chunk) {
func (p *parallelSortWorker) saveChunk(chk *chunk.Chunk) {
chkIter := chunk.NewIterator4Chunk(chk)
row := chkIter.Begin()
for !row.IsEmpty() {
p.batchRows = append(p.batchRows, row)
row = chkIter.Next()
}
p.chunkIters = append(p.chunkIters, chkIter)
p.rowNumInChunkIters += chkIter.Len()
}

// Fetching a bunch of chunks from chunkChannel and sort them.
Expand Down Expand Up @@ -182,9 +191,9 @@ func (p *parallelSortWorker) fetchChunksAndSortImpl() bool {
p.totalMemoryUsage += chk.MemoryUsage
}

p.addChunkToBatchRows(chk.Chk)
p.saveChunk(chk.Chk)

if len(p.batchRows) >= p.maxSortedRowsLimit {
if p.rowNumInChunkIters >= p.maxSortedRowsLimit {
p.sortBatchRows()
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/executor/sortexec/sort.go
Original file line number Diff line number Diff line change
Expand Up @@ -382,7 +382,7 @@ func (e *SortExec) generateResultFromMemory() (bool, error) {
}

maxChunkSize := e.MaxChunkSize()
resBuf := make([]rowWithError, 0, maxChunkSize)
resBuf := make([]rowWithError, 0, 3)
idx := int64(0)
var row chunk.Row
for {
Expand Down

0 comments on commit bec113a

Please sign in to comment.