From bec113a91d850e9d379a0d754b8164420a5fb0e2 Mon Sep 17 00:00:00 2001 From: xzhangxian1008 Date: Mon, 24 Jun 2024 15:59:50 +0800 Subject: [PATCH] executor: remove redundant memory pre-allocations in parallel sort executor (#54073) close pingcap/tidb#54070 --- pkg/executor/sortexec/parallel_sort_worker.go | 43 +++++++++++-------- pkg/executor/sortexec/sort.go | 2 +- 2 files changed, 27 insertions(+), 18 deletions(-) diff --git a/pkg/executor/sortexec/parallel_sort_worker.go b/pkg/executor/sortexec/parallel_sort_worker.go index 082b935060ecc..1d7a4d7aaa7b6 100644 --- a/pkg/executor/sortexec/parallel_sort_worker.go +++ b/pkg/executor/sortexec/parallel_sort_worker.go @@ -47,7 +47,8 @@ type parallelSortWorker struct { localSortedRows []*chunk.Iterator4Slice sortedRowsIter *chunk.Iterator4Slice maxSortedRowsLimit int - batchRows []chunk.Row + chunkIters []*chunk.Iterator4Chunk + rowNumInChunkIters int merger *multiWayMerger } @@ -62,7 +63,6 @@ func newParallelSortWorker( sortedRowsIter *chunk.Iterator4Slice, maxChunkSize int, spillHelper *parallelSortSpillHelper) *parallelSortWorker { - maxSortedRowsLimit := maxChunkSize * 30 return &parallelSortWorker{ workerIDForTest: workerIDForTest, lessRowFunc: lessRowFunc, @@ -73,14 +73,12 @@ func newParallelSortWorker( timesOfRowCompare: 0, memTracker: memTracker, sortedRowsIter: sortedRowsIter, - maxSortedRowsLimit: maxSortedRowsLimit, + maxSortedRowsLimit: maxChunkSize * 30, spillHelper: spillHelper, - batchRows: make([]chunk.Row, 0, maxSortedRowsLimit), } } func (p *parallelSortWorker) reset() { - p.batchRows = nil p.localSortedRows = nil p.sortedRowsIter = nil p.merger = nil @@ -126,28 +124,39 @@ func (p *parallelSortWorker) multiWayMergeLocalSortedRows() ([]chunk.Row, error) return resultSortedRows, nil } +func (p *parallelSortWorker) convertChunksToRows() []chunk.Row { + rows := make([]chunk.Row, 0, p.rowNumInChunkIters) + for _, iter := range p.chunkIters { + row := iter.Begin() + for !row.IsEmpty() { + rows = append(rows, 
row) + row = iter.Next() + } + } + p.chunkIters = p.chunkIters[:0] + p.rowNumInChunkIters = 0 + return rows +} + func (p *parallelSortWorker) sortBatchRows() { - slices.SortFunc(p.batchRows, p.keyColumnsLess) - p.localSortedRows = append(p.localSortedRows, chunk.NewIterator4Slice(p.batchRows)) - p.batchRows = make([]chunk.Row, 0, p.maxSortedRowsLimit) + rows := p.convertChunksToRows() + slices.SortFunc(rows, p.keyColumnsLess) + p.localSortedRows = append(p.localSortedRows, chunk.NewIterator4Slice(rows)) } func (p *parallelSortWorker) sortLocalRows() ([]chunk.Row, error) { // Handle Remaining batchRows whose row number is not over the `maxSortedRowsLimit` - if len(p.batchRows) > 0 { + if p.rowNumInChunkIters > 0 { p.sortBatchRows() } return p.multiWayMergeLocalSortedRows() } -func (p *parallelSortWorker) addChunkToBatchRows(chk *chunk.Chunk) { +func (p *parallelSortWorker) saveChunk(chk *chunk.Chunk) { chkIter := chunk.NewIterator4Chunk(chk) - row := chkIter.Begin() - for !row.IsEmpty() { - p.batchRows = append(p.batchRows, row) - row = chkIter.Next() - } + p.chunkIters = append(p.chunkIters, chkIter) + p.rowNumInChunkIters += chkIter.Len() } // Fetching a bunch of chunks from chunkChannel and sort them. @@ -182,9 +191,9 @@ func (p *parallelSortWorker) fetchChunksAndSortImpl() bool { p.totalMemoryUsage += chk.MemoryUsage } - p.addChunkToBatchRows(chk.Chk) + p.saveChunk(chk.Chk) - if len(p.batchRows) >= p.maxSortedRowsLimit { + if p.rowNumInChunkIters >= p.maxSortedRowsLimit { p.sortBatchRows() } diff --git a/pkg/executor/sortexec/sort.go b/pkg/executor/sortexec/sort.go index f55f366e60307..5d56433fd4ad4 100644 --- a/pkg/executor/sortexec/sort.go +++ b/pkg/executor/sortexec/sort.go @@ -382,7 +382,7 @@ func (e *SortExec) generateResultFromMemory() (bool, error) { } maxChunkSize := e.MaxChunkSize() - resBuf := make([]rowWithError, 0, maxChunkSize) + resBuf := make([]rowWithError, 0, 3) idx := int64(0) var row chunk.Row for {