Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

executor: enable parallel sort #53537

Merged
merged 13 commits into from
Jun 6, 2024
22 changes: 9 additions & 13 deletions pkg/executor/sortexec/parallel_sort_spill_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ import (
var hardLimit1 = int64(100000)
var hardLimit2 = hardLimit1 * 10

func oneSpillCase(t *testing.T, ctx *mock.Context, exe *sortexec.SortExec, sortCase *testutil.SortCase, schema *expression.Schema, dataSource *testutil.MockDataSource) {
func oneSpillCase(t *testing.T, exe *sortexec.SortExec, sortCase *testutil.SortCase, schema *expression.Schema, dataSource *testutil.MockDataSource) {
if exe == nil {
exe = buildSortExec(sortCase, dataSource)
}
Expand Down Expand Up @@ -60,15 +60,15 @@ func inMemoryThenSpill(t *testing.T, ctx *mock.Context, exe *sortexec.SortExec,
require.True(t, checkCorrectness(schema, exe, dataSource, resultChunks))
}

func failpointNoMemoryDataTest(t *testing.T, ctx *mock.Context, exe *sortexec.SortExec, sortCase *testutil.SortCase, schema *expression.Schema, dataSource *testutil.MockDataSource) {
func failpointNoMemoryDataTest(t *testing.T, exe *sortexec.SortExec, sortCase *testutil.SortCase, dataSource *testutil.MockDataSource) {
if exe == nil {
exe = buildSortExec(sortCase, dataSource)
}
dataSource.PrepareChunks()
executeInFailpoint(t, exe, 0, nil)
}

func failpointDataInMemoryThenSpillTest(t *testing.T, ctx *mock.Context, exe *sortexec.SortExec, sortCase *testutil.SortCase, schema *expression.Schema, dataSource *testutil.MockDataSource) {
func failpointDataInMemoryThenSpillTest(t *testing.T, ctx *mock.Context, exe *sortexec.SortExec, sortCase *testutil.SortCase, dataSource *testutil.MockDataSource) {
if exe == nil {
exe = buildSortExec(sortCase, dataSource)
}
Expand All @@ -91,15 +91,13 @@ func TestParallelSortSpillDisk(t *testing.T) {
ctx.GetSessionVars().MemTracker = memory.NewTracker(memory.LabelForSQLText, hardLimit1)
ctx.GetSessionVars().StmtCtx.MemTracker = memory.NewTracker(memory.LabelForSQLText, -1)
ctx.GetSessionVars().StmtCtx.MemTracker.AttachTo(ctx.GetSessionVars().MemTracker)
// TODO use variable to choose parallel mode after system variable is added
// ctx.GetSessionVars().EnableParallelSort = true

schema := expression.NewSchema(sortCase.Columns()...)
dataSource := buildDataSource(sortCase, schema)
exe := buildSortExec(sortCase, dataSource)
for i := 0; i < 10; i++ {
oneSpillCase(t, ctx, nil, sortCase, schema, dataSource)
oneSpillCase(t, ctx, exe, sortCase, schema, dataSource)
oneSpillCase(t, nil, sortCase, schema, dataSource)
oneSpillCase(t, exe, sortCase, schema, dataSource)
}

ctx.GetSessionVars().MemTracker = memory.NewTracker(memory.LabelForSQLText, hardLimit2)
Expand Down Expand Up @@ -129,21 +127,19 @@ func TestParallelSortSpillDiskFailpoint(t *testing.T) {
ctx.GetSessionVars().MemTracker = memory.NewTracker(memory.LabelForSQLText, hardLimit1)
ctx.GetSessionVars().StmtCtx.MemTracker = memory.NewTracker(memory.LabelForSQLText, -1)
ctx.GetSessionVars().StmtCtx.MemTracker.AttachTo(ctx.GetSessionVars().MemTracker)
// TODO use variable to choose parallel mode after system variable is added
// ctx.GetSessionVars().EnableParallelSort = true

schema := expression.NewSchema(sortCase.Columns()...)
dataSource := buildDataSource(sortCase, schema)
exe := buildSortExec(sortCase, dataSource)
for i := 0; i < 20; i++ {
failpointNoMemoryDataTest(t, ctx, nil, sortCase, schema, dataSource)
failpointNoMemoryDataTest(t, ctx, exe, sortCase, schema, dataSource)
failpointNoMemoryDataTest(t, nil, sortCase, dataSource)
failpointNoMemoryDataTest(t, exe, sortCase, dataSource)
}

ctx.GetSessionVars().MemTracker = memory.NewTracker(memory.LabelForSQLText, hardLimit2)
ctx.GetSessionVars().StmtCtx.MemTracker.AttachTo(ctx.GetSessionVars().MemTracker)
for i := 0; i < 20; i++ {
failpointDataInMemoryThenSpillTest(t, ctx, nil, sortCase, schema, dataSource)
failpointDataInMemoryThenSpillTest(t, ctx, exe, sortCase, schema, dataSource)
failpointDataInMemoryThenSpillTest(t, ctx, nil, sortCase, dataSource)
failpointDataInMemoryThenSpillTest(t, ctx, exe, sortCase, dataSource)
}
}
6 changes: 0 additions & 6 deletions pkg/executor/sortexec/parallel_sort_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,6 @@ func executeInFailpoint(t *testing.T, exe *sortexec.SortExec, hardLimit int64, t
tmpCtx := context.Background()
err := exe.Open(tmpCtx)
require.NoError(t, err)
exe.IsUnparallel = false
exe.InitInParallelModeForTest()

goRoutineWaiter := sync.WaitGroup{}
goRoutineWaiter.Add(1)
Expand Down Expand Up @@ -85,8 +83,6 @@ func parallelSortTest(t *testing.T, ctx *mock.Context, exe *sortexec.SortExec, s
ctx.GetSessionVars().MemTracker = memory.NewTracker(memory.LabelForSQLText, -1)
ctx.GetSessionVars().StmtCtx.MemTracker = memory.NewTracker(memory.LabelForSQLText, -1)
ctx.GetSessionVars().StmtCtx.MemTracker.AttachTo(ctx.GetSessionVars().MemTracker)
// TODO use variable to choose parallel mode after system variable is added
// ctx.GetSessionVars().EnableParallelSort = true

if exe == nil {
exe = buildSortExec(sortCase, dataSource)
Expand All @@ -105,8 +101,6 @@ func failpointTest(t *testing.T, ctx *mock.Context, exe *sortexec.SortExec, sort
ctx.GetSessionVars().MemTracker = memory.NewTracker(memory.LabelForSQLText, -1)
ctx.GetSessionVars().StmtCtx.MemTracker = memory.NewTracker(memory.LabelForSQLText, -1)
ctx.GetSessionVars().StmtCtx.MemTracker.AttachTo(ctx.GetSessionVars().MemTracker)
// TODO use variable to choose parallel mode after system variable is added
// ctx.GetSessionVars().EnableParallelSort = true
if exe == nil {
exe = buildSortExec(sortCase, dataSource)
}
Expand Down
46 changes: 22 additions & 24 deletions pkg/executor/sortexec/sort.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package sortexec

import (
"context"
"fmt"
"sync"
"sync/atomic"
"time"
Expand Down Expand Up @@ -55,6 +56,7 @@ type SortExec struct {
memTracker *memory.Tracker
diskTracker *disk.Tracker

// TODO: remove this field, together with the unparallel sort path, in the future.
IsUnparallel bool

finishCh chan struct{}
Expand Down Expand Up @@ -96,6 +98,11 @@ type SortExec struct {

// Close implements the Executor Close interface.
func (e *SortExec) Close() error {
defer func() {
if r := recover(); r != nil {
fmt.Printf("123")
xzhangxian1008 marked this conversation as resolved.
Show resolved Hide resolved
}
}()
// TopN does not initialize `e.finishCh`, but it still calls the Close function.
if e.finishCh != nil {
close(e.finishCh)
Expand Down Expand Up @@ -124,11 +131,13 @@ func (e *SortExec) Close() error {
// will use `e.Parallel.workers` and `e.Parallel.merger`.
channel.Clear(e.Parallel.resultChannel)
for i := range e.Parallel.workers {
e.Parallel.workers[i].batchRows = nil
e.Parallel.workers[i].localSortedRows = nil
e.Parallel.workers[i].sortedRowsIter = nil
e.Parallel.workers[i].merger = nil
e.Parallel.workers[i].memTracker.ReplaceBytesUsed(0)
if e.Parallel.workers[i] != nil {
e.Parallel.workers[i].batchRows = nil
e.Parallel.workers[i].localSortedRows = nil
e.Parallel.workers[i].sortedRowsIter = nil
e.Parallel.workers[i].merger = nil
e.Parallel.workers[i].memTracker.ReplaceBytesUsed(0)
xzhangxian1008 marked this conversation as resolved.
Show resolved Hide resolved
}
}
e.Parallel.merger = nil
if e.Parallel.spillAction != nil {
Expand Down Expand Up @@ -160,7 +169,7 @@ func (e *SortExec) Open(ctx context.Context) error {
e.diskTracker.AttachTo(e.Ctx().GetSessionVars().StmtCtx.DiskTracker)
}

e.IsUnparallel = true
e.IsUnparallel = false
windtalker marked this conversation as resolved.
Show resolved Hide resolved
if e.IsUnparallel {
e.Unparallel.Idx = 0
e.Unparallel.sortPartitions = e.Unparallel.sortPartitions[:0]
Expand All @@ -185,24 +194,10 @@ func (e *SortExec) Open(ctx context.Context) error {
return exec.Open(ctx, e.Children(0))
}

// InitInParallelModeForTest is a function for test
// After system variable is added, we can delete this function
func (e *SortExec) InitInParallelModeForTest() {
e.Parallel.workers = make([]*parallelSortWorker, e.Ctx().GetSessionVars().ExecutorConcurrency)
e.Parallel.chunkChannel = make(chan *chunkWithMemoryUsage, e.Ctx().GetSessionVars().ExecutorConcurrency)
e.Parallel.fetcherAndWorkerSyncer = &sync.WaitGroup{}
e.Parallel.sortedRowsIters = make([]*chunk.Iterator4Slice, len(e.Parallel.workers))
e.Parallel.resultChannel = make(chan rowWithError, e.MaxChunkSize())
e.Parallel.closeSync = make(chan struct{})
e.Parallel.merger = newMultiWayMerger(&memorySource{sortedRowsIters: e.Parallel.sortedRowsIters}, e.lessRow)
e.Parallel.spillHelper = newParallelSortSpillHelper(e, exec.RetTypes(e), e.finishCh, e.lessRow, e.Parallel.resultChannel)
e.Parallel.spillAction = newParallelSortSpillDiskAction(e.Parallel.spillHelper)
for i := range e.Parallel.sortedRowsIters {
e.Parallel.sortedRowsIters[i] = chunk.NewIterator4Slice(nil)
}
if e.enableTmpStorageOnOOM {
e.Ctx().GetSessionVars().MemTracker.FallbackOldAndSetNewAction(e.Parallel.spillAction)
}
// InitUnparallelModeForTest resets the unparallel sort state (partition index
// and sort partitions). It is intended for unit tests only.
func (e *SortExec) InitUnparallelModeForTest() {
e.Unparallel.Idx = 0
e.Unparallel.sortPartitions = e.Unparallel.sortPartitions[:0]
}

// Next implements the Executor Next interface.
Expand Down Expand Up @@ -772,6 +767,9 @@ func (e *SortExec) buildKeyColumns() {
func (e *SortExec) lessRow(rowI, rowJ chunk.Row) int {
for i, colIdx := range e.keyColumns {
cmpFunc := e.keyCmpFuncs[i]
if cmpFunc == nil {
break
xzhangxian1008 marked this conversation as resolved.
Show resolved Hide resolved
}
cmp := cmpFunc(rowI, colIdx, rowJ, colIdx)
if e.ByItems[i].Desc {
cmp = -cmp
Expand Down
20 changes: 6 additions & 14 deletions pkg/executor/sortexec/sort_spill_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,9 +174,9 @@ func executeSortExecutor(t *testing.T, exe *sortexec.SortExec, isParallelSort bo
tmpCtx := context.Background()
err := exe.Open(tmpCtx)
require.NoError(t, err)
if isParallelSort {
exe.IsUnparallel = false
exe.InitInParallelModeForTest()
if !isParallelSort {
exe.IsUnparallel = true
exe.InitUnparallelModeForTest()
}

resultChunks := make([]*chunk.Chunk, 0)
Expand All @@ -196,9 +196,9 @@ func executeSortExecutorAndManullyTriggerSpill(t *testing.T, exe *sortexec.SortE
tmpCtx := context.Background()
err := exe.Open(tmpCtx)
require.NoError(t, err)
if isParallelSort {
exe.IsUnparallel = false
exe.InitInParallelModeForTest()
if !isParallelSort {
exe.IsUnparallel = true
exe.InitUnparallelModeForTest()
}

resultChunks := make([]*chunk.Chunk, 0)
Expand Down Expand Up @@ -236,8 +236,6 @@ func onePartitionAndAllDataInMemoryCase(t *testing.T, ctx *mock.Context, sortCas
ctx.GetSessionVars().MemTracker = memory.NewTracker(memory.LabelForSQLText, 1048576)
ctx.GetSessionVars().StmtCtx.MemTracker = memory.NewTracker(memory.LabelForSQLText, -1)
ctx.GetSessionVars().StmtCtx.MemTracker.AttachTo(ctx.GetSessionVars().MemTracker)
// TODO use variable to choose parallel mode after system variable is added
// ctx.GetSessionVars().EnableParallelSort = false
schema := expression.NewSchema(sortCase.Columns()...)
dataSource := buildDataSource(sortCase, schema)
exe := buildSortExec(sortCase, dataSource)
Expand All @@ -259,8 +257,6 @@ func onePartitionAndAllDataInDiskCase(t *testing.T, ctx *mock.Context, sortCase
ctx.GetSessionVars().MemTracker = memory.NewTracker(memory.LabelForSQLText, 50000)
ctx.GetSessionVars().StmtCtx.MemTracker = memory.NewTracker(memory.LabelForSQLText, -1)
ctx.GetSessionVars().StmtCtx.MemTracker.AttachTo(ctx.GetSessionVars().MemTracker)
// TODO use variable to choose parallel mode after system variable is added
// ctx.GetSessionVars().EnableParallelSort = false
schema := expression.NewSchema(sortCase.Columns()...)
dataSource := buildDataSource(sortCase, schema)
exe := buildSortExec(sortCase, dataSource)
Expand Down Expand Up @@ -289,8 +285,6 @@ func multiPartitionCase(t *testing.T, ctx *mock.Context, sortCase *testutil.Sort
ctx.GetSessionVars().MemTracker = memory.NewTracker(memory.LabelForSQLText, 10000)
ctx.GetSessionVars().StmtCtx.MemTracker = memory.NewTracker(memory.LabelForSQLText, -1)
ctx.GetSessionVars().StmtCtx.MemTracker.AttachTo(ctx.GetSessionVars().MemTracker)
// TODO use variable to choose parallel mode after system variable is added
// ctx.GetSessionVars().EnableParallelSort = false
schema := expression.NewSchema(sortCase.Columns()...)
dataSource := buildDataSource(sortCase, schema)
exe := buildSortExec(sortCase, dataSource)
Expand Down Expand Up @@ -330,8 +324,6 @@ func inMemoryThenSpillCase(t *testing.T, ctx *mock.Context, sortCase *testutil.S
ctx.GetSessionVars().MemTracker = memory.NewTracker(memory.LabelForSQLText, hardLimit)
ctx.GetSessionVars().StmtCtx.MemTracker = memory.NewTracker(memory.LabelForSQLText, -1)
ctx.GetSessionVars().StmtCtx.MemTracker.AttachTo(ctx.GetSessionVars().MemTracker)
// TODO use variable to choose parallel mode after system variable is added
// ctx.GetSessionVars().EnableParallelSort = false
schema := expression.NewSchema(sortCase.Columns()...)
dataSource := buildDataSource(sortCase, schema)
exe := buildSortExec(sortCase, dataSource)
Expand Down