Skip to content

Commit

Permalink
executor: refine topn (#51271)
Browse files Browse the repository at this point in the history
ref #47733
  • Loading branch information
xzhangxian1008 authored Feb 23, 2024
1 parent 98834c2 commit a6768bc
Show file tree
Hide file tree
Showing 2 changed files with 44 additions and 44 deletions.
2 changes: 1 addition & 1 deletion pkg/executor/sortexec/sort.go
Original file line number Diff line number Diff line change
Expand Up @@ -373,7 +373,7 @@ func (e *SortExec) lessRow(rowI, rowJ chunk.Row) bool {
return false
}

func (e *SortExec) compressRow(rowI, rowJ chunk.Row) int {
func (e *SortExec) compareRow(rowI, rowJ chunk.Row) int {
for i, colIdx := range e.keyColumns {
cmpFunc := e.keyCmpFuncs[i]
cmp := cmpFunc(rowI, colIdx, rowJ, colIdx)
Expand Down
86 changes: 43 additions & 43 deletions pkg/executor/sortexec/topn.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,18 +32,19 @@ type TopNExec struct {
Limit *plannercore.PhysicalLimit
totalLimit uint64

// rowChunks is the chunks to store row values.
rowChunks *chunk.List
// rowPtrs stores the chunk index and row index for each row.
rowPtrs []chunk.RowPtr

chkHeap *topNChunkHeap
Idx int
}

// topNChunkHeap implements heap.Interface.
type topNChunkHeap struct {
*TopNExec

// rowChunks is the chunks to store row values.
rowChunks *chunk.List
// rowPtrs stores the chunk index and row index for each row.
rowPtrs []chunk.RowPtr

Idx int
}

// Less implements heap.Interface, but since we maintain a max heap,
Expand Down Expand Up @@ -89,18 +90,18 @@ func (h *topNChunkHeap) Swap(i, j int) {
}

func (e *TopNExec) keyColumnsCompare(i, j chunk.RowPtr) int {
rowI := e.rowChunks.GetRow(i)
rowJ := e.rowChunks.GetRow(j)
return e.compressRow(rowI, rowJ)
rowI := e.chkHeap.rowChunks.GetRow(i)
rowJ := e.chkHeap.rowChunks.GetRow(j)
return e.compareRow(rowI, rowJ)
}

func (e *TopNExec) initPointers() {
e.rowPtrs = make([]chunk.RowPtr, 0, e.rowChunks.Len())
e.memTracker.Consume(int64(8 * e.rowChunks.Len()))
for chkIdx := 0; chkIdx < e.rowChunks.NumChunks(); chkIdx++ {
rowChk := e.rowChunks.GetChunk(chkIdx)
e.chkHeap.rowPtrs = make([]chunk.RowPtr, 0, e.chkHeap.rowChunks.Len())
e.memTracker.Consume(int64(8 * e.chkHeap.rowChunks.Len()))
for chkIdx := 0; chkIdx < e.chkHeap.rowChunks.NumChunks(); chkIdx++ {
rowChk := e.chkHeap.rowChunks.GetChunk(chkIdx)
for rowIdx := 0; rowIdx < rowChk.NumRows(); rowIdx++ {
e.rowPtrs = append(e.rowPtrs, chunk.RowPtr{ChkIdx: uint32(chkIdx), RowIdx: uint32(rowIdx)})
e.chkHeap.rowPtrs = append(e.chkHeap.rowPtrs, chunk.RowPtr{ChkIdx: uint32(chkIdx), RowIdx: uint32(rowIdx)})
}
}
}
Expand All @@ -111,7 +112,8 @@ func (e *TopNExec) Open(ctx context.Context) error {
e.memTracker.AttachTo(e.Ctx().GetSessionVars().StmtCtx.MemTracker)

e.fetched = false
e.Idx = 0
e.chkHeap = &topNChunkHeap{TopNExec: e}
e.chkHeap.Idx = 0

return exec.Open(ctx, e.Children(0))
}
Expand All @@ -121,7 +123,7 @@ func (e *TopNExec) Next(ctx context.Context, req *chunk.Chunk) error {
req.Reset()
if !e.fetched {
e.totalLimit = e.Limit.Offset + e.Limit.Count
e.Idx = int(e.Limit.Offset)
e.chkHeap.Idx = int(e.Limit.Offset)
err := e.loadChunksUntilTotalLimit(ctx)
if err != nil {
return err
Expand All @@ -132,38 +134,37 @@ func (e *TopNExec) Next(ctx context.Context, req *chunk.Chunk) error {
}
e.fetched = true
}
if e.Idx >= len(e.rowPtrs) {
if e.chkHeap.Idx >= len(e.chkHeap.rowPtrs) {
return nil
}
if !req.IsFull() {
numToAppend := min(len(e.rowPtrs)-e.Idx, req.RequiredRows()-req.NumRows())
numToAppend := min(len(e.chkHeap.rowPtrs)-e.chkHeap.Idx, req.RequiredRows()-req.NumRows())
rows := make([]chunk.Row, numToAppend)
for index := 0; index < numToAppend; index++ {
rows[index] = e.rowChunks.GetRow(e.rowPtrs[e.Idx])
e.Idx++
rows[index] = e.chkHeap.rowChunks.GetRow(e.chkHeap.rowPtrs[e.chkHeap.Idx])
e.chkHeap.Idx++
}
req.AppendRows(rows)
}
return nil
}

func (e *TopNExec) loadChunksUntilTotalLimit(ctx context.Context) error {
e.chkHeap = &topNChunkHeap{e}
e.rowChunks = chunk.NewList(exec.RetTypes(e), e.InitCap(), e.MaxChunkSize())
e.rowChunks.GetMemTracker().AttachTo(e.memTracker)
e.rowChunks.GetMemTracker().SetLabel(memory.LabelForRowChunks)
for uint64(e.rowChunks.Len()) < e.totalLimit {
e.chkHeap.rowChunks = chunk.NewList(exec.RetTypes(e), e.InitCap(), e.MaxChunkSize())
e.chkHeap.rowChunks.GetMemTracker().AttachTo(e.memTracker)
e.chkHeap.rowChunks.GetMemTracker().SetLabel(memory.LabelForRowChunks)
for uint64(e.chkHeap.rowChunks.Len()) < e.totalLimit {
srcChk := exec.TryNewCacheChunk(e.Children(0))
// adjust required rows by total limit
srcChk.SetRequiredRows(int(e.totalLimit-uint64(e.rowChunks.Len())), e.MaxChunkSize())
srcChk.SetRequiredRows(int(e.totalLimit-uint64(e.chkHeap.rowChunks.Len())), e.MaxChunkSize())
err := exec.Next(ctx, e.Children(0), srcChk)
if err != nil {
return err
}
if srcChk.NumRows() == 0 {
break
}
e.rowChunks.Add(srcChk)
e.chkHeap.rowChunks.Add(srcChk)
}
e.initPointers()
e.initCompareFuncs()
Expand All @@ -175,7 +176,7 @@ const topNCompactionFactor = 4

func (e *TopNExec) executeTopN(ctx context.Context) error {
heap.Init(e.chkHeap)
for uint64(len(e.rowPtrs)) > e.totalLimit {
for uint64(len(e.chkHeap.rowPtrs)) > e.totalLimit {
// The number of rows we loaded may exceed the total limit; remove the greatest rows by Pop.
heap.Pop(e.chkHeap)
}
Expand All @@ -192,26 +193,26 @@ func (e *TopNExec) executeTopN(ctx context.Context) error {
if err != nil {
return err
}
if e.rowChunks.Len() > len(e.rowPtrs)*topNCompactionFactor {
err = e.doCompaction()
if e.chkHeap.rowChunks.Len() > len(e.chkHeap.rowPtrs)*topNCompactionFactor {
err = e.doCompaction(e.chkHeap)
if err != nil {
return err
}
}
}
slices.SortFunc(e.rowPtrs, e.keyColumnsCompare)
slices.SortFunc(e.chkHeap.rowPtrs, e.keyColumnsCompare)
return nil
}

func (e *TopNExec) processChildChk(childRowChk *chunk.Chunk) error {
for i := 0; i < childRowChk.NumRows(); i++ {
heapMaxPtr := e.rowPtrs[0]
heapMaxPtr := e.chkHeap.rowPtrs[0]
var heapMax, next chunk.Row
heapMax = e.rowChunks.GetRow(heapMaxPtr)
heapMax = e.chkHeap.rowChunks.GetRow(heapMaxPtr)
next = childRowChk.GetRow(i)
if e.chkHeap.greaterRow(heapMax, next) {
// Evict heap max, keep the next row.
e.rowPtrs[0] = e.rowChunks.AppendRow(childRowChk.GetRow(i))
e.chkHeap.rowPtrs[0] = e.chkHeap.rowChunks.AppendRow(childRowChk.GetRow(i))
heap.Fix(e.chkHeap, 0)
}
}
Expand All @@ -222,19 +223,18 @@ func (e *TopNExec) processChildChk(childRowChk *chunk.Chunk) error {
// If we don't do compaction, in an extreme case, such as when the child data is already sorted
// ascending but we want the descending top N, we will keep all data in memory.
// But if data is distributed randomly, this function will be called log(n) times.
func (e *TopNExec) doCompaction() error {
func (e *TopNExec) doCompaction(chkHeap *topNChunkHeap) error {
newRowChunks := chunk.NewList(exec.RetTypes(e), e.InitCap(), e.MaxChunkSize())
newRowPtrs := make([]chunk.RowPtr, 0, e.rowChunks.Len())
for _, rowPtr := range e.rowPtrs {
newRowPtr := newRowChunks.AppendRow(e.rowChunks.GetRow(rowPtr))
newRowPtrs := make([]chunk.RowPtr, 0, chkHeap.rowChunks.Len())
for _, rowPtr := range chkHeap.rowPtrs {
newRowPtr := newRowChunks.AppendRow(chkHeap.rowChunks.GetRow(rowPtr))
newRowPtrs = append(newRowPtrs, newRowPtr)
}
newRowChunks.GetMemTracker().SetLabel(memory.LabelForRowChunks)
e.memTracker.ReplaceChild(e.rowChunks.GetMemTracker(), newRowChunks.GetMemTracker())
e.rowChunks = newRowChunks
e.memTracker.ReplaceChild(chkHeap.rowChunks.GetMemTracker(), newRowChunks.GetMemTracker())
chkHeap.rowChunks = newRowChunks

e.memTracker.Consume(int64(-8 * len(e.rowPtrs)))
e.memTracker.Consume(int64(8 * len(newRowPtrs)))
e.rowPtrs = newRowPtrs
e.memTracker.Consume(int64(8 * (len(newRowPtrs) - len(chkHeap.rowPtrs))))
chkHeap.rowPtrs = newRowPtrs
return nil
}

0 comments on commit a6768bc

Please sign in to comment.