From fd8791e8e50bf769ef6f06f6ca446300ee77f57b Mon Sep 17 00:00:00 2001 From: Chengpeng Yan <41809508+Reminiscent@users.noreply.github.com> Date: Thu, 21 May 2020 19:26:00 +0800 Subject: [PATCH] cherry pick #16248 to release-3.1 Signed-off-by: sre-bot --- executor/aggregate.go | 18 + executor/index_lookup_hash_join.go | 739 ++++++++++++++++++++++++++++ executor/index_lookup_join.go | 9 + executor/index_lookup_merge_join.go | 701 ++++++++++++++++++++++++++ executor/join.go | 15 + executor/projection.go | 21 + executor/shuffle.go | 381 ++++++++++++++ util/execdetails/execdetails.go | 52 ++ 8 files changed, 1936 insertions(+) create mode 100644 executor/index_lookup_hash_join.go create mode 100644 executor/index_lookup_merge_join.go create mode 100644 executor/shuffle.go diff --git a/executor/aggregate.go b/executor/aggregate.go index 1a23837e2d267..d98f5710122fa 100644 --- a/executor/aggregate.go +++ b/executor/aggregate.go @@ -29,6 +29,7 @@ import ( "github.com/pingcap/tidb/types" "github.com/pingcap/tidb/util/chunk" "github.com/pingcap/tidb/util/codec" + "github.com/pingcap/tidb/util/execdetails" "github.com/pingcap/tidb/util/logutil" "github.com/pingcap/tidb/util/set" "github.com/spaolacci/murmur3" @@ -225,6 +226,23 @@ func (e *HashAggExec) Close() error { for range e.finalOutputCh { } e.executed = false +<<<<<<< HEAD +======= + + if e.runtimeStats != nil { + var partialConcurrency, finalConcurrency int + if e.isUnparallelExec { + partialConcurrency = 0 + finalConcurrency = 0 + } else { + partialConcurrency = cap(e.partialWorkers) + finalConcurrency = cap(e.finalWorkers) + } + partialConcurrencyInfo := execdetails.NewConcurrencyInfo("PartialConcurrency", partialConcurrency) + finalConcurrencyInfo := execdetails.NewConcurrencyInfo("FinalConcurrency", finalConcurrency) + e.runtimeStats.SetConcurrencyInfo(partialConcurrencyInfo, finalConcurrencyInfo) + } +>>>>>>> 0d95b09... 
executor: Remove unnecessary information in explain analyze output (#16248) return e.baseExecutor.Close() } diff --git a/executor/index_lookup_hash_join.go b/executor/index_lookup_hash_join.go new file mode 100644 index 0000000000000..5ee933b5dc0a3 --- /dev/null +++ b/executor/index_lookup_hash_join.go @@ -0,0 +1,739 @@ +// Copyright 2019 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package executor + +import ( + "context" + "hash" + "hash/fnv" + "sync" + + "github.com/pingcap/errors" + "github.com/pingcap/tidb/expression" + plannercore "github.com/pingcap/tidb/planner/core" + "github.com/pingcap/tidb/util" + "github.com/pingcap/tidb/util/chunk" + "github.com/pingcap/tidb/util/codec" + "github.com/pingcap/tidb/util/execdetails" + "github.com/pingcap/tidb/util/logutil" + "github.com/pingcap/tidb/util/memory" + "github.com/pingcap/tidb/util/ranger" + "go.uber.org/zap" +) + +// numResChkHold indicates the number of resource chunks that an inner worker +// holds at the same time. +// It's used in 2 cases individually: +// 1. IndexMergeJoin +// 2. IndexNestedLoopHashJoin: +// It's used when IndexNestedLoopHashJoin.keepOuterOrder is true. +// Otherwise, there will be at most `concurrency` resource chunks throughout +// the execution of IndexNestedLoopHashJoin. +const numResChkHold = 4 + +// IndexNestedLoopHashJoin employs one outer worker and N inner workers to +// execute concurrently. The output order is not promised. +// +// The execution flow is very similar to IndexLookUpReader: +// 1. 
The outer worker reads N outer rows, builds a task and sends it to the +// inner worker channel. +// 2. The inner worker receives the tasks and does 3 things for every task: +// 1. builds hash table from the outer rows +// 2. builds key ranges from outer rows and fetches inner rows +// 3. probes the hash table and sends the join result to the main thread channel. +// Note: steps 1 and 2 run concurrently. +// 3. The main thread receives the join results. +type IndexNestedLoopHashJoin struct { + IndexLookUpJoin + resultCh chan *indexHashJoinResult + joinChkResourceCh []chan *chunk.Chunk + // We build individual joiner for each inner worker when using chunk-based + // execution, to avoid the concurrency of joiner.chk and joiner.selected. + joiners []joiner + keepOuterOrder bool + curTask *indexHashJoinTask + // taskCh is only used when `keepOuterOrder` is true. + taskCh chan *indexHashJoinTask +} + +type indexHashJoinOuterWorker struct { + outerWorker + innerCh chan *indexHashJoinTask + keepOuterOrder bool + // taskCh is only used when the outer order needs to be promised. + taskCh chan *indexHashJoinTask +} + +type indexHashJoinInnerWorker struct { + innerWorker + matchedOuterPtrs []chunk.RowPtr + joiner joiner + joinChkResourceCh chan *chunk.Chunk + // resultCh is valid only when IndexNestedLoopHashJoin does not need to keep + // order. Otherwise, it will be nil. + resultCh chan *indexHashJoinResult + taskCh <-chan *indexHashJoinTask + wg *sync.WaitGroup + joinKeyBuf []byte + outerRowStatus []outerRowStatusFlag +} + +type indexHashJoinResult struct { + chk *chunk.Chunk + err error + src chan<- *chunk.Chunk +} + +type indexHashJoinTask struct { + *lookUpJoinTask + outerRowStatus [][]outerRowStatusFlag + lookupMap *rowHashMap + err error + keepOuterOrder bool + // resultCh is only used when the outer order needs to be promised. + resultCh chan *indexHashJoinResult + // matchedInnerRowPtrs is only valid when the outer order needs to be + // promised. 
Otherwise, it will be nil. + // len(matchedInnerRowPtrs) equals to + // lookUpJoinTask.outerResult.NumChunks(), and the elements of every + // matchedInnerRowPtrs[chkIdx][rowIdx] indicates the matched inner row ptrs + // of the corresponding outer row. + matchedInnerRowPtrs [][][]chunk.RowPtr +} + +// Open implements the IndexNestedLoopHashJoin Executor interface. +func (e *IndexNestedLoopHashJoin) Open(ctx context.Context) error { + // Be careful, very dirty hack in this line!!! + // IndexLookUpJoin need to rebuild executor (the dataReaderBuilder) during + // executing. However `executor.Next()` is lazy evaluation when the RecordSet + // result is drained. + // Lazy evaluation means the saved session context may change during executor's + // building and its running. + // A specific sequence for example: + // + // e := buildExecutor() // txn at build time + // recordSet := runStmt(e) + // session.CommitTxn() // txn closed + // recordSet.Next() + // e.dataReaderBuilder.Build() // txn is used again, which is already closed + // + // The trick here is `getSnapshotTS` will cache snapshot ts in the dataReaderBuilder, + // so even txn is destroyed later, the dataReaderBuilder could still use the + // cached snapshot ts to construct DAG. 
+ _, err := e.innerCtx.readerBuilder.getSnapshotTS() + if err != nil { + return err + } + + err = e.children[0].Open(ctx) + if err != nil { + return err + } + e.memTracker = memory.NewTracker(e.id, -1) + e.memTracker.AttachTo(e.ctx.GetSessionVars().StmtCtx.MemTracker) + e.innerPtrBytes = make([][]byte, 0, 8) + e.startWorkers(ctx) + return nil +} + +func (e *IndexNestedLoopHashJoin) startWorkers(ctx context.Context) { + concurrency := e.ctx.GetSessionVars().IndexLookupJoinConcurrency + workerCtx, cancelFunc := context.WithCancel(ctx) + e.cancelFunc = cancelFunc + innerCh := make(chan *indexHashJoinTask, concurrency) + if e.keepOuterOrder { + e.taskCh = make(chan *indexHashJoinTask, concurrency) + } + e.workerWg.Add(1) + ow := e.newOuterWorker(innerCh) + go util.WithRecovery(func() { ow.run(workerCtx, cancelFunc) }, e.finishJoinWorkers) + + if !e.keepOuterOrder { + e.resultCh = make(chan *indexHashJoinResult, concurrency) + } else { + // When `keepOuterOrder` is true, each task holds their own `resultCh` + // individually, thus we do not need a global resultCh. 
+ e.resultCh = nil + } + e.joinChkResourceCh = make([]chan *chunk.Chunk, concurrency) + for i := 0; i < concurrency; i++ { + if !e.keepOuterOrder { + e.joinChkResourceCh[i] = make(chan *chunk.Chunk, 1) + e.joinChkResourceCh[i] <- newFirstChunk(e) + } else { + e.joinChkResourceCh[i] = make(chan *chunk.Chunk, numResChkHold) + for j := 0; j < numResChkHold; j++ { + e.joinChkResourceCh[i] <- newFirstChunk(e) + } + } + } + + e.workerWg.Add(concurrency) + for i := 0; i < concurrency; i++ { + workerID := i + go util.WithRecovery(func() { e.newInnerWorker(innerCh, workerID).run(workerCtx, cancelFunc) }, e.finishJoinWorkers) + } + go e.wait4JoinWorkers() +} + +func (e *IndexNestedLoopHashJoin) finishJoinWorkers(r interface{}) { + if r != nil { + logutil.BgLogger().Error("IndexNestedLoopHashJoin failed", zap.Error(errors.Errorf("%v", r))) + if e.cancelFunc != nil { + e.cancelFunc() + } + } + e.workerWg.Done() +} + +func (e *IndexNestedLoopHashJoin) wait4JoinWorkers() { + e.workerWg.Wait() + if e.resultCh != nil { + close(e.resultCh) + } + if e.taskCh != nil { + close(e.taskCh) + } +} + +// Next implements the IndexNestedLoopHashJoin Executor interface. 
+func (e *IndexNestedLoopHashJoin) Next(ctx context.Context, req *chunk.Chunk) error { + req.Reset() + if e.keepOuterOrder { + return e.runInOrder(ctx, req) + } + // unordered run + var ( + result *indexHashJoinResult + ok bool + ) + select { + case result, ok = <-e.resultCh: + if !ok { + return nil + } + if result.err != nil { + return result.err + } + case <-ctx.Done(): + return nil + } + req.SwapColumns(result.chk) + result.src <- result.chk + return nil +} + +func (e *IndexNestedLoopHashJoin) runInOrder(ctx context.Context, req *chunk.Chunk) error { + var ( + result *indexHashJoinResult + ok bool + ) + for { + if e.isDryUpTasks(ctx) { + return nil + } + select { + case result, ok = <-e.curTask.resultCh: + if !ok { + e.curTask = nil + continue + } + if result.err != nil { + return result.err + } + case <-ctx.Done(): + return nil + } + req.SwapColumns(result.chk) + result.src <- result.chk + return nil + } +} + +// isDryUpTasks indicates whether all the tasks have been processed. +func (e *IndexNestedLoopHashJoin) isDryUpTasks(ctx context.Context) bool { + if e.curTask != nil { + return false + } + var ok bool + select { + case e.curTask, ok = <-e.taskCh: + if !ok { + return true + } + case <-ctx.Done(): + return true + } + return false +} + +// Close implements the IndexNestedLoopHashJoin Executor interface. 
+func (e *IndexNestedLoopHashJoin) Close() error { + if e.cancelFunc != nil { + e.cancelFunc() + e.cancelFunc = nil + } + if e.resultCh != nil { + for range e.resultCh { + } + e.resultCh = nil + } + if e.taskCh != nil { + for range e.taskCh { + } + e.taskCh = nil + } + if e.runtimeStats != nil { + concurrency := cap(e.joinChkResourceCh) + e.runtimeStats.SetConcurrencyInfo(execdetails.NewConcurrencyInfo("Concurrency", concurrency)) + } + for i := range e.joinChkResourceCh { + close(e.joinChkResourceCh[i]) + } + e.joinChkResourceCh = nil + return e.baseExecutor.Close() +} + +func (ow *indexHashJoinOuterWorker) run(ctx context.Context, cancelFunc context.CancelFunc) { + defer close(ow.innerCh) + for { + task, err := ow.buildTask(ctx) + if task == nil { + return + } + if err != nil { + cancelFunc() + logutil.Logger(ctx).Error("indexHashJoinOuterWorker.run failed", zap.Error(err)) + return + } + if finished := ow.pushToChan(ctx, task, ow.innerCh); finished { + return + } + if ow.keepOuterOrder { + if finished := ow.pushToChan(ctx, task, ow.taskCh); finished { + return + } + } + } +} + +func (ow *indexHashJoinOuterWorker) buildTask(ctx context.Context) (*indexHashJoinTask, error) { + task, err := ow.outerWorker.buildTask(ctx) + if task == nil || err != nil { + return nil, err + } + var ( + resultCh chan *indexHashJoinResult + matchedInnerRowPtrs [][][]chunk.RowPtr + ) + if ow.keepOuterOrder { + resultCh = make(chan *indexHashJoinResult, numResChkHold) + matchedInnerRowPtrs = make([][][]chunk.RowPtr, task.outerResult.NumChunks()) + for i := range matchedInnerRowPtrs { + matchedInnerRowPtrs[i] = make([][]chunk.RowPtr, task.outerResult.GetChunk(i).NumRows()) + } + } + numChks := task.outerResult.NumChunks() + outerRowStatus := make([][]outerRowStatusFlag, numChks) + for i := 0; i < numChks; i++ { + outerRowStatus[i] = make([]outerRowStatusFlag, task.outerResult.GetChunk(i).NumRows()) + } + return &indexHashJoinTask{ + lookUpJoinTask: task, + outerRowStatus: outerRowStatus, 
+ keepOuterOrder: ow.keepOuterOrder, + resultCh: resultCh, + matchedInnerRowPtrs: matchedInnerRowPtrs, + }, nil +} + +func (ow *indexHashJoinOuterWorker) pushToChan(ctx context.Context, task *indexHashJoinTask, dst chan<- *indexHashJoinTask) bool { + select { + case <-ctx.Done(): + return true + case dst <- task: + } + return false +} + +func (e *IndexNestedLoopHashJoin) newOuterWorker(innerCh chan *indexHashJoinTask) *indexHashJoinOuterWorker { + ow := &indexHashJoinOuterWorker{ + outerWorker: outerWorker{ + outerCtx: e.outerCtx, + ctx: e.ctx, + executor: e.children[0], + batchSize: 32, + maxBatchSize: e.ctx.GetSessionVars().IndexJoinBatchSize, + parentMemTracker: e.memTracker, + lookup: &e.IndexLookUpJoin, + }, + innerCh: innerCh, + keepOuterOrder: e.keepOuterOrder, + taskCh: e.taskCh, + } + return ow +} + +func (e *IndexNestedLoopHashJoin) newInnerWorker(taskCh chan *indexHashJoinTask, workerID int) *indexHashJoinInnerWorker { + // Since multiple inner workers run concurrently, we should copy join's indexRanges for every worker to avoid data race. + copiedRanges := make([]*ranger.Range, 0, len(e.indexRanges)) + for _, ran := range e.indexRanges { + copiedRanges = append(copiedRanges, ran.Clone()) + } + iw := &indexHashJoinInnerWorker{ + innerWorker: innerWorker{ + innerCtx: e.innerCtx, + outerCtx: e.outerCtx, + ctx: e.ctx, + executorChk: chunk.NewChunkWithCapacity(e.innerCtx.rowTypes, e.maxChunkSize), + indexRanges: copiedRanges, + keyOff2IdxOff: e.keyOff2IdxOff, + }, + taskCh: taskCh, + joiner: e.joiners[workerID], + joinChkResourceCh: e.joinChkResourceCh[workerID], + resultCh: e.resultCh, + matchedOuterPtrs: make([]chunk.RowPtr, 0, e.maxChunkSize), + joinKeyBuf: make([]byte, 1), + outerRowStatus: make([]outerRowStatusFlag, 0, e.maxChunkSize), + } + if e.lastColHelper != nil { + // nextCwf.TmpConstant needs to be reset for every individual + // inner worker to avoid data race when the inner workers is running + // concurrently. 
+ nextCwf := *e.lastColHelper + nextCwf.TmpConstant = make([]*expression.Constant, len(e.lastColHelper.TmpConstant)) + for i := range e.lastColHelper.TmpConstant { + nextCwf.TmpConstant[i] = &expression.Constant{RetType: nextCwf.TargetCol.RetType} + } + iw.nextColCompareFilters = &nextCwf + } + return iw +} + +func (iw *indexHashJoinInnerWorker) run(ctx context.Context, cancelFunc context.CancelFunc) { + var task *indexHashJoinTask + joinResult, ok := iw.getNewJoinResult(ctx) + if !ok { + cancelFunc() + return + } + h, resultCh := fnv.New64(), iw.resultCh + for { + select { + case <-ctx.Done(): + return + case task, ok = <-iw.taskCh: + } + if !ok { + break + } + if task.err != nil { + joinResult.err = task.err + break + } + if task.keepOuterOrder { + resultCh = task.resultCh + } + err := iw.handleTask(ctx, cancelFunc, task, joinResult, h, resultCh) + if err != nil { + joinResult.err = err + break + } + if task.keepOuterOrder { + // We need to get a new result holder here because the old + // `joinResult` has been sent to the `resultCh` or to the + // `joinChkResourceCh`. + joinResult, ok = iw.getNewJoinResult(ctx) + if !ok { + cancelFunc() + return + } + } + } + if joinResult.err != nil { + cancelFunc() + logutil.Logger(ctx).Error("indexHashJoinInnerWorker.run failed", zap.Error(joinResult.err)) + return + } + // When task.keepOuterOrder is TRUE(resultCh != iw.resultCh), the last + // joinResult will be checked when a task has been processed, thus we do + // not need to check it here again. 
+ if resultCh == iw.resultCh && joinResult.chk != nil && joinResult.chk.NumRows() > 0 { + select { + case resultCh <- joinResult: + case <-ctx.Done(): + return + } + } +} + +func (iw *indexHashJoinInnerWorker) getNewJoinResult(ctx context.Context) (*indexHashJoinResult, bool) { + joinResult := &indexHashJoinResult{ + src: iw.joinChkResourceCh, + } + ok := true + select { + case joinResult.chk, ok = <-iw.joinChkResourceCh: + case <-ctx.Done(): + return nil, false + } + return joinResult, ok +} + +func (iw *indexHashJoinInnerWorker) buildHashTableForOuterResult(ctx context.Context, cancelFunc context.CancelFunc, task *indexHashJoinTask, h hash.Hash64) { + buf, numChks := make([]byte, 1), task.outerResult.NumChunks() + task.lookupMap = newRowHashMap(task.outerResult.Len()) + for chkIdx := 0; chkIdx < numChks; chkIdx++ { + chk := task.outerResult.GetChunk(chkIdx) + numRows := chk.NumRows() + OUTER: + for rowIdx := 0; rowIdx < numRows; rowIdx++ { + if task.outerMatch != nil && !task.outerMatch[chkIdx][rowIdx] { + continue + } + row := chk.GetRow(rowIdx) + keyColIdx := iw.outerCtx.keyCols + for _, i := range keyColIdx { + if row.IsNull(i) { + continue OUTER + } + } + h.Reset() + err := codec.HashChunkRow(iw.ctx.GetSessionVars().StmtCtx, h, row, iw.outerCtx.rowTypes, keyColIdx, buf) + if err != nil { + cancelFunc() + logutil.Logger(ctx).Error("indexHashJoinInnerWorker.buildHashTableForOuterResult failed", zap.Error(err)) + return + } + rowPtr := chunk.RowPtr{ChkIdx: uint32(chkIdx), RowIdx: uint32(rowIdx)} + task.lookupMap.Put(h.Sum64(), rowPtr) + } + } +} + +func (iw *indexHashJoinInnerWorker) fetchInnerResults(ctx context.Context, task *lookUpJoinTask) error { + lookUpContents, err := iw.constructLookupContent(task) + if err != nil { + return err + } + lookUpContents = iw.sortAndDedupLookUpContents(lookUpContents) + return iw.innerWorker.fetchInnerResults(ctx, task, lookUpContents) +} + +func (iw *indexHashJoinInnerWorker) handleHashJoinInnerWorkerPanic(r interface{}) { 
+ if r != nil { + iw.resultCh <- &indexHashJoinResult{err: errors.Errorf("%v", r)} + } + iw.wg.Done() +} + +func (iw *indexHashJoinInnerWorker) handleTask(ctx context.Context, cancelFunc context.CancelFunc, task *indexHashJoinTask, joinResult *indexHashJoinResult, h hash.Hash64, resultCh chan *indexHashJoinResult) error { + iw.wg = &sync.WaitGroup{} + iw.wg.Add(1) + // TODO(XuHuaiyu): we may always use the smaller side to build the hashtable. + go util.WithRecovery(func() { iw.buildHashTableForOuterResult(ctx, cancelFunc, task, h) }, iw.handleHashJoinInnerWorkerPanic) + err := iw.fetchInnerResults(ctx, task.lookUpJoinTask) + if err != nil { + return err + } + iw.wg.Wait() + if !task.keepOuterOrder { + return iw.doJoinUnordered(ctx, task, joinResult, h, resultCh) + } + return iw.doJoinInOrder(ctx, task, joinResult, h, resultCh) +} + +func (iw *indexHashJoinInnerWorker) doJoinUnordered(ctx context.Context, task *indexHashJoinTask, joinResult *indexHashJoinResult, h hash.Hash64, resultCh chan *indexHashJoinResult) error { + var ok bool + iter := chunk.NewIterator4List(task.innerResult) + for row := iter.Begin(); row != iter.End(); row = iter.Next() { + ok, joinResult = iw.joinMatchedInnerRow2Chunk(ctx, row, task, joinResult, h, iw.joinKeyBuf) + if !ok { + return errors.New("indexHashJoinInnerWorker.doJoinUnordered failed") + } + } + for chkIdx, outerRowStatus := range task.outerRowStatus { + chk := task.outerResult.GetChunk(chkIdx) + for rowIdx, val := range outerRowStatus { + if val == outerRowMatched { + continue + } + iw.joiner.onMissMatch(val == outerRowHasNull, chk.GetRow(rowIdx), joinResult.chk) + if joinResult.chk.IsFull() { + select { + case resultCh <- joinResult: + case <-ctx.Done(): + } + joinResult, ok = iw.getNewJoinResult(ctx) + if !ok { + return errors.New("indexHashJoinInnerWorker.doJoinUnordered failed") + } + } + } + } + return nil +} + +func (iw *indexHashJoinInnerWorker) getMatchedOuterRows(innerRow chunk.Row, task *indexHashJoinTask, h 
hash.Hash64, buf []byte) (matchedRows []chunk.Row, matchedRowPtr []chunk.RowPtr, err error) { + h.Reset() + err = codec.HashChunkRow(iw.ctx.GetSessionVars().StmtCtx, h, innerRow, iw.rowTypes, iw.keyCols, buf) + if err != nil { + return nil, nil, err + } + iw.matchedOuterPtrs = task.lookupMap.Get(h.Sum64()) + if len(iw.matchedOuterPtrs) == 0 { + return nil, nil, nil + } + joinType := JoinerType(iw.joiner) + isSemiJoin := joinType == plannercore.SemiJoin || joinType == plannercore.LeftOuterSemiJoin + matchedRows = make([]chunk.Row, 0, len(iw.matchedOuterPtrs)) + matchedRowPtr = make([]chunk.RowPtr, 0, len(iw.matchedOuterPtrs)) + for _, ptr := range iw.matchedOuterPtrs { + outerRow := task.outerResult.GetRow(ptr) + ok, err := codec.EqualChunkRow(iw.ctx.GetSessionVars().StmtCtx, innerRow, iw.rowTypes, iw.keyCols, outerRow, iw.outerCtx.rowTypes, iw.outerCtx.keyCols) + if err != nil { + return nil, nil, err + } + if !ok || (task.outerRowStatus[ptr.ChkIdx][ptr.RowIdx] == outerRowMatched && isSemiJoin) { + continue + } + matchedRows = append(matchedRows, outerRow) + matchedRowPtr = append(matchedRowPtr, chunk.RowPtr{ChkIdx: ptr.ChkIdx, RowIdx: ptr.RowIdx}) + } + return matchedRows, matchedRowPtr, nil +} + +func (iw *indexHashJoinInnerWorker) joinMatchedInnerRow2Chunk(ctx context.Context, innerRow chunk.Row, task *indexHashJoinTask, + joinResult *indexHashJoinResult, h hash.Hash64, buf []byte) (bool, *indexHashJoinResult) { + matchedOuterRows, matchedOuterRowPtr, err := iw.getMatchedOuterRows(innerRow, task, h, buf) + if err != nil { + joinResult.err = err + return false, joinResult + } + if len(matchedOuterRows) == 0 { + return true, joinResult + } + var ( + ok bool + iter = chunk.NewIterator4Slice(matchedOuterRows) + cursor = 0 + ) + for iter.Begin(); iter.Current() != iter.End(); { + iw.outerRowStatus, err = iw.joiner.tryToMatchOuters(iter, innerRow, joinResult.chk, iw.outerRowStatus) + if err != nil { + joinResult.err = err + return false, joinResult + } + for _, status 
:= range iw.outerRowStatus { + chkIdx, rowIdx := matchedOuterRowPtr[cursor].ChkIdx, matchedOuterRowPtr[cursor].RowIdx + if status == outerRowMatched || task.outerRowStatus[chkIdx][rowIdx] == outerRowUnmatched { + task.outerRowStatus[chkIdx][rowIdx] = status + } + cursor++ + } + if joinResult.chk.IsFull() { + select { + case iw.resultCh <- joinResult: + case <-ctx.Done(): + } + joinResult, ok = iw.getNewJoinResult(ctx) + if !ok { + return false, joinResult + } + } + } + return true, joinResult +} + +func (iw *indexHashJoinInnerWorker) collectMatchedInnerPtrs4OuterRows(ctx context.Context, innerRow chunk.Row, innerRowPtr chunk.RowPtr, + task *indexHashJoinTask, h hash.Hash64, buf []byte) error { + _, matchedOuterRowIdx, err := iw.getMatchedOuterRows(innerRow, task, h, buf) + if err != nil { + return err + } + for _, outerRowPtr := range matchedOuterRowIdx { + chkIdx, rowIdx := outerRowPtr.ChkIdx, outerRowPtr.RowIdx + task.matchedInnerRowPtrs[chkIdx][rowIdx] = append(task.matchedInnerRowPtrs[chkIdx][rowIdx], innerRowPtr) + } + return nil +} + +// doJoinInOrder follows the following steps: +// 1. collect all the matched inner row ptrs for every outer row +// 2. 
do the join work +// 2.1 collect all the matched inner rows using the collected ptrs for every outer row +// 2.2 call tryToMatchInners for every outer row +// 2.3 call onMissMatch when no inner rows are matched +func (iw *indexHashJoinInnerWorker) doJoinInOrder(ctx context.Context, task *indexHashJoinTask, joinResult *indexHashJoinResult, h hash.Hash64, resultCh chan *indexHashJoinResult) (err error) { + defer func() { + if err == nil && joinResult.chk != nil { + if joinResult.chk.NumRows() > 0 { + select { + case resultCh <- joinResult: + case <-ctx.Done(): + return + } + } else { + joinResult.src <- joinResult.chk + } + } + close(resultCh) + }() + for i, numChunks := 0, task.innerResult.NumChunks(); i < numChunks; i++ { + for j, chk := 0, task.innerResult.GetChunk(i); j < chk.NumRows(); j++ { + row := chk.GetRow(j) + ptr := chunk.RowPtr{ChkIdx: uint32(i), RowIdx: uint32(j)} + err = iw.collectMatchedInnerPtrs4OuterRows(ctx, row, ptr, task, h, iw.joinKeyBuf) + if err != nil { + return err + } + } + } + // TODO: matchedInnerRowPtrs and matchedInnerRows can be moved to inner worker. 
+ matchedInnerRows := make([]chunk.Row, len(task.matchedInnerRowPtrs)) + var hasMatched, hasNull, ok bool + for chkIdx, innerRowPtrs4Chk := range task.matchedInnerRowPtrs { + for outerRowIdx, innerRowPtrs := range innerRowPtrs4Chk { + matchedInnerRows, hasMatched, hasNull = matchedInnerRows[:0], false, false + outerRow := task.outerResult.GetChunk(chkIdx).GetRow(outerRowIdx) + for _, ptr := range innerRowPtrs { + matchedInnerRows = append(matchedInnerRows, task.innerResult.GetRow(ptr)) + } + iter := chunk.NewIterator4Slice(matchedInnerRows) + for iter.Begin(); iter.Current() != iter.End(); { + matched, isNull, err := iw.joiner.tryToMatchInners(outerRow, iter, joinResult.chk) + if err != nil { + return err + } + hasMatched, hasNull = matched || hasMatched, isNull || hasNull + if joinResult.chk.IsFull() { + select { + case resultCh <- joinResult: + case <-ctx.Done(): + return nil + } + joinResult, ok = iw.getNewJoinResult(ctx) + if !ok { + return errors.New("indexHashJoinInnerWorker.doJoinInOrder failed") + } + } + } + if !hasMatched { + iw.joiner.onMissMatch(hasNull, outerRow, joinResult.chk) + } + } + } + return nil +} diff --git a/executor/index_lookup_join.go b/executor/index_lookup_join.go index a0f522dd3e695..9e3bd5b6806ac 100644 --- a/executor/index_lookup_join.go +++ b/executor/index_lookup_join.go @@ -33,6 +33,7 @@ import ( "github.com/pingcap/tidb/types" "github.com/pingcap/tidb/util/chunk" "github.com/pingcap/tidb/util/codec" + "github.com/pingcap/tidb/util/execdetails" "github.com/pingcap/tidb/util/logutil" "github.com/pingcap/tidb/util/memory" "github.com/pingcap/tidb/util/mvmap" @@ -664,5 +665,13 @@ func (e *IndexLookUpJoin) Close() error { } e.workerWg.Wait() e.memTracker = nil +<<<<<<< HEAD return e.children[0].Close() +======= + if e.runtimeStats != nil { + concurrency := cap(e.resultCh) + e.runtimeStats.SetConcurrencyInfo(execdetails.NewConcurrencyInfo("Concurrency", concurrency)) + } + return e.baseExecutor.Close() +>>>>>>> 0d95b09... 
executor: Remove unnecessary information in explain analyze output (#16248) } diff --git a/executor/index_lookup_merge_join.go b/executor/index_lookup_merge_join.go new file mode 100644 index 0000000000000..648bd2cb4e783 --- /dev/null +++ b/executor/index_lookup_merge_join.go @@ -0,0 +1,701 @@ +// Copyright 2019 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package executor + +import ( + "context" + "fmt" + "sort" + "sync" + "sync/atomic" + + "github.com/pingcap/parser/terror" + "github.com/pingcap/tidb/expression" + plannercore "github.com/pingcap/tidb/planner/core" + "github.com/pingcap/tidb/sessionctx" + "github.com/pingcap/tidb/types" + "github.com/pingcap/tidb/util/chunk" + "github.com/pingcap/tidb/util/execdetails" + "github.com/pingcap/tidb/util/logutil" + "github.com/pingcap/tidb/util/memory" + "github.com/pingcap/tidb/util/ranger" + "github.com/pingcap/tidb/util/stringutil" + "go.uber.org/zap" +) + +// IndexLookUpMergeJoin realizes IndexLookUpJoin by merge join +// It preserves the order of the outer table and support batch lookup. +// +// The execution flow is very similar to IndexLookUpReader: +// 1. outerWorker read N outer rows, build a task and send it to result channel and inner worker channel. +// 2. The innerWorker receives the task, builds key ranges from outer rows and fetch inner rows, then do merge join. +// 3. main thread receives the task and fetch results from the channel in task one by one. +// 4. If channel has been closed, main thread receives the next task. 
+type IndexLookUpMergeJoin struct { + baseExecutor + + resultCh <-chan *lookUpMergeJoinTask + cancelFunc context.CancelFunc + workerWg *sync.WaitGroup + + outerMergeCtx outerMergeCtx + innerMergeCtx innerMergeCtx + + joiners []joiner + joinChkResourceCh []chan *chunk.Chunk + isOuterJoin bool + + requiredRows int64 + + task *lookUpMergeJoinTask + + indexRanges []*ranger.Range + keyOff2IdxOff []int + + // lastColHelper store the information for last col if there's complicated filter like col > x_col and col < x_col + 100. + lastColHelper *plannercore.ColWithCmpFuncManager + + memTracker *memory.Tracker // track memory usage +} + +type outerMergeCtx struct { + rowTypes []*types.FieldType + joinKeys []*expression.Column + keyCols []int + filter expression.CNFExprs + needOuterSort bool + compareFuncs []expression.CompareFunc +} + +type innerMergeCtx struct { + readerBuilder *dataReaderBuilder + rowTypes []*types.FieldType + joinKeys []*expression.Column + keyCols []int + compareFuncs []expression.CompareFunc + colLens []int + desc bool + keyOff2KeyOffOrderByIdx []int +} + +type lookUpMergeJoinTask struct { + outerResult *chunk.List + outerOrderIdx []chunk.RowPtr + + innerResult *chunk.Chunk + innerIter chunk.Iterator + + sameKeyInnerRows []chunk.Row + sameKeyIter chunk.Iterator + + doneErr error + results chan *indexMergeJoinResult + + memTracker *memory.Tracker +} + +type outerMergeWorker struct { + outerMergeCtx + + lookup *IndexLookUpMergeJoin + + ctx sessionctx.Context + executor Executor + + maxBatchSize int + batchSize int + + nextColCompareFilters *plannercore.ColWithCmpFuncManager + + resultCh chan<- *lookUpMergeJoinTask + innerCh chan<- *lookUpMergeJoinTask + + parentMemTracker *memory.Tracker +} + +type innerMergeWorker struct { + innerMergeCtx + + taskCh <-chan *lookUpMergeJoinTask + joinChkResourceCh chan *chunk.Chunk + outerMergeCtx outerMergeCtx + ctx sessionctx.Context + innerExec Executor + joiner joiner + retFieldTypes []*types.FieldType + + 
maxChunkSize int + indexRanges []*ranger.Range + nextColCompareFilters *plannercore.ColWithCmpFuncManager + keyOff2IdxOff []int +} + +type indexMergeJoinResult struct { + chk *chunk.Chunk + src chan<- *chunk.Chunk +} + +// Open implements the Executor interface +func (e *IndexLookUpMergeJoin) Open(ctx context.Context) error { + // Be careful, very dirty hack in this line!!! + // IndexLookUpMergeJoin needs to rebuild executor (the dataReaderBuilder) during + // executing. However `executor.Next()` is lazy evaluation when the RecordSet + // result is drained. + // Lazy evaluation means the saved session context may change during executor's + // building and its running. + // A specific sequence for example: + // + // e := buildExecutor() // txn at build time + // recordSet := runStmt(e) + // session.CommitTxn() // txn closed + // recordSet.Next() + // e.dataReaderBuilder.Build() // txn is used again, which is already closed + // + // The trick here is `getSnapshotTS` will cache snapshot ts in the dataReaderBuilder, + // so even txn is destroyed later, the dataReaderBuilder could still use the + // cached snapshot ts to construct DAG. + _, err := e.innerMergeCtx.readerBuilder.getSnapshotTS() + if err != nil { + return err + } + + err = e.children[0].Open(ctx) + if err != nil { + return err + } + e.memTracker = memory.NewTracker(e.id, -1) + e.memTracker.AttachTo(e.ctx.GetSessionVars().StmtCtx.MemTracker) + e.startWorkers(ctx) + return nil +} + +func (e *IndexLookUpMergeJoin) startWorkers(ctx context.Context) { + // TODO: consider another session concurrency variable for index merge join. + // Because its parallelization is not complete. 
+ concurrency := e.ctx.GetSessionVars().IndexLookupJoinConcurrency + resultCh := make(chan *lookUpMergeJoinTask, concurrency) + e.resultCh = resultCh + e.joinChkResourceCh = make([]chan *chunk.Chunk, concurrency) + for i := 0; i < concurrency; i++ { + e.joinChkResourceCh[i] = make(chan *chunk.Chunk, numResChkHold) + for j := 0; j < numResChkHold; j++ { + e.joinChkResourceCh[i] <- chunk.NewChunkWithCapacity(e.retFieldTypes, e.maxChunkSize) + } + } + workerCtx, cancelFunc := context.WithCancel(ctx) + e.cancelFunc = cancelFunc + innerCh := make(chan *lookUpMergeJoinTask, concurrency) + e.workerWg.Add(1) + go e.newOuterWorker(resultCh, innerCh).run(workerCtx, e.workerWg, e.cancelFunc) + e.workerWg.Add(concurrency) + for i := 0; i < concurrency; i++ { + go e.newInnerMergeWorker(innerCh, i).run(workerCtx, e.workerWg, e.cancelFunc) + } +} + +func (e *IndexLookUpMergeJoin) newOuterWorker(resultCh, innerCh chan *lookUpMergeJoinTask) *outerMergeWorker { + omw := &outerMergeWorker{ + outerMergeCtx: e.outerMergeCtx, + ctx: e.ctx, + lookup: e, + executor: e.children[0], + resultCh: resultCh, + innerCh: innerCh, + batchSize: 32, + maxBatchSize: e.ctx.GetSessionVars().IndexJoinBatchSize, + parentMemTracker: e.memTracker, + nextColCompareFilters: e.lastColHelper, + } + return omw +} + +func (e *IndexLookUpMergeJoin) newInnerMergeWorker(taskCh chan *lookUpMergeJoinTask, workID int) *innerMergeWorker { + // Since multiple inner workers run concurrently, we should copy join's indexRanges for every worker to avoid data race. 
+ copiedRanges := make([]*ranger.Range, 0, len(e.indexRanges)) + for _, ran := range e.indexRanges { + copiedRanges = append(copiedRanges, ran.Clone()) + } + imw := &innerMergeWorker{ + innerMergeCtx: e.innerMergeCtx, + outerMergeCtx: e.outerMergeCtx, + taskCh: taskCh, + ctx: e.ctx, + indexRanges: copiedRanges, + keyOff2IdxOff: e.keyOff2IdxOff, + joiner: e.joiners[workID], + joinChkResourceCh: e.joinChkResourceCh[workID], + retFieldTypes: e.retFieldTypes, + maxChunkSize: e.maxChunkSize, + } + if e.lastColHelper != nil { + // nextCwf.TmpConstant needs to be reset for every individual + // inner worker to avoid data race when the inner workers is running + // concurrently. + nextCwf := *e.lastColHelper + nextCwf.TmpConstant = make([]*expression.Constant, len(e.lastColHelper.TmpConstant)) + for i := range e.lastColHelper.TmpConstant { + nextCwf.TmpConstant[i] = &expression.Constant{RetType: nextCwf.TargetCol.RetType} + } + imw.nextColCompareFilters = &nextCwf + } + return imw +} + +// Next implements the Executor interface +func (e *IndexLookUpMergeJoin) Next(ctx context.Context, req *chunk.Chunk) error { + if e.isOuterJoin { + atomic.StoreInt64(&e.requiredRows, int64(req.RequiredRows())) + } + req.Reset() + if e.task == nil { + e.getFinishedTask(ctx) + } + for e.task != nil { + select { + case result, ok := <-e.task.results: + if !ok { + if e.task.doneErr != nil { + return e.task.doneErr + } + e.getFinishedTask(ctx) + continue + } + req.SwapColumns(result.chk) + result.src <- result.chk + return nil + case <-ctx.Done(): + return nil + } + } + + return nil +} + +func (e *IndexLookUpMergeJoin) getFinishedTask(ctx context.Context) { + select { + case e.task = <-e.resultCh: + case <-ctx.Done(): + e.task = nil + } + + // TODO: reuse the finished task memory to build tasks. 
+} + +func (omw *outerMergeWorker) run(ctx context.Context, wg *sync.WaitGroup, cancelFunc context.CancelFunc) { + defer func() { + close(omw.resultCh) + close(omw.innerCh) + wg.Done() + if r := recover(); r != nil { + logutil.Logger(ctx).Error("panic in outerMergeWorker.run", + zap.Reflect("r", r), + zap.Stack("stack trace")) + cancelFunc() + } + }() + for { + task, err := omw.buildTask(ctx) + if err != nil { + task.doneErr = err + close(task.results) + omw.pushToChan(ctx, task, omw.resultCh) + return + } + if task == nil { + return + } + + if finished := omw.pushToChan(ctx, task, omw.innerCh); finished { + return + } + + if finished := omw.pushToChan(ctx, task, omw.resultCh); finished { + return + } + } +} + +func (omw *outerMergeWorker) pushToChan(ctx context.Context, task *lookUpMergeJoinTask, dst chan<- *lookUpMergeJoinTask) (finished bool) { + select { + case <-ctx.Done(): + return true + case dst <- task: + } + return false +} + +// buildTask builds a lookUpMergeJoinTask and read outer rows. 
+// When err is not nil, task must not be nil to send the error to the main thread via task +func (omw *outerMergeWorker) buildTask(ctx context.Context) (*lookUpMergeJoinTask, error) { + task := &lookUpMergeJoinTask{ + results: make(chan *indexMergeJoinResult, numResChkHold), + outerResult: chunk.NewList(omw.rowTypes, omw.executor.base().initCap, omw.executor.base().maxChunkSize), + } + task.memTracker = memory.NewTracker(stringutil.MemoizeStr(func() string { return fmt.Sprintf("lookup join task %p", task) }), -1) + task.memTracker.AttachTo(omw.parentMemTracker) + + omw.increaseBatchSize() + requiredRows := omw.batchSize + if omw.lookup.isOuterJoin { + requiredRows = int(atomic.LoadInt64(&omw.lookup.requiredRows)) + } + if requiredRows <= 0 || requiredRows > omw.maxBatchSize { + requiredRows = omw.maxBatchSize + } + for requiredRows > 0 { + execChk := newFirstChunk(omw.executor) + err := Next(ctx, omw.executor, execChk) + if err != nil { + return task, err + } + if execChk.NumRows() == 0 { + break + } + + task.outerResult.Add(execChk) + requiredRows -= execChk.NumRows() + task.memTracker.Consume(execChk.MemoryUsage()) + } + + if task.outerResult.Len() == 0 { + return nil, nil + } + + return task, nil +} + +func (omw *outerMergeWorker) increaseBatchSize() { + if omw.batchSize < omw.maxBatchSize { + omw.batchSize *= 2 + } + if omw.batchSize > omw.maxBatchSize { + omw.batchSize = omw.maxBatchSize + } +} + +func (imw *innerMergeWorker) run(ctx context.Context, wg *sync.WaitGroup, cancelFunc context.CancelFunc) { + var task *lookUpMergeJoinTask + defer func() { + wg.Done() + if r := recover(); r != nil { + logutil.Logger(ctx).Error("panic in innerMergeWorker.run", + zap.Reflect("r", r), + zap.Stack("stack trace")) + cancelFunc() + } + }() + + for ok := true; ok; { + select { + case task, ok = <-imw.taskCh: + if !ok { + return + } + case <-ctx.Done(): + return + } + + err := imw.handleTask(ctx, task) + task.doneErr = err + close(task.results) + } +} + +func (imw 
*innerMergeWorker) handleTask(ctx context.Context, task *lookUpMergeJoinTask) (err error) { + numOuterChks := task.outerResult.NumChunks() + var outerMatch [][]bool + if imw.outerMergeCtx.filter != nil { + outerMatch = make([][]bool, numOuterChks) + for i := 0; i < numOuterChks; i++ { + chk := task.outerResult.GetChunk(i) + outerMatch[i] = make([]bool, chk.NumRows()) + outerMatch[i], err = expression.VectorizedFilter(imw.ctx, imw.outerMergeCtx.filter, chunk.NewIterator4Chunk(chk), outerMatch[i]) + if err != nil { + return err + } + } + } + task.outerOrderIdx = make([]chunk.RowPtr, 0, task.outerResult.Len()) + for i := 0; i < numOuterChks; i++ { + numRow := task.outerResult.GetChunk(i).NumRows() + for j := 0; j < numRow; j++ { + if len(outerMatch) == 0 || outerMatch[i][j] { + task.outerOrderIdx = append(task.outerOrderIdx, chunk.RowPtr{ChkIdx: uint32(i), RowIdx: uint32(j)}) + } + } + } + task.memTracker.Consume(int64(cap(task.outerOrderIdx))) + // needOuterSort means the outer side property items can't guarantee the order of join keys. + // Because the necessary condition of merge join is both outer and inner keep order of join keys. + // In this case, we need sort the outer side. 
+ if imw.outerMergeCtx.needOuterSort { + sort.Slice(task.outerOrderIdx, func(i, j int) bool { + idxI, idxJ := task.outerOrderIdx[i], task.outerOrderIdx[j] + rowI, rowJ := task.outerResult.GetRow(idxI), task.outerResult.GetRow(idxJ) + var cmp int64 + var err error + for _, keyOff := range imw.keyOff2KeyOffOrderByIdx { + joinKey := imw.outerMergeCtx.joinKeys[keyOff] + cmp, _, err = imw.outerMergeCtx.compareFuncs[keyOff](imw.ctx, joinKey, joinKey, rowI, rowJ) + terror.Log(err) + if cmp != 0 { + break + } + } + if cmp != 0 || imw.nextColCompareFilters == nil { + return cmp < 0 + } + return imw.nextColCompareFilters.CompareRow(rowI, rowJ) < 0 + }) + } + dLookUpKeys, err := imw.constructDatumLookupKeys(task) + if err != nil { + return err + } + dLookUpKeys = imw.dedupDatumLookUpKeys(dLookUpKeys) + // If the order requires descending, the deDupedLookUpContents is keep descending order before. + // So at the end, we should generate the ascending deDupedLookUpContents to build the correct range for inner read. 
+ if !imw.outerMergeCtx.needOuterSort && imw.desc { + lenKeys := len(dLookUpKeys) + for i := 0; i < lenKeys/2; i++ { + dLookUpKeys[i], dLookUpKeys[lenKeys-i-1] = dLookUpKeys[lenKeys-i-1], dLookUpKeys[i] + } + } + imw.innerExec, err = imw.readerBuilder.buildExecutorForIndexJoin(ctx, dLookUpKeys, imw.indexRanges, imw.keyOff2IdxOff, imw.nextColCompareFilters) + if err != nil { + return err + } + defer terror.Call(imw.innerExec.Close) + _, err = imw.fetchNextInnerResult(ctx, task) + if err != nil { + return err + } + err = imw.doMergeJoin(ctx, task) + return err +} + +func (imw *innerMergeWorker) fetchNewChunkWhenFull(ctx context.Context, task *lookUpMergeJoinTask, chk **chunk.Chunk) (continueJoin bool) { + if !(*chk).IsFull() { + return true + } + select { + case task.results <- &indexMergeJoinResult{*chk, imw.joinChkResourceCh}: + case <-ctx.Done(): + return false + } + var ok bool + *chk, ok = <-imw.joinChkResourceCh + if !ok { + return false + } + (*chk).Reset() + return true +} + +func (imw *innerMergeWorker) doMergeJoin(ctx context.Context, task *lookUpMergeJoinTask) (err error) { + chk := <-imw.joinChkResourceCh + defer func() { + if chk.NumRows() > 0 { + select { + case task.results <- &indexMergeJoinResult{chk, imw.joinChkResourceCh}: + case <-ctx.Done(): + return + } + } else { + imw.joinChkResourceCh <- chk + } + }() + + initCmpResult := 1 + if imw.innerMergeCtx.desc { + initCmpResult = -1 + } + noneInnerRowsRemain := task.innerResult.NumRows() == 0 + + for _, outerIdx := range task.outerOrderIdx { + outerRow := task.outerResult.GetRow(outerIdx) + hasMatch, hasNull, cmpResult := false, false, initCmpResult + // If it has iterated out all inner rows and the inner rows with same key is empty, + // that means the outer row needn't match any inner rows. 
+ if noneInnerRowsRemain && len(task.sameKeyInnerRows) == 0 { + goto missMatch + } + if len(task.sameKeyInnerRows) > 0 { + cmpResult, err = imw.compare(outerRow, task.sameKeyIter.Begin()) + if err != nil { + return err + } + } + if (cmpResult > 0 && !imw.innerMergeCtx.desc) || (cmpResult < 0 && imw.innerMergeCtx.desc) { + if noneInnerRowsRemain { + task.sameKeyInnerRows = task.sameKeyInnerRows[:0] + goto missMatch + } + noneInnerRowsRemain, err = imw.fetchInnerRowsWithSameKey(ctx, task, outerRow) + if err != nil { + return err + } + } + + for task.sameKeyIter.Current() != task.sameKeyIter.End() { + matched, isNull, err := imw.joiner.tryToMatchInners(outerRow, task.sameKeyIter, chk) + if err != nil { + return err + } + hasMatch = hasMatch || matched + hasNull = hasNull || isNull + if !imw.fetchNewChunkWhenFull(ctx, task, &chk) { + return nil + } + } + + missMatch: + if !hasMatch { + imw.joiner.onMissMatch(hasNull, outerRow, chk) + if !imw.fetchNewChunkWhenFull(ctx, task, &chk) { + return nil + } + } + } + + return nil +} + +// fetchInnerRowsWithSameKey collects the inner rows having the same key with one outer row. 
+func (imw *innerMergeWorker) fetchInnerRowsWithSameKey(ctx context.Context, task *lookUpMergeJoinTask, key chunk.Row) (noneInnerRows bool, err error) { + task.sameKeyInnerRows = task.sameKeyInnerRows[:0] + curRow := task.innerIter.Current() + var cmpRes int + for cmpRes, err = imw.compare(key, curRow); ((cmpRes >= 0 && !imw.desc) || (cmpRes <= 0 && imw.desc)) && err == nil; cmpRes, err = imw.compare(key, curRow) { + if cmpRes == 0 { + task.sameKeyInnerRows = append(task.sameKeyInnerRows, curRow) + } + curRow = task.innerIter.Next() + if curRow == task.innerIter.End() { + curRow, err = imw.fetchNextInnerResult(ctx, task) + if err != nil || task.innerResult.NumRows() == 0 { + break + } + } + } + task.sameKeyIter = chunk.NewIterator4Slice(task.sameKeyInnerRows) + task.sameKeyIter.Begin() + noneInnerRows = task.innerResult.NumRows() == 0 + return +} + +func (imw *innerMergeWorker) compare(outerRow, innerRow chunk.Row) (int, error) { + for _, keyOff := range imw.innerMergeCtx.keyOff2KeyOffOrderByIdx { + cmp, _, err := imw.innerMergeCtx.compareFuncs[keyOff](imw.ctx, imw.outerMergeCtx.joinKeys[keyOff], imw.innerMergeCtx.joinKeys[keyOff], outerRow, innerRow) + if err != nil || cmp != 0 { + return int(cmp), err + } + } + return 0, nil +} + +func (imw *innerMergeWorker) constructDatumLookupKeys(task *lookUpMergeJoinTask) ([]*indexJoinLookUpContent, error) { + numRows := len(task.outerOrderIdx) + dLookUpKeys := make([]*indexJoinLookUpContent, 0, numRows) + for i := 0; i < numRows; i++ { + dLookUpKey, err := imw.constructDatumLookupKey(task, task.outerOrderIdx[i]) + if err != nil { + return nil, err + } + if dLookUpKey == nil { + continue + } + dLookUpKeys = append(dLookUpKeys, dLookUpKey) + } + + return dLookUpKeys, nil +} + +func (imw *innerMergeWorker) constructDatumLookupKey(task *lookUpMergeJoinTask, rowIdx chunk.RowPtr) (*indexJoinLookUpContent, error) { + outerRow := task.outerResult.GetRow(rowIdx) + sc := imw.ctx.GetSessionVars().StmtCtx + keyLen := len(imw.keyCols) + 
dLookupKey := make([]types.Datum, 0, keyLen) + for i, keyCol := range imw.outerMergeCtx.keyCols { + outerValue := outerRow.GetDatum(keyCol, imw.outerMergeCtx.rowTypes[keyCol]) + // Join-on-condition can be promised to be equal-condition in + // IndexNestedLoopJoin, thus the filter will always be false if + // outerValue is null, and we don't need to lookup it. + if outerValue.IsNull() { + return nil, nil + } + innerColType := imw.rowTypes[imw.keyCols[i]] + innerValue, err := outerValue.ConvertTo(sc, innerColType) + if err != nil { + // If the converted outerValue overflows, we don't need to lookup it. + if terror.ErrorEqual(err, types.ErrOverflow) { + return nil, nil + } + return nil, err + } + cmp, err := outerValue.CompareDatum(sc, &innerValue) + if err != nil { + return nil, err + } + if cmp != 0 { + // If the converted outerValue is not equal to the origin outerValue, we don't need to lookup it. + return nil, nil + } + dLookupKey = append(dLookupKey, innerValue) + } + return &indexJoinLookUpContent{keys: dLookupKey, row: task.outerResult.GetRow(rowIdx)}, nil +} + +func (imw *innerMergeWorker) dedupDatumLookUpKeys(lookUpContents []*indexJoinLookUpContent) []*indexJoinLookUpContent { + if len(lookUpContents) < 2 { + return lookUpContents + } + sc := imw.ctx.GetSessionVars().StmtCtx + deDupedLookUpContents := lookUpContents[:1] + for i := 1; i < len(lookUpContents); i++ { + cmp := compareRow(sc, lookUpContents[i].keys, lookUpContents[i-1].keys) + if cmp != 0 || (imw.nextColCompareFilters != nil && imw.nextColCompareFilters.CompareRow(lookUpContents[i].row, lookUpContents[i-1].row) != 0) { + deDupedLookUpContents = append(deDupedLookUpContents, lookUpContents[i]) + } + } + return deDupedLookUpContents +} + +// fetchNextInnerResult collects a chunk of inner results from inner child executor. 
+func (imw *innerMergeWorker) fetchNextInnerResult(ctx context.Context, task *lookUpMergeJoinTask) (beginRow chunk.Row, err error) { + task.innerResult = chunk.NewChunkWithCapacity(retTypes(imw.innerExec), imw.ctx.GetSessionVars().MaxChunkSize) + err = Next(ctx, imw.innerExec, task.innerResult) + task.innerIter = chunk.NewIterator4Chunk(task.innerResult) + beginRow = task.innerIter.Begin() + return +} + +// Close implements the Executor interface. +func (e *IndexLookUpMergeJoin) Close() error { + if e.cancelFunc != nil { + e.cancelFunc() + e.cancelFunc = nil + } + e.workerWg.Wait() + for i := range e.joinChkResourceCh { + close(e.joinChkResourceCh[i]) + } + e.joinChkResourceCh = nil + e.memTracker = nil + if e.runtimeStats != nil { + concurrency := cap(e.resultCh) + e.runtimeStats.SetConcurrencyInfo(execdetails.NewConcurrencyInfo("Concurrency", concurrency)) + } + return e.baseExecutor.Close() +} diff --git a/executor/join.go b/executor/join.go index 480f370d9b1e8..69d7d238c4053 100644 --- a/executor/join.go +++ b/executor/join.go @@ -30,6 +30,11 @@ import ( "github.com/pingcap/tidb/util" "github.com/pingcap/tidb/util/chunk" "github.com/pingcap/tidb/util/codec" +<<<<<<< HEAD +======= + "github.com/pingcap/tidb/util/disk" + "github.com/pingcap/tidb/util/execdetails" +>>>>>>> 0d95b09... executor: Remove unnecessary information in explain analyze output (#16248) "github.com/pingcap/tidb/util/memory" "github.com/pingcap/tidb/util/mvmap" "github.com/pingcap/tidb/util/stringutil" @@ -138,6 +143,16 @@ func (e *HashJoinExec) Close() error { } e.memTracker = nil +<<<<<<< HEAD +======= + if e.runtimeStats != nil { + concurrency := cap(e.joiners) + e.runtimeStats.SetConcurrencyInfo(execdetails.NewConcurrencyInfo("Concurrency", concurrency)) + if e.rowContainer != nil { + e.runtimeStats.SetAdditionalInfo(e.rowContainer.stat.String()) + } + } +>>>>>>> 0d95b09... 
executor: Remove unnecessary information in explain analyze output (#16248) err := e.baseExecutor.Close() return err } diff --git a/executor/projection.go b/executor/projection.go index 83da1a5857d1e..6c87ceaf00be2 100644 --- a/executor/projection.go +++ b/executor/projection.go @@ -25,6 +25,7 @@ import ( "github.com/pingcap/tidb/sessionctx" "github.com/pingcap/tidb/util" "github.com/pingcap/tidb/util/chunk" + "github.com/pingcap/tidb/util/execdetails" "github.com/pingcap/tidb/util/logutil" "go.uber.org/zap" ) @@ -259,8 +260,28 @@ func (e *ProjectionExec) Close() error { } if e.outputCh != nil { close(e.finishCh) +<<<<<<< HEAD // Wait for "projectionInputFetcher" to finish and exit. for range e.outputCh { +======= + e.wg.Wait() // Wait for fetcher and workers to finish and exit. + + // clear fetcher + e.drainInputCh(e.fetcher.inputCh) + e.drainOutputCh(e.fetcher.outputCh) + + // clear workers + for _, w := range e.workers { + e.drainInputCh(w.inputCh) + e.drainOutputCh(w.outputCh) + } + } + if e.runtimeStats != nil { + if e.isUnparallelExec() { + e.runtimeStats.SetConcurrencyInfo(execdetails.NewConcurrencyInfo("Concurrency", 0)) + } else { + e.runtimeStats.SetConcurrencyInfo(execdetails.NewConcurrencyInfo("Concurrency", int(e.numWorkers))) +>>>>>>> 0d95b09... executor: Remove unnecessary information in explain analyze output (#16248) } e.outputCh = nil } diff --git a/executor/shuffle.go b/executor/shuffle.go new file mode 100644 index 0000000000000..c8f93bafb4bf7 --- /dev/null +++ b/executor/shuffle.go @@ -0,0 +1,381 @@ +// Copyright 2019 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package executor + +import ( + "context" + "sync" + + "github.com/pingcap/errors" + "github.com/pingcap/failpoint" + "github.com/pingcap/tidb/expression" + "github.com/pingcap/tidb/sessionctx" + "github.com/pingcap/tidb/util/chunk" + "github.com/pingcap/tidb/util/execdetails" + "github.com/pingcap/tidb/util/logutil" + "github.com/spaolacci/murmur3" + "go.uber.org/zap" +) + +// ShuffleExec is the executor to run other executors in a parallel manner. +// 1. It fetches chunks from `DataSource`. +// 2. It splits tuples from `DataSource` into N partitions (Only "split by hash" is implemented so far). +// 3. It invokes N workers in parallel, assign each partition as input to each worker and execute child executors. +// 4. It collects outputs from each worker, then sends outputs to its parent. +// +// +-------------+ +// +-------| Main Thread | +// | +------+------+ +// | ^ +// | | +// | + +// v +++ +// outputHolderCh | | outputCh (1 x Concurrency) +// v +++ +// | ^ +// | | +// | +-------+-------+ +// v | | +// +--------------+ +--------------+ +// +----- | worker | ....... | worker | worker (N Concurrency): child executor, eg. WindowExec (+SortExec) +// | +------------+-+ +-+------------+ +// | ^ ^ +// | | | +// | +-+ +-+ ...... +-+ +// | | | | | | | +// | ... ... ... 
inputCh (Concurrency x 1) +// v | | | | | | +// inputHolderCh +++ +++ +++ +// v ^ ^ ^ +// | | | | +// | +------o----+ | +// | | +-----------------+-----+ +// | | | +// | +---+------------+------------+----+-----------+ +// | | Partition Splitter | +// | +--------------+-+------------+-+--------------+ +// | ^ +// | | +// | +---------------v-----------------+ +// +----------> | fetch data from DataSource | +// +---------------------------------+ +// +//////////////////////////////////////////////////////////////////////////////////////// +type ShuffleExec struct { + baseExecutor + concurrency int + workers []*shuffleWorker + + prepared bool + executed bool + + splitter partitionSplitter + dataSource Executor + + finishCh chan struct{} + outputCh chan *shuffleOutput +} + +type shuffleOutput struct { + chk *chunk.Chunk + err error + giveBackCh chan *chunk.Chunk +} + +// Open implements the Executor Open interface. +func (e *ShuffleExec) Open(ctx context.Context) error { + if err := e.dataSource.Open(ctx); err != nil { + return err + } + if err := e.baseExecutor.Open(ctx); err != nil { + return err + } + + e.prepared = false + e.finishCh = make(chan struct{}, 1) + e.outputCh = make(chan *shuffleOutput, e.concurrency) + + for _, w := range e.workers { + w.finishCh = e.finishCh + + w.inputCh = make(chan *chunk.Chunk, 1) + w.inputHolderCh = make(chan *chunk.Chunk, 1) + w.outputCh = e.outputCh + w.outputHolderCh = make(chan *chunk.Chunk, 1) + + if err := w.childExec.Open(ctx); err != nil { + return err + } + + w.inputHolderCh <- newFirstChunk(e.dataSource) + w.outputHolderCh <- newFirstChunk(e) + } + + return nil +} + +// Close implements the Executor Close interface. 
+func (e *ShuffleExec) Close() error { + if !e.prepared { + for _, w := range e.workers { + close(w.inputHolderCh) + close(w.inputCh) + close(w.outputHolderCh) + } + close(e.outputCh) + } + close(e.finishCh) + for _, w := range e.workers { + for range w.inputCh { + } + } + for range e.outputCh { // workers exit before `e.outputCh` is closed. + } + e.executed = false + + if e.runtimeStats != nil { + e.runtimeStats.SetConcurrencyInfo(execdetails.NewConcurrencyInfo("ShuffleConcurrency", e.concurrency)) + } + + err := e.dataSource.Close() + err1 := e.baseExecutor.Close() + if err != nil { + return errors.Trace(err) + } + return errors.Trace(err1) +} + +func (e *ShuffleExec) prepare4ParallelExec(ctx context.Context) { + go e.fetchDataAndSplit(ctx) + + waitGroup := &sync.WaitGroup{} + waitGroup.Add(len(e.workers)) + for _, w := range e.workers { + go w.run(ctx, waitGroup) + } + + go e.waitWorkerAndCloseOutput(waitGroup) +} + +func (e *ShuffleExec) waitWorkerAndCloseOutput(waitGroup *sync.WaitGroup) { + waitGroup.Wait() + close(e.outputCh) +} + +// Next implements the Executor Next interface. +func (e *ShuffleExec) Next(ctx context.Context, req *chunk.Chunk) error { + req.Reset() + if !e.prepared { + e.prepare4ParallelExec(ctx) + e.prepared = true + } + + failpoint.Inject("shuffleError", func(val failpoint.Value) { + if val.(bool) { + failpoint.Return(errors.New("ShuffleExec.Next error")) + } + }) + + if e.executed { + return nil + } + + result, ok := <-e.outputCh + if !ok { + e.executed = true + return nil + } + if result.err != nil { + return result.err + } + req.SwapColumns(result.chk) // `shuffleWorker` will not send an empty `result.chk` to `e.outputCh`. 
+ result.giveBackCh <- result.chk + + return nil +} + +func recoveryShuffleExec(output chan *shuffleOutput, r interface{}) { + err := errors.Errorf("%v", r) + output <- &shuffleOutput{err: errors.Errorf("%v", r)} + logutil.BgLogger().Error("shuffle panicked", zap.Error(err), zap.Stack("stack")) +} + +func (e *ShuffleExec) fetchDataAndSplit(ctx context.Context) { + var ( + err error + workerIndices []int + ) + results := make([]*chunk.Chunk, len(e.workers)) + chk := newFirstChunk(e.dataSource) + + defer func() { + if r := recover(); r != nil { + recoveryShuffleExec(e.outputCh, r) + } + for _, w := range e.workers { + close(w.inputCh) + } + }() + + for { + err = Next(ctx, e.dataSource, chk) + if err != nil { + e.outputCh <- &shuffleOutput{err: err} + return + } + if chk.NumRows() == 0 { + break + } + + workerIndices, err = e.splitter.split(e.ctx, chk, workerIndices) + if err != nil { + e.outputCh <- &shuffleOutput{err: err} + return + } + numRows := chk.NumRows() + for i := 0; i < numRows; i++ { + workerIdx := workerIndices[i] + w := e.workers[workerIdx] + + if results[workerIdx] == nil { + select { + case <-e.finishCh: + return + case results[workerIdx] = <-w.inputHolderCh: + break + } + } + results[workerIdx].AppendRow(chk.GetRow(i)) + if results[workerIdx].IsFull() { + w.inputCh <- results[workerIdx] + results[workerIdx] = nil + } + } + } + for i, w := range e.workers { + if results[i] != nil { + w.inputCh <- results[i] + results[i] = nil + } + } +} + +var _ Executor = &shuffleWorker{} + +// shuffleWorker is the multi-thread worker executing child executors within "partition". +type shuffleWorker struct { + baseExecutor + childExec Executor + + finishCh <-chan struct{} + executed bool + + // Workers get inputs from dataFetcherThread by `inputCh`, + // and output results to main thread by `outputCh`. 
+ // `inputHolderCh` and `outputHolderCh` are "Chunk Holder" channels of `inputCh` and `outputCh` respectively, + // which give the `*Chunk` back, to implement the data transport in a streaming manner. + inputCh chan *chunk.Chunk + inputHolderCh chan *chunk.Chunk + outputCh chan *shuffleOutput + outputHolderCh chan *chunk.Chunk +} + +// Open implements the Executor Open interface. +func (e *shuffleWorker) Open(ctx context.Context) error { + if err := e.baseExecutor.Open(ctx); err != nil { + return err + } + e.executed = false + return nil +} + +// Close implements the Executor Close interface. +func (e *shuffleWorker) Close() error { + return errors.Trace(e.baseExecutor.Close()) +} + +// Next implements the Executor Next interface. +// It is called by `Tail` executor within "shuffle", to fetch data from `DataSource` by `inputCh`. +func (e *shuffleWorker) Next(ctx context.Context, req *chunk.Chunk) error { + req.Reset() + if e.executed { + return nil + } + select { + case <-e.finishCh: + e.executed = true + return nil + case result, ok := <-e.inputCh: + if !ok || result.NumRows() == 0 { + e.executed = true + return nil + } + req.SwapColumns(result) + e.inputHolderCh <- result + return nil + } +} + +func (e *shuffleWorker) run(ctx context.Context, waitGroup *sync.WaitGroup) { + defer func() { + if r := recover(); r != nil { + recoveryShuffleExec(e.outputCh, r) + } + waitGroup.Done() + }() + + for { + select { + case <-e.finishCh: + return + case chk := <-e.outputHolderCh: + if err := Next(ctx, e.childExec, chk); err != nil { + e.outputCh <- &shuffleOutput{err: err} + return + } + + // Should not send an empty `chk` to `e.outputCh`. 
+ if chk.NumRows() == 0 { + return + } + e.outputCh <- &shuffleOutput{chk: chk, giveBackCh: e.outputHolderCh} + } + } +} + +var _ partitionSplitter = &partitionHashSplitter{} + +type partitionSplitter interface { + split(ctx sessionctx.Context, input *chunk.Chunk, workerIndices []int) ([]int, error) +} + +type partitionHashSplitter struct { + byItems []expression.Expression + numWorkers int + hashKeys [][]byte +} + +func (s *partitionHashSplitter) split(ctx sessionctx.Context, input *chunk.Chunk, workerIndices []int) ([]int, error) { + var err error + s.hashKeys, err = getGroupKey(ctx, input, s.hashKeys, s.byItems) + if err != nil { + return workerIndices, err + } + workerIndices = workerIndices[:0] + numRows := input.NumRows() + for i := 0; i < numRows; i++ { + workerIndices = append(workerIndices, int(murmur3.Sum32(s.hashKeys[i]))%s.numWorkers) + } + return workerIndices, nil +} diff --git a/util/execdetails/execdetails.go b/util/execdetails/execdetails.go index 515d01c00b14c..0bd487a7ba3b1 100644 --- a/util/execdetails/execdetails.go +++ b/util/execdetails/execdetails.go @@ -328,6 +328,20 @@ type RuntimeStatsColl struct { readerStats map[string]*ReaderRuntimeStats } +<<<<<<< HEAD +======= +// ConcurrencyInfo is used to save the concurrency information of the executor operator +type ConcurrencyInfo struct { + concurrencyName string + concurrencyNum int +} + +// NewConcurrencyInfo creates new executor's concurrencyInfo. +func NewConcurrencyInfo(name string, num int) *ConcurrencyInfo { + return &ConcurrencyInfo{name, num} +} + +>>>>>>> 0d95b09... executor: Remove unnecessary information in explain analyze output (#16248) // RuntimeStats collects one executor's execution info. type RuntimeStats struct { // executor's Next() called times. @@ -336,6 +350,17 @@ type RuntimeStats struct { consume int64 // executor return row count. 
rows int64 +<<<<<<< HEAD +======= + + // protect concurrency + mu sync.Mutex + // executor concurrency information + concurrency []*ConcurrencyInfo + + // additional information for executors + additionalInfo string +>>>>>>> 0d95b09... executor: Remove unnecessary information in explain analyze output (#16248) } // NewRuntimeStatsColl creates new executor collector. @@ -420,6 +445,33 @@ func (e *RuntimeStats) SetRowNum(rowNum int64) { atomic.StoreInt64(&e.rows, rowNum) } +<<<<<<< HEAD +======= +// SetConcurrencyInfo sets the concurrency informations. +// We must clear the concurrencyInfo first when we call the SetConcurrencyInfo. +// When the num <= 0, it means the exector operator is not executed parallel. +func (e *RuntimeStats) SetConcurrencyInfo(infos ...*ConcurrencyInfo) { + e.mu.Lock() + defer e.mu.Unlock() + e.concurrency = e.concurrency[:0] + for _, info := range infos { + e.concurrency = append(e.concurrency, info) + } +} + +// SetAdditionalInfo sets the additional information. +func (e *RuntimeStats) SetAdditionalInfo(info string) { + e.mu.Lock() + e.additionalInfo = info + e.mu.Unlock() +} + +// GetActRows return rows of CopRuntimeStats. +func (e *RuntimeStats) GetActRows() int64 { + return e.rows +} + +>>>>>>> 0d95b09... executor: Remove unnecessary information in explain analyze output (#16248) func (e *RuntimeStats) String() string { if e == nil { return ""