From 9be32d6ae6ed446827722e8c00fe3c001a0878aa Mon Sep 17 00:00:00 2001 From: xzhangxian1008 Date: Fri, 24 May 2024 14:55:29 +0800 Subject: [PATCH 01/12] init --- .../sortexec/parallel_sort_spill_test.go | 22 +++++++---------- pkg/executor/sortexec/parallel_sort_test.go | 6 ----- pkg/executor/sortexec/sort.go | 24 ++++--------------- pkg/executor/sortexec/sort_spill_test.go | 12 +++++----- 4 files changed, 20 insertions(+), 44 deletions(-) diff --git a/pkg/executor/sortexec/parallel_sort_spill_test.go b/pkg/executor/sortexec/parallel_sort_spill_test.go index 26d2fc4cf9b81..44378dcf0613b 100644 --- a/pkg/executor/sortexec/parallel_sort_spill_test.go +++ b/pkg/executor/sortexec/parallel_sort_spill_test.go @@ -29,7 +29,7 @@ import ( var hardLimit1 = int64(100000) var hardLimit2 = hardLimit1 * 10 -func oneSpillCase(t *testing.T, ctx *mock.Context, exe *sortexec.SortExec, sortCase *testutil.SortCase, schema *expression.Schema, dataSource *testutil.MockDataSource) { +func oneSpillCase(t *testing.T, exe *sortexec.SortExec, sortCase *testutil.SortCase, schema *expression.Schema, dataSource *testutil.MockDataSource) { if exe == nil { exe = buildSortExec(sortCase, dataSource) } @@ -60,7 +60,7 @@ func inMemoryThenSpill(t *testing.T, ctx *mock.Context, exe *sortexec.SortExec, require.True(t, checkCorrectness(schema, exe, dataSource, resultChunks)) } -func failpointNoMemoryDataTest(t *testing.T, ctx *mock.Context, exe *sortexec.SortExec, sortCase *testutil.SortCase, schema *expression.Schema, dataSource *testutil.MockDataSource) { +func failpointNoMemoryDataTest(t *testing.T, exe *sortexec.SortExec, sortCase *testutil.SortCase, dataSource *testutil.MockDataSource) { if exe == nil { exe = buildSortExec(sortCase, dataSource) } @@ -68,7 +68,7 @@ func failpointNoMemoryDataTest(t *testing.T, ctx *mock.Context, exe *sortexec.So executeInFailpoint(t, exe, 0, nil) } -func failpointDataInMemoryThenSpillTest(t *testing.T, ctx *mock.Context, exe *sortexec.SortExec, sortCase *testutil.SortCase, schema *expression.Schema, dataSource *testutil.MockDataSource) { +func failpointDataInMemoryThenSpillTest(t *testing.T, ctx *mock.Context, exe *sortexec.SortExec, sortCase *testutil.SortCase, dataSource *testutil.MockDataSource) { if exe == nil { exe = buildSortExec(sortCase, dataSource) } @@ -91,15 +91,13 @@ func TestParallelSortSpillDisk(t *testing.T) { ctx.GetSessionVars().MemTracker = memory.NewTracker(memory.LabelForSQLText, hardLimit1) ctx.GetSessionVars().StmtCtx.MemTracker = memory.NewTracker(memory.LabelForSQLText, -1) ctx.GetSessionVars().StmtCtx.MemTracker.AttachTo(ctx.GetSessionVars().MemTracker) - // TODO use variable to choose parallel mode after system variable is added - // ctx.GetSessionVars().EnableParallelSort = true schema := expression.NewSchema(sortCase.Columns()...) dataSource := buildDataSource(sortCase, schema) exe := buildSortExec(sortCase, dataSource) for i := 0; i < 10; i++ { - oneSpillCase(t, ctx, nil, sortCase, schema, dataSource) - oneSpillCase(t, ctx, exe, sortCase, schema, dataSource) + oneSpillCase(t, nil, sortCase, schema, dataSource) + oneSpillCase(t, exe, sortCase, schema, dataSource) } ctx.GetSessionVars().MemTracker = memory.NewTracker(memory.LabelForSQLText, hardLimit2) @@ -129,21 +127,19 @@ func TestParallelSortSpillDiskFailpoint(t *testing.T) { ctx.GetSessionVars().MemTracker = memory.NewTracker(memory.LabelForSQLText, hardLimit1) ctx.GetSessionVars().StmtCtx.MemTracker = memory.NewTracker(memory.LabelForSQLText, -1) ctx.GetSessionVars().StmtCtx.MemTracker.AttachTo(ctx.GetSessionVars().MemTracker) - // TODO use variable to choose parallel mode after system variable is added - // ctx.GetSessionVars().EnableParallelSort = true schema := expression.NewSchema(sortCase.Columns()...) dataSource := buildDataSource(sortCase, schema) exe := buildSortExec(sortCase, dataSource) for i := 0; i < 20; i++ { - failpointNoMemoryDataTest(t, ctx, nil, sortCase, schema, dataSource) - failpointNoMemoryDataTest(t, ctx, exe, sortCase, schema, dataSource) + failpointNoMemoryDataTest(t, nil, sortCase, dataSource) + failpointNoMemoryDataTest(t, exe, sortCase, dataSource) } ctx.GetSessionVars().MemTracker = memory.NewTracker(memory.LabelForSQLText, hardLimit2) ctx.GetSessionVars().StmtCtx.MemTracker.AttachTo(ctx.GetSessionVars().MemTracker) for i := 0; i < 20; i++ { - failpointDataInMemoryThenSpillTest(t, ctx, nil, sortCase, schema, dataSource) - failpointDataInMemoryThenSpillTest(t, ctx, exe, sortCase, schema, dataSource) + failpointDataInMemoryThenSpillTest(t, ctx, nil, sortCase, dataSource) + failpointDataInMemoryThenSpillTest(t, ctx, exe, sortCase, dataSource) } } diff --git a/pkg/executor/sortexec/parallel_sort_test.go b/pkg/executor/sortexec/parallel_sort_test.go index 096cd41026899..36d6858c28a45 100644 --- a/pkg/executor/sortexec/parallel_sort_test.go +++ b/pkg/executor/sortexec/parallel_sort_test.go @@ -36,8 +36,6 @@ func executeInFailpoint(t *testing.T, exe *sortexec.SortExec, hardLimit int64, t tmpCtx := context.Background() err := exe.Open(tmpCtx) require.NoError(t, err) - exe.IsUnparallel = false - exe.InitInParallelModeForTest() goRoutineWaiter := sync.WaitGroup{} goRoutineWaiter.Add(1) @@ -85,8 +83,6 @@ func parallelSortTest(t *testing.T, ctx *mock.Context, exe *sortexec.SortExec, s ctx.GetSessionVars().MemTracker = memory.NewTracker(memory.LabelForSQLText, -1) ctx.GetSessionVars().StmtCtx.MemTracker = memory.NewTracker(memory.LabelForSQLText, -1) ctx.GetSessionVars().StmtCtx.MemTracker.AttachTo(ctx.GetSessionVars().MemTracker) - // TODO use variable to choose parallel mode after system variable is added - // ctx.GetSessionVars().EnableParallelSort = true if exe == nil { exe = buildSortExec(sortCase, dataSource) @@ -105,8 +101,6 @@ func failpointTest(t *testing.T, ctx *mock.Context, exe *sortexec.SortExec, sort ctx.GetSessionVars().MemTracker = memory.NewTracker(memory.LabelForSQLText, -1) ctx.GetSessionVars().StmtCtx.MemTracker = memory.NewTracker(memory.LabelForSQLText, -1) ctx.GetSessionVars().StmtCtx.MemTracker.AttachTo(ctx.GetSessionVars().MemTracker) - // TODO use variable to choose parallel mode after system variable is added - // ctx.GetSessionVars().EnableParallelSort = true if exe == nil { exe = buildSortExec(sortCase, dataSource) } diff --git a/pkg/executor/sortexec/sort.go b/pkg/executor/sortexec/sort.go index 923949fc3ee9f..ffb94cf1d122b 100644 --- a/pkg/executor/sortexec/sort.go +++ b/pkg/executor/sortexec/sort.go @@ -55,6 +55,7 @@ type SortExec struct { memTracker *memory.Tracker diskTracker *disk.Tracker + // TODO delete this variable in the future and remove the unparallel sort IsUnparallel bool finishCh chan struct{} @@ -160,7 +161,7 @@ func (e *SortExec) Open(ctx context.Context) error { e.diskTracker.AttachTo(e.Ctx().GetSessionVars().StmtCtx.DiskTracker) } - e.IsUnparallel = true + e.IsUnparallel = false if e.IsUnparallel { e.Unparallel.Idx = 0 e.Unparallel.sortPartitions = e.Unparallel.sortPartitions[:0] @@ -185,24 +186,9 @@ func (e *SortExec) Open(ctx context.Context) error { return exec.Open(ctx, e.Children(0)) } -// InitInParallelModeForTest is a function for test -// After system variable is added, we can delete this function -func (e *SortExec) InitInParallelModeForTest() { - e.Parallel.workers = make([]*parallelSortWorker, e.Ctx().GetSessionVars().ExecutorConcurrency) - e.Parallel.chunkChannel = make(chan *chunkWithMemoryUsage, e.Ctx().GetSessionVars().ExecutorConcurrency) - e.Parallel.fetcherAndWorkerSyncer = &sync.WaitGroup{} - e.Parallel.sortedRowsIters = make([]*chunk.Iterator4Slice, len(e.Parallel.workers)) - e.Parallel.resultChannel = make(chan rowWithError, e.MaxChunkSize()) - e.Parallel.closeSync = make(chan struct{}) - e.Parallel.merger = newMultiWayMerger(&memorySource{sortedRowsIters: e.Parallel.sortedRowsIters}, e.lessRow) - e.Parallel.spillHelper = newParallelSortSpillHelper(e, exec.RetTypes(e), e.finishCh, e.lessRow, e.Parallel.resultChannel) - e.Parallel.spillAction = newParallelSortSpillDiskAction(e.Parallel.spillHelper) - for i := range e.Parallel.sortedRowsIters { - e.Parallel.sortedRowsIters[i] = chunk.NewIterator4Slice(nil) - } - if e.enableTmpStorageOnOOM { - e.Ctx().GetSessionVars().MemTracker.FallbackOldAndSetNewAction(e.Parallel.spillAction) - } +func (e *SortExec) InitUnparallelModeForTest() { + e.Unparallel.Idx = 0 + e.Unparallel.sortPartitions = e.Unparallel.sortPartitions[:0] } // Next implements the Executor Next interface. diff --git a/pkg/executor/sortexec/sort_spill_test.go b/pkg/executor/sortexec/sort_spill_test.go index 1c2cb46b02a0b..553497ccb8052 100644 --- a/pkg/executor/sortexec/sort_spill_test.go +++ b/pkg/executor/sortexec/sort_spill_test.go @@ -174,9 +174,9 @@ func executeSortExecutor(t *testing.T, exe *sortexec.SortExec, isParallelSort bo tmpCtx := context.Background() err := exe.Open(tmpCtx) require.NoError(t, err) - if isParallelSort { - exe.IsUnparallel = false - exe.InitInParallelModeForTest() + if !isParallelSort { + exe.IsUnparallel = true + exe.InitUnparallelModeForTest() } resultChunks := make([]*chunk.Chunk, 0) @@ -196,9 +196,9 @@ func executeSortExecutorAndManullyTriggerSpill(t *testing.T, exe *sortexec.SortE tmpCtx := context.Background() err := exe.Open(tmpCtx) require.NoError(t, err) - if isParallelSort { - exe.IsUnparallel = false - exe.InitInParallelModeForTest() + if !isParallelSort { + exe.IsUnparallel = true + exe.InitUnparallelModeForTest() } resultChunks := make([]*chunk.Chunk, 0) From ce3628add3ad3914a851dd09d41b69dae0b80af1 Mon Sep 17 00:00:00 2001 From: xzhangxian1008 Date: Fri, 24 May 2024 14:59:05 +0800 Subject: [PATCH 02/12] remove todo --- pkg/executor/sortexec/sort_spill_test.go | 8 -------- 1 file changed, 8 deletions(-) diff --git a/pkg/executor/sortexec/sort_spill_test.go b/pkg/executor/sortexec/sort_spill_test.go index 553497ccb8052..052798ef5547d 100644 --- a/pkg/executor/sortexec/sort_spill_test.go +++ b/pkg/executor/sortexec/sort_spill_test.go @@ -236,8 +236,6 @@ func onePartitionAndAllDataInMemoryCase(t *testing.T, ctx *mock.Context, sortCas ctx.GetSessionVars().MemTracker = memory.NewTracker(memory.LabelForSQLText, 1048576) ctx.GetSessionVars().StmtCtx.MemTracker = memory.NewTracker(memory.LabelForSQLText, -1) ctx.GetSessionVars().StmtCtx.MemTracker.AttachTo(ctx.GetSessionVars().MemTracker) - // TODO use variable to choose parallel mode after system variable is added - // ctx.GetSessionVars().EnableParallelSort = false schema := expression.NewSchema(sortCase.Columns()...) dataSource := buildDataSource(sortCase, schema) exe := buildSortExec(sortCase, dataSource) @@ -259,8 +257,6 @@ func onePartitionAndAllDataInDiskCase(t *testing.T, ctx *mock.Context, sortCase ctx.GetSessionVars().MemTracker = memory.NewTracker(memory.LabelForSQLText, 50000) ctx.GetSessionVars().StmtCtx.MemTracker = memory.NewTracker(memory.LabelForSQLText, -1) ctx.GetSessionVars().StmtCtx.MemTracker.AttachTo(ctx.GetSessionVars().MemTracker) - // TODO use variable to choose parallel mode after system variable is added - // ctx.GetSessionVars().EnableParallelSort = false schema := expression.NewSchema(sortCase.Columns()...) dataSource := buildDataSource(sortCase, schema) exe := buildSortExec(sortCase, dataSource) @@ -289,8 +285,6 @@ func multiPartitionCase(t *testing.T, ctx *mock.Context, sortCase *testutil.Sort ctx.GetSessionVars().MemTracker = memory.NewTracker(memory.LabelForSQLText, 10000) ctx.GetSessionVars().StmtCtx.MemTracker = memory.NewTracker(memory.LabelForSQLText, -1) ctx.GetSessionVars().StmtCtx.MemTracker.AttachTo(ctx.GetSessionVars().MemTracker) - // TODO use variable to choose parallel mode after system variable is added - // ctx.GetSessionVars().EnableParallelSort = false schema := expression.NewSchema(sortCase.Columns()...) dataSource := buildDataSource(sortCase, schema) exe := buildSortExec(sortCase, dataSource) @@ -330,8 +324,6 @@ func inMemoryThenSpillCase(t *testing.T, ctx *mock.Context, sortCase *testutil.S ctx.GetSessionVars().MemTracker = memory.NewTracker(memory.LabelForSQLText, hardLimit) ctx.GetSessionVars().StmtCtx.MemTracker = memory.NewTracker(memory.LabelForSQLText, -1) ctx.GetSessionVars().StmtCtx.MemTracker.AttachTo(ctx.GetSessionVars().MemTracker) - // TODO use variable to choose parallel mode after system variable is added - // ctx.GetSessionVars().EnableParallelSort = false schema := expression.NewSchema(sortCase.Columns()...) dataSource := buildDataSource(sortCase, schema) exe := buildSortExec(sortCase, dataSource) From 96a51000c56ce0cc0d025bfd81442ddbb7910d2a Mon Sep 17 00:00:00 2001 From: xzhangxian1008 Date: Fri, 24 May 2024 15:06:06 +0800 Subject: [PATCH 03/12] fix ci --- pkg/executor/sortexec/sort.go | 1 + 1 file changed, 1 insertion(+) diff --git a/pkg/executor/sortexec/sort.go b/pkg/executor/sortexec/sort.go index ffb94cf1d122b..e37a1a9fae75e 100644 --- a/pkg/executor/sortexec/sort.go +++ b/pkg/executor/sortexec/sort.go @@ -186,6 +186,7 @@ func (e *SortExec) Open(ctx context.Context) error { return exec.Open(ctx, e.Children(0)) } +// InitUnparallelModeForTest is for unit test func (e *SortExec) InitUnparallelModeForTest() { e.Unparallel.Idx = 0 e.Unparallel.sortPartitions = e.Unparallel.sortPartitions[:0] From 90cf29a9fc987a21f61da4606addeead6532cc2a Mon Sep 17 00:00:00 2001 From: xzhangxian1008 Date: Fri, 24 May 2024 16:13:35 +0800 Subject: [PATCH 04/12] fix --- pkg/executor/sortexec/sort.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/pkg/executor/sortexec/sort.go b/pkg/executor/sortexec/sort.go index e37a1a9fae75e..2a3d76715fef4 100644 --- a/pkg/executor/sortexec/sort.go +++ b/pkg/executor/sortexec/sort.go @@ -759,6 +759,9 @@ func (e *SortExec) buildKeyColumns() { func (e *SortExec) lessRow(rowI, rowJ chunk.Row) int { for i, colIdx := range e.keyColumns { cmpFunc := e.keyCmpFuncs[i] + if cmpFunc == nil { + break + } cmp := cmpFunc(rowI, colIdx, rowJ, colIdx) if e.ByItems[i].Desc { cmp = -cmp From aa5ddbfaf1f36681f0c6ab9c03ea727c352eeb56 Mon Sep 17 00:00:00 2001 From: xzhangxian1008 Date: Fri, 24 May 2024 16:48:37 +0800 Subject: [PATCH 05/12] fix --- pkg/executor/sortexec/sort.go | 18 +++++++++++++----- 1 file changed, 13 insertions(+), 5 deletions(-) diff --git a/pkg/executor/sortexec/sort.go b/pkg/executor/sortexec/sort.go index 2a3d76715fef4..7a26368c8c7ea 100644 --- a/pkg/executor/sortexec/sort.go +++ b/pkg/executor/sortexec/sort.go @@ -16,6 +16,7 @@ package sortexec import ( "context" + "fmt" "sync" "sync/atomic" "time" @@ -97,6 +98,11 @@ type SortExec struct { // Close implements the Executor Close interface. func (e *SortExec) Close() error { + defer func() { + if r := recover(); r != nil { + fmt.Printf("123") + } + }() // TopN not initializes `e.finishCh` but it will call the Close function if e.finishCh != nil { close(e.finishCh) @@ -125,11 +131,13 @@ func (e *SortExec) Close() error { // will use `e.Parallel.workers` and `e.Parallel.merger`. channel.Clear(e.Parallel.resultChannel) for i := range e.Parallel.workers { - e.Parallel.workers[i].batchRows = nil - e.Parallel.workers[i].localSortedRows = nil - e.Parallel.workers[i].sortedRowsIter = nil - e.Parallel.workers[i].merger = nil - e.Parallel.workers[i].memTracker.ReplaceBytesUsed(0) + if e.Parallel.workers[i] != nil { + e.Parallel.workers[i].batchRows = nil + e.Parallel.workers[i].localSortedRows = nil + e.Parallel.workers[i].sortedRowsIter = nil + e.Parallel.workers[i].merger = nil + e.Parallel.workers[i].memTracker.ReplaceBytesUsed(0) + } } e.Parallel.merger = nil if e.Parallel.spillAction != nil { From 5bbe0b8ebb1a3757e28fc19a7592c20c1eaa9acd Mon Sep 17 00:00:00 2001 From: xzhangxian1008 Date: Mon, 27 May 2024 10:41:41 +0800 Subject: [PATCH 06/12] address comment --- pkg/executor/sortexec/parallel_sort_worker.go | 8 ++++++++ pkg/executor/sortexec/sort.go | 12 +----------- 2 files changed, 9 insertions(+), 11 deletions(-) diff --git a/pkg/executor/sortexec/parallel_sort_worker.go b/pkg/executor/sortexec/parallel_sort_worker.go index ecf43fd3b8500..082b935060ecc 100644 --- a/pkg/executor/sortexec/parallel_sort_worker.go +++ b/pkg/executor/sortexec/parallel_sort_worker.go @@ -79,6 +79,14 @@ func newParallelSortWorker( } } +func (p *parallelSortWorker) reset() { + p.batchRows = nil + p.localSortedRows = nil + p.sortedRowsIter = nil + p.merger = nil + p.memTracker.ReplaceBytesUsed(0) +} + func (p *parallelSortWorker) injectFailPointForParallelSortWorker(triggerFactor int32) { injectParallelSortRandomFail(triggerFactor) failpoint.Inject("SlowSomeWorkers", func(val failpoint.Value) { diff --git a/pkg/executor/sortexec/sort.go b/pkg/executor/sortexec/sort.go index 7a26368c8c7ea..03734c0965e40 100644 --- a/pkg/executor/sortexec/sort.go +++ b/pkg/executor/sortexec/sort.go @@ -16,7 +16,6 @@ package sortexec import ( "context" - "fmt" "sync" "sync/atomic" "time" @@ -98,11 +97,6 @@ type SortExec struct { // Close implements the Executor Close interface. func (e *SortExec) Close() error { - defer func() { - if r := recover(); r != nil { - fmt.Printf("123") - } - }() // TopN not initializes `e.finishCh` but it will call the Close function if e.finishCh != nil { close(e.finishCh) @@ -132,11 +126,7 @@ func (e *SortExec) Close() error { channel.Clear(e.Parallel.resultChannel) for i := range e.Parallel.workers { if e.Parallel.workers[i] != nil { - e.Parallel.workers[i].batchRows = nil - e.Parallel.workers[i].localSortedRows = nil - e.Parallel.workers[i].sortedRowsIter = nil - e.Parallel.workers[i].merger = nil - e.Parallel.workers[i].memTracker.ReplaceBytesUsed(0) + e.Parallel.workers[i].reset() } } e.Parallel.merger = nil From 25bb24ac3c037bb897f6237ca78ca26dc24379c2 Mon Sep 17 00:00:00 2001 From: xzhangxian1008 Date: Mon, 27 May 2024 10:52:18 +0800 Subject: [PATCH 07/12] fix ut --- pkg/executor/sortexec/sort.go | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/pkg/executor/sortexec/sort.go b/pkg/executor/sortexec/sort.go index 03734c0965e40..88b8ed13aee05 100644 --- a/pkg/executor/sortexec/sort.go +++ b/pkg/executor/sortexec/sort.go @@ -695,6 +695,14 @@ func (e *SortExec) fetchChunksFromChild(ctx context.Context) { e.Parallel.resultChannel <- rowWithError{err: err} } + failpoint.Inject("SignalCheckpointForSort", func(val failpoint.Value) { + if val.(bool) { + if e.Ctx().GetSessionVars().ConnectionID == 123456 { + e.Ctx().GetSessionVars().MemTracker.Killer.SendKillSignal(sqlkiller.QueryMemoryExceeded) + } + } + }) + // We must place it after the spill as workers will process its received // chunks after channel is closed and this will cause data race. close(e.Parallel.chunkChannel) From fe7533bbe52ebee2a91bea947183224b75018834 Mon Sep 17 00:00:00 2001 From: xzhangxian1008 Date: Mon, 27 May 2024 11:59:56 +0800 Subject: [PATCH 08/12] add nil check --- pkg/executor/sortexec/sort.go | 31 ++++++++++++++++--------------- 1 file changed, 16 insertions(+), 15 deletions(-) diff --git a/pkg/executor/sortexec/sort.go b/pkg/executor/sortexec/sort.go index 88b8ed13aee05..bac9770505440 100644 --- a/pkg/executor/sortexec/sort.go +++ b/pkg/executor/sortexec/sort.go @@ -765,15 +765,14 @@ func (e *SortExec) buildKeyColumns() { func (e *SortExec) lessRow(rowI, rowJ chunk.Row) int { for i, colIdx := range e.keyColumns { cmpFunc := e.keyCmpFuncs[i] - if cmpFunc == nil { - break - } - cmp := cmpFunc(rowI, colIdx, rowJ, colIdx) - if e.ByItems[i].Desc { - cmp = -cmp - } - if cmp != 0 { - return cmp + if cmpFunc != nil { + cmp := cmpFunc(rowI, colIdx, rowJ, colIdx) + if e.ByItems[i].Desc { + cmp = -cmp + } + if cmp != 0 { + return cmp + } } } return 0 @@ -782,12 +781,14 @@ func (e *SortExec) lessRow(rowI, rowJ chunk.Row) int { func (e *SortExec) compareRow(rowI, rowJ chunk.Row) int { for i, colIdx := range e.keyColumns { cmpFunc := e.keyCmpFuncs[i] - cmp := cmpFunc(rowI, colIdx, rowJ, colIdx) - if e.ByItems[i].Desc { - cmp = -cmp - } - if cmp != 0 { - return cmp + if cmpFunc != nil { + cmp := cmpFunc(rowI, colIdx, rowJ, colIdx) + if e.ByItems[i].Desc { + cmp = -cmp + } + if cmp != 0 { + return cmp + } } } return 0 From 0c9b43371e4c483b56214a7f2d41f281e88a3914 Mon Sep 17 00:00:00 2001 From: xzhangxian1008 Date: Mon, 27 May 2024 14:28:05 +0800 Subject: [PATCH 09/12] add cmpNullConst func --- pkg/executor/sortexec/sort.go | 16 +++++++++++++--- pkg/util/chunk/compare.go | 6 ++++++ 2 files changed, 19 insertions(+), 3 deletions(-) diff --git a/pkg/executor/sortexec/sort.go b/pkg/executor/sortexec/sort.go index bac9770505440..2d3ae4047afc1 100644 --- a/pkg/executor/sortexec/sort.go +++ b/pkg/executor/sortexec/sort.go @@ -16,6 +16,8 @@ package sortexec import ( "context" + "errors" + "fmt" "sync" "sync/atomic" "time" @@ -257,9 +259,13 @@ func (e *SortExec) InitUnparallelModeForTest() { */ func (e *SortExec) Next(ctx context.Context, req *chunk.Chunk) error { if e.fetched.CompareAndSwap(false, true) { - e.initCompareFuncs() + err := e.initCompareFuncs() + if err != nil { + return err + } + e.buildKeyColumns() - err := e.fetchChunks(ctx) + err = e.fetchChunks(ctx) if err != nil { return err } @@ -746,12 +752,16 @@ func (e *SortExec) fetchChunksFromChild(ctx context.Context) { } } -func (e *SortExec) initCompareFuncs() { +func (e *SortExec) initCompareFuncs() error { e.keyCmpFuncs = make([]chunk.CompareFunc, len(e.ByItems)) for i := range e.ByItems { keyType := e.ByItems[i].Expr.GetType() e.keyCmpFuncs[i] = chunk.GetCompareFunc(keyType) + if e.keyCmpFuncs[i] == nil { + return errors.New(fmt.Sprintf("Sort executor not supports type %s", types.TypeStr(keyType.GetType()))) + } } + return nil } func (e *SortExec) buildKeyColumns() { diff --git a/pkg/util/chunk/compare.go b/pkg/util/chunk/compare.go index b2f6f6bd0d0d4..86ab092f66f84 100644 --- a/pkg/util/chunk/compare.go +++ b/pkg/util/chunk/compare.go @@ -53,6 +53,8 @@ func GetCompareFunc(tp *types.FieldType) CompareFunc { return cmpBit case mysql.TypeJSON: return cmpJSON + case mysql.TypeNull: + return cmpNullConst } return nil } @@ -169,6 +171,10 @@ func cmpJSON(l Row, lCol int, r Row, rCol int) int { return types.CompareBinaryJSON(lJ, rJ) } +func cmpNullConst(_ Row, _ int, _ Row, _ int) int { + return 0 +} + // Compare compares the value with ad. // We assume that the collation information of the column is the same with the datum. func Compare(row Row, colIdx int, ad *types.Datum) int { From 9933a70b15d3e83b5a86e9476d7e6184c9b91f56 Mon Sep 17 00:00:00 2001 From: xzhangxian1008 Date: Mon, 27 May 2024 17:04:36 +0800 Subject: [PATCH 10/12] fix ci --- pkg/executor/sortexec/sort.go | 6 +++--- pkg/executor/sortexec/topn.go | 6 +++++- 2 files changed, 8 insertions(+), 4 deletions(-) diff --git a/pkg/executor/sortexec/sort.go b/pkg/executor/sortexec/sort.go index 2d3ae4047afc1..dc1bfd0927b07 100644 --- a/pkg/executor/sortexec/sort.go +++ b/pkg/executor/sortexec/sort.go @@ -16,12 +16,12 @@ package sortexec import ( "context" - "errors" - "fmt" "sync" "sync/atomic" "time" + "github.com/pingcap/errors" + "github.com/pingcap/failpoint" "github.com/pingcap/tidb/pkg/executor/internal/exec" "github.com/pingcap/tidb/pkg/expression" @@ -758,7 +758,7 @@ func (e *SortExec) initCompareFuncs() error { keyType := e.ByItems[i].Expr.GetType() e.keyCmpFuncs[i] = chunk.GetCompareFunc(keyType) if e.keyCmpFuncs[i] == nil { - return errors.New(fmt.Sprintf("Sort executor not supports type %s", types.TypeStr(keyType.GetType()))) + return errors.Errorf("Sort executor not supports type %s", types.TypeStr(keyType.GetType())) } } return nil diff --git a/pkg/executor/sortexec/topn.go b/pkg/executor/sortexec/topn.go index 146daa01ebd9b..2f6b0607ab1f6 100644 --- a/pkg/executor/sortexec/topn.go +++ b/pkg/executor/sortexec/topn.go @@ -261,7 +261,11 @@ func (e *TopNExec) fetchChunks(ctx context.Context) error { } func (e *TopNExec) loadChunksUntilTotalLimit(ctx context.Context) error { - e.initCompareFuncs() + err := e.initCompareFuncs() + if err != nil { + return err + } + e.buildKeyColumns() e.chkHeap.init(e, e.memTracker, e.Limit.Offset+e.Limit.Count, int(e.Limit.Offset), e.greaterRow, e.RetFieldTypes()) for uint64(e.chkHeap.rowChunks.Len()) < e.chkHeap.totalLimit { From 717352afdfa93f21c585c771f1affa22ad916bc6 Mon Sep 17 00:00:00 2001 From: xzhangxian1008 Date: Mon, 27 May 2024 17:28:44 +0800 Subject: [PATCH 11/12] fix ci --- pkg/executor/sortexec/sort.go | 1 - 1 file changed, 1 deletion(-) diff --git a/pkg/executor/sortexec/sort.go b/pkg/executor/sortexec/sort.go index dc1bfd0927b07..efd964c103ecc 100644 --- a/pkg/executor/sortexec/sort.go +++ b/pkg/executor/sortexec/sort.go @@ -21,7 +21,6 @@ import ( "time" "github.com/pingcap/errors" - "github.com/pingcap/failpoint" "github.com/pingcap/tidb/pkg/executor/internal/exec" "github.com/pingcap/tidb/pkg/expression" From c74228ccdbefa8c7192dd02f153748b56dd93a2d Mon Sep 17 00:00:00 2001 From: xzhangxian1008 Date: Tue, 28 May 2024 13:56:43 +0800 Subject: [PATCH 12/12] delete cmp != nil --- pkg/executor/sortexec/sort.go | 28 ++++++++++++---------------- 1 file changed, 12 insertions(+), 16 deletions(-) diff --git a/pkg/executor/sortexec/sort.go b/pkg/executor/sortexec/sort.go index efd964c103ecc..bd0f6971877ba 100644 --- a/pkg/executor/sortexec/sort.go +++ b/pkg/executor/sortexec/sort.go @@ -774,14 +774,12 @@ func (e *SortExec) buildKeyColumns() { func (e *SortExec) lessRow(rowI, rowJ chunk.Row) int { for i, colIdx := range e.keyColumns { cmpFunc := e.keyCmpFuncs[i] - if cmpFunc != nil { - cmp := cmpFunc(rowI, colIdx, rowJ, colIdx) - if e.ByItems[i].Desc { - cmp = -cmp - } - if cmp != 0 { - return cmp - } + cmp := cmpFunc(rowI, colIdx, rowJ, colIdx) + if e.ByItems[i].Desc { + cmp = -cmp + } + if cmp != 0 { + return cmp } } return 0 @@ -790,14 +788,12 @@ func (e *SortExec) lessRow(rowI, rowJ chunk.Row) int { func (e *SortExec) compareRow(rowI, rowJ chunk.Row) int { for i, colIdx := range e.keyColumns { cmpFunc := e.keyCmpFuncs[i] - if cmpFunc != nil { - cmp := cmpFunc(rowI, colIdx, rowJ, colIdx) - if e.ByItems[i].Desc { - cmp = -cmp - } - if cmp != 0 { - return cmp - } + cmp := cmpFunc(rowI, colIdx, rowJ, colIdx) + if e.ByItems[i].Desc { + cmp = -cmp + } + if cmp != 0 { + return cmp } } return 0