diff --git a/executor/index_merge_reader.go b/executor/index_merge_reader.go index fabf68c1fe36d..ce2351d135a21 100644 --- a/executor/index_merge_reader.go +++ b/executor/index_merge_reader.go @@ -49,6 +49,13 @@ var ( _ Executor = &IndexMergeReaderExecutor{} ) +const ( + partialIndexWorkerType = "IndexMergePartialIndexWorker" + partialTableWorkerType = "IndexMergePartialTableWorker" + processWorkerType = "IndexMergeProcessWorker" + tableScanWorkerType = "IndexMergeTableScanWorker" +) + // IndexMergeReaderExecutor accesses a table with multiple index/table scan. // There are three types of workers: // 1. partialTableWorker/partialIndexWorker, which are used to fetch the handles @@ -87,10 +94,10 @@ type IndexMergeReaderExecutor struct { // All fields above are immutable. - tblWorkerWg sync.WaitGroup - idxWorkerWg sync.WaitGroup - processWokerWg sync.WaitGroup - finished chan struct{} + tblWorkerWg sync.WaitGroup + idxWorkerWg sync.WaitGroup + processWorkerWg sync.WaitGroup + finished chan struct{} workerStarted bool keyRanges [][]kv.KeyRange @@ -245,20 +252,26 @@ func (e *IndexMergeReaderExecutor) startIndexMergeProcessWorker(ctx context.Cont indexMerge: e, stats: e.stats, } - e.processWokerWg.Add(1) + e.processWorkerWg.Add(1) go func() { defer trace.StartRegion(ctx, "IndexMergeProcessWorker").End() util.WithRecovery( func() { idxMergeProcessWorker.fetchLoop(ctx, fetch, workCh, e.resultCh, e.finished) }, - idxMergeProcessWorker.handleLoopFetcherPanic(ctx, e.resultCh), + handleWorkerPanic(ctx, e.finished, e.resultCh, processWorkerType), ) - e.processWokerWg.Done() + e.processWorkerWg.Done() }() } func (e *IndexMergeReaderExecutor) startPartialIndexWorker(ctx context.Context, exitCh <-chan struct{}, fetchCh chan<- *lookupTableTask, workID int) error { + failpoint.Inject("testIndexMergeResultChCloseEarly", func(_ failpoint.Value) { + // Wait for processWorker to close resultCh. + time.Sleep(2) + // Should use fetchCh instead of resultCh to send error. + syncErr(ctx, e.finished, fetchCh, errors.New("testIndexMergeResultChCloseEarly")) + }) if e.runtimeStats != nil { collExec := true e.dagPBs[workID].CollectExecutionSummaries = &collExec @@ -282,6 +295,17 @@ func (e *IndexMergeReaderExecutor) startPartialIndexWorker(ctx context.Context, defer e.idxWorkerWg.Done() util.WithRecovery( func() { + failpoint.Inject("testIndexMergePanicPartialIndexWorker", nil) + failpoint.Inject("mockSleepBeforeStartTableReader", func(_ failpoint.Value) { + select { + case <-ctx.Done(): + failpoint.Return() + case <-e.finished: + failpoint.Return() + case <-exitCh: + failpoint.Return() + } + }) worker := &partialIndexWorker{ stats: e.stats, idxID: e.getPartitalPlanID(workID), @@ -289,13 +313,14 @@ func (e *IndexMergeReaderExecutor) startPartialIndexWorker(ctx context.Context, batchSize: e.maxChunkSize, maxBatchSize: e.ctx.GetSessionVars().IndexLookupSize, maxChunkSize: e.maxChunkSize, + memTracker: e.memTracker, } if e.isCorColInPartialFilters[workID] { // We got correlated column, so need to refresh Selection operator. var err error if e.dagPBs[workID].Executors, _, err = constructDistExec(e.ctx, e.partialPlans[workID]); err != nil { - worker.syncErr(e.resultCh, err) + syncErr(ctx, e.finished, fetchCh, err) return } } @@ -323,12 +348,12 @@ func (e *IndexMergeReaderExecutor) startPartialIndexWorker(ctx context.Context, // init kvReq and worker for this partition kvReq, err := builder.SetKeyRanges(keyRange).Build() if err != nil { - worker.syncErr(e.resultCh, err) + syncErr(ctx, e.finished, fetchCh, err) return } result, err := distsql.SelectWithRuntimeStats(ctx, e.ctx, kvReq, e.handleCols.GetFieldsTypes(), e.feedbacks[workID], getPhysicalPlanIDs(e.partialPlans[workID]), e.getPartitalPlanID(workID)) if err != nil { - worker.syncErr(e.resultCh, err) + syncErr(ctx, e.finished, fetchCh, err) return } worker.batchSize = e.maxChunkSize @@ -341,7 +366,7 @@ func (e *IndexMergeReaderExecutor) startPartialIndexWorker(ctx context.Context, // fetch all data from this partition ctx1, cancel := context.WithCancel(ctx) - _, fetchErr := worker.fetchHandles(ctx1, result, exitCh, fetchCh, e.resultCh, e.finished, e.handleCols) + _, fetchErr := worker.fetchHandles(ctx1, result, exitCh, fetchCh, e.finished, e.handleCols) if fetchErr != nil { // this error is synced in fetchHandles(), don't sync it again e.feedbacks[workID].Invalidate() } @@ -355,7 +380,7 @@ func (e *IndexMergeReaderExecutor) startPartialIndexWorker(ctx context.Context, } } }, - e.handleHandlesFetcherPanic(ctx, e.resultCh, "partialIndexWorker"), + handleWorkerPanic(ctx, e.finished, fetchCh, partialIndexWorkerType), ) }() @@ -379,6 +404,17 @@ func (e *IndexMergeReaderExecutor) startPartialTableWorker(ctx context.Context, defer e.idxWorkerWg.Done() util.WithRecovery( func() { + failpoint.Inject("testIndexMergePanicPartialTableWorker", nil) + failpoint.Inject("mockSleepBeforeStartTableReader", func(_ failpoint.Value) { + select { + case <-ctx.Done(): + failpoint.Return() + case <-e.finished: + failpoint.Return() + case <-exitCh: + failpoint.Return() + } + }) var err error partialTableReader := &TableReaderExecutor{ baseExecutor: newBaseExecutor(e.ctx, ts.Schema(), e.getPartitalPlanID(workID)), @@ -399,11 +435,12 @@ func (e *IndexMergeReaderExecutor) startPartialTableWorker(ctx context.Context, maxBatchSize: e.ctx.GetSessionVars().IndexLookupSize, maxChunkSize: e.maxChunkSize, tableReader: partialTableReader, + memTracker: e.memTracker, } if e.isCorColInPartialFilters[workID] { if e.dagPBs[workID].Executors, _, err = constructDistExec(e.ctx, e.partialPlans[workID]); err != nil { - worker.syncErr(e.resultCh, err) + syncErr(ctx, e.finished, fetchCh, err) return } partialTableReader.dagPB = e.dagPBs[workID] @@ -421,7 +458,7 @@ func (e *IndexMergeReaderExecutor) startPartialTableWorker(ctx context.Context, partialTableReader.table = tbl if err = partialTableReader.Open(ctx); err != nil { logutil.Logger(ctx).Error("open Select result failed:", zap.Error(err)) - worker.syncErr(e.resultCh, err) + syncErr(ctx, e.finished, fetchCh, err) break } worker.batchSize = e.maxChunkSize @@ -434,7 +471,7 @@ func (e *IndexMergeReaderExecutor) startPartialTableWorker(ctx context.Context, // fetch all handles from this table ctx1, cancel := context.WithCancel(ctx) - _, fetchErr := worker.fetchHandles(ctx1, exitCh, fetchCh, e.resultCh, e.finished, e.handleCols) + _, fetchErr := worker.fetchHandles(ctx1, exitCh, fetchCh, e.finished, e.handleCols) if fetchErr != nil { // this error is synced in fetchHandles, so don't sync it again e.feedbacks[workID].Invalidate() } @@ -450,7 +487,7 @@ func (e *IndexMergeReaderExecutor) startPartialTableWorker(ctx context.Context, } } }, - e.handleHandlesFetcherPanic(ctx, e.resultCh, "partialTableWorker"), + handleWorkerPanic(ctx, e.finished, fetchCh, partialTableWorkerType), ) }() return nil @@ -487,17 +524,10 @@ type partialTableWorker struct { maxChunkSize int tableReader Executor partition table.PhysicalTable // it indicates if this worker is accessing a particular partition table + memTracker *memory.Tracker } -func (w *partialTableWorker) syncErr(resultCh chan<- *lookupTableTask, err error) { - doneCh := make(chan error, 1) - doneCh <- err - resultCh <- &lookupTableTask{ - doneCh: doneCh, - } -} - -func (w *partialTableWorker) fetchHandles(ctx context.Context, exitCh <-chan struct{}, fetchCh chan<- *lookupTableTask, resultCh chan<- *lookupTableTask, +func (w *partialTableWorker) fetchHandles(ctx context.Context, exitCh <-chan struct{}, fetchCh chan<- *lookupTableTask, finished <-chan struct{}, handleCols plannercore.HandleCols) (count int64, err error) { chk := chunk.NewChunkWithCapacity(retTypes(w.tableReader), w.maxChunkSize) var basic *execdetails.BasicRuntimeStats @@ -508,7 +538,7 @@ func (w *partialTableWorker) fetchHandles(ctx context.Context, exitCh <-chan str start := time.Now() handles, retChunk, err := w.extractTaskHandles(ctx, chk, handleCols) if err != nil { - w.syncErr(resultCh, err) + syncErr(ctx, finished, fetchCh, err) return count, err } if len(handles) == 0 { @@ -537,6 +567,8 @@ func (w *partialTableWorker) fetchHandles(ctx context.Context, exitCh <-chan str func (w *partialTableWorker) extractTaskHandles(ctx context.Context, chk *chunk.Chunk, handleCols plannercore.HandleCols) ( handles []kv.Handle, retChk *chunk.Chunk, err error) { handles = make([]kv.Handle, 0, w.batchSize) + var memUsage int64 + defer w.memTracker.Consume(-memUsage) for len(handles) < w.batchSize { chk.SetRequiredRows(w.batchSize-len(handles), w.maxChunkSize) err = errors.Trace(w.tableReader.Next(ctx, chk)) @@ -544,8 +576,14 @@ func (w *partialTableWorker) extractTaskHandles(ctx context.Context, chk *chunk. return handles, nil, err } if chk.NumRows() == 0 { + failpoint.Inject("testIndexMergeErrorPartialTableWorker", func(v failpoint.Value) { + failpoint.Return(handles, nil, errors.New(v.(string))) + }) return handles, retChk, nil } + memDelta := chk.MemoryUsage() + memUsage += memDelta + w.memTracker.Consume(memDelta) for i := 0; i < chk.NumRows(); i++ { handle, err := handleCols.BuildHandle(chk.GetRow(i)) if err != nil { @@ -590,13 +628,13 @@ func (e *IndexMergeReaderExecutor) startIndexMergeTableScanWorker(ctx context.Co defer trace.StartRegion(ctx, "IndexMergeTableScanWorker").End() var task *lookupTableTask util.WithRecovery( - // Note we use the address of `task` as the argument of both `pickAndExecTask` and `handlePickAndExecTaskPanic` + // Note we use the address of `task` as the argument of both `pickAndExecTask` and `handleTableScanWorkerPanic` // because `task` is expected to be assigned in `pickAndExecTask`, and this assignment should also be visible - // in `handlePickAndExecTaskPanic` since it will get `doneCh` from `task`. Golang always pass argument by value, + // in `handleTableScanWorkerPanic` since it will get `doneCh` from `task`. Golang always pass argument by value, // so if we don't use the address of `task` as the argument, the assignment to `task` in `pickAndExecTask` is - // not visible in `handlePickAndExecTaskPanic` + // not visible in `handleTableScanWorkerPanic` func() { worker.pickAndExecTask(ctx1, &task) }, - worker.handlePickAndExecTaskPanic(ctx1, &task), + worker.handleTableScanWorkerPanic(ctx1, e.finished, &task, tableScanWorkerType), ) cancel() e.tblWorkerWg.Done() @@ -681,19 +719,32 @@ func (e *IndexMergeReaderExecutor) getResultTask() (*lookupTableTask, error) { return e.resultCurr, nil } -func (e *IndexMergeReaderExecutor) handleHandlesFetcherPanic(ctx context.Context, resultCh chan<- *lookupTableTask, worker string) func(r interface{}) { +func handleWorkerPanic(ctx context.Context, finished <-chan struct{}, ch chan<- *lookupTableTask, worker string) func(r interface{}) { return func(r interface{}) { + if worker == processWorkerType { + // There is only one processWorker, so it's safe to close here. + // No need to worry about "close on closed channel" error. + defer close(ch) + } if r == nil { return } - err4Panic := errors.Errorf("panic in IndexMergeReaderExecutor %s: %v", worker, r) + err4Panic := errors.Errorf("%s: %v", worker, r) logutil.Logger(ctx).Error(err4Panic.Error()) doneCh := make(chan error, 1) doneCh <- err4Panic - resultCh <- &lookupTableTask{ + task := &lookupTableTask{ doneCh: doneCh, } + select { + case <-ctx.Done(): + return + case <-finished: + return + case ch <- task: + return + } } } @@ -703,9 +754,9 @@ func (e *IndexMergeReaderExecutor) Close() error { return nil } close(e.finished) - e.processWokerWg.Wait() e.tblWorkerWg.Wait() e.idxWorkerWg.Wait() + e.processWorkerWg.Wait() e.finished = nil e.workerStarted = false // TODO: how to store e.feedbacks @@ -719,17 +770,32 @@ type indexMergeProcessWorker struct { func (w *indexMergeProcessWorker) fetchLoop(ctx context.Context, fetchCh <-chan *lookupTableTask, workCh chan<- *lookupTableTask, resultCh chan<- *lookupTableTask, finished <-chan struct{}) { - defer func() { - close(workCh) - close(resultCh) - }() + failpoint.Inject("testIndexMergeResultChCloseEarly", func(_ failpoint.Value) { + failpoint.Return() + }) + memTracker := memory.NewTracker(w.indexMerge.id, -1) + memTracker.AttachTo(w.indexMerge.memTracker) + defer memTracker.Detach() + defer close(workCh) + failpoint.Inject("testIndexMergePanicProcessWorkerUnion", nil) distinctHandles := make(map[int64]*kv.HandleMap) for task := range fetchCh { + select { + case err := <-task.doneCh: + // If got error from partialIndexWorker/partialTableWorker, stop processing. + if err != nil { + syncErr(ctx, finished, resultCh, err) + return + } + default: + } start := time.Now() handles := task.handles fhs := make([]kv.Handle, 0, 8) + memTracker.Consume(int64(cap(task.handles) * 8)) + var tblID int64 if w.indexMerge.partitionTableMode { tblID = getPhysicalTableID(task.partitionTable) @@ -770,22 +836,6 @@ func (w *indexMergeProcessWorker) fetchLoop(ctx context.Context, fetchCh <-chan } } -func (w *indexMergeProcessWorker) handleLoopFetcherPanic(ctx context.Context, resultCh chan<- *lookupTableTask) func(r interface{}) { - return func(r interface{}) { - if r == nil { - return - } - - err4Panic := errors.Errorf("panic in IndexMergeReaderExecutor indexMergeTableWorker: %v", r) - logutil.Logger(ctx).Error(err4Panic.Error()) - doneCh := make(chan error, 1) - doneCh <- err4Panic - resultCh <- &lookupTableTask{ - doneCh: doneCh, - } - } -} - type partialIndexWorker struct { stats *IndexMergeRuntimeStat sc sessionctx.Context @@ -794,14 +844,26 @@ type partialIndexWorker struct { maxBatchSize int maxChunkSize int partition table.PhysicalTable // it indicates if this worker is accessing a particular partition table + memTracker *memory.Tracker } -func (w *partialIndexWorker) syncErr(resultCh chan<- *lookupTableTask, err error) { +func syncErr(ctx context.Context, finished <-chan struct{}, errCh chan<- *lookupTableTask, err error) { + logutil.BgLogger().Error("IndexMergeReaderExecutor.syncErr", zap.Error(err)) doneCh := make(chan error, 1) doneCh <- err - resultCh <- &lookupTableTask{ + task := &lookupTableTask{ doneCh: doneCh, } + + // ctx.Done and finished is to avoid write channel is stuck. + select { + case <-ctx.Done(): + return + case <-finished: + return + case errCh <- task: + return + } } func (w *partialIndexWorker) fetchHandles( @@ -809,7 +871,6 @@ func (w *partialIndexWorker) fetchHandles( result distsql.SelectResult, exitCh <-chan struct{}, fetchCh chan<- *lookupTableTask, - resultCh chan<- *lookupTableTask, finished <-chan struct{}, handleCols plannercore.HandleCols) (count int64, err error) { chk := chunk.NewChunkWithCapacity(handleCols.GetFieldsTypes(), w.maxChunkSize) @@ -824,7 +885,7 @@ func (w *partialIndexWorker) fetchHandles( start := time.Now() handles, retChunk, err := w.extractTaskHandles(ctx, chk, result, handleCols) if err != nil { - w.syncErr(resultCh, err) + syncErr(ctx, finished, fetchCh, err) return count, err } if len(handles) == 0 { @@ -853,6 +914,8 @@ func (w *partialIndexWorker) fetchHandles( func (w *partialIndexWorker) extractTaskHandles(ctx context.Context, chk *chunk.Chunk, idxResult distsql.SelectResult, handleCols plannercore.HandleCols) ( handles []kv.Handle, retChk *chunk.Chunk, err error) { handles = make([]kv.Handle, 0, w.batchSize) + var memUsage int64 + defer w.memTracker.Consume(-memUsage) for len(handles) < w.batchSize { chk.SetRequiredRows(w.batchSize-len(handles), w.maxChunkSize) err = errors.Trace(idxResult.Next(ctx, chk)) @@ -860,8 +923,14 @@ func (w *partialIndexWorker) extractTaskHandles(ctx context.Context, chk *chunk. return handles, nil, err } if chk.NumRows() == 0 { + failpoint.Inject("testIndexMergeErrorPartialIndexWorker", func(v failpoint.Value) { + failpoint.Return(handles, nil, errors.New(v.(string))) + }) return handles, retChk, nil } + memDelta := chk.MemoryUsage() + memUsage += memDelta + w.memTracker.Consume(memDelta) for i := 0; i < chk.NumRows(); i++ { handle, err := handleCols.BuildHandleFromIndexRow(chk.GetRow(i)) if err != nil { @@ -912,6 +981,17 @@ func (w *indexMergeTableScanWorker) pickAndExecTask(ctx context.Context, task ** case <-w.finished: return } + // Make sure panic failpoint is after fetch task from workCh. + // Otherwise cannot send error to task.doneCh. + failpoint.Inject("testIndexMergePanicTableScanWorker", nil) + failpoint.Inject("mockSleepBeforeStartTableReader", func(_ failpoint.Value) { + select { + case <-ctx.Done(): + failpoint.Return() + case <-w.finished: + failpoint.Return() + } + }) execStart := time.Now() err := w.executeTask(ctx, *task) if w.stats != nil { @@ -924,16 +1004,23 @@ func (w *indexMergeTableScanWorker) pickAndExecTask(ctx context.Context, task ** } } -func (w *indexMergeTableScanWorker) handlePickAndExecTaskPanic(ctx context.Context, task **lookupTableTask) func(r interface{}) { +func (w *indexMergeTableScanWorker) handleTableScanWorkerPanic(ctx context.Context, finished <-chan struct{}, task **lookupTableTask, worker string) func(r interface{}) { return func(r interface{}) { if r == nil { return } - err4Panic := errors.Errorf("panic in IndexMergeReaderExecutor indexMergeTableWorker: %v", r) + err4Panic := errors.Errorf("%s: %v", worker, r) logutil.Logger(ctx).Error(err4Panic.Error()) if *task != nil { - (*task).doneCh <- err4Panic + select { + case <-ctx.Done(): + return + case <-finished: + return + case (*task).doneCh <- err4Panic: + return + } } } } diff --git a/executor/index_merge_reader_test.go b/executor/index_merge_reader_test.go index 9d06eb3d70b68..81dcc8155a7c7 100644 --- a/executor/index_merge_reader_test.go +++ b/executor/index_merge_reader_test.go @@ -605,3 +605,82 @@ func TestPessimisticLockOnPartitionForIndexMerge(t *testing.T) { // TODO: add support for index merge reader in dynamic tidb_partition_prune_mode } + +func TestIndexMergePanic(t *testing.T) { + store, clean := testkit.CreateMockStore(t) + defer clean() + tk := testkit.NewTestKit(t, store) + + tk.MustExec("use test") + tk.MustExec("drop table if exists t1") + tk.MustExec("create table t1(c1 int, c2 bigint, c3 bigint, primary key(c1), key(c2), key(c3));") + tk.MustExec("insert into t1 values(1, 1, 1), (100, 100, 100)") + + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/executor/testIndexMergeResultChCloseEarly", "return(true)")) + tk.MustExec("select /*+ use_index_merge(t1, primary, c2, c3) */ c1 from t1 where c1 < 100 or c2 < 100") + require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/executor/testIndexMergeResultChCloseEarly")) + + tk.MustExec("use test") + tk.MustExec("drop table if exists t1") + tk.MustExec("create table t1(c1 int, c2 bigint, c3 bigint, primary key(c1), key(c2), key(c3)) partition by hash(c1) partitions 10;") + insertStr := "insert into t1 values(0, 0, 0)" + for i := 1; i < 1000; i++ { + insertStr += fmt.Sprintf(", (%d, %d, %d)", i, i, i) + } + tk.MustExec(insertStr) + tk.MustExec("analyze table t1;") + tk.MustExec("set tidb_partition_prune_mode = 'dynamic'") + + minV := 200 + maxV := 1000 + runSQL := func(fp string) { + var sql string + v1 := rand.Intn(maxV-minV) + minV + v2 := rand.Intn(maxV-minV) + minV + sql = fmt.Sprintf("select /*+ use_index_merge(t1) */ c1 from t1 where c1 < %d or c2 < %d;", v1, v2) + res := tk.MustQuery("explain " + sql).Rows() + require.Contains(t, res[1][0], "IndexMerge") + err := tk.QueryToErr(sql) + require.Contains(t, err.Error(), fp) + } + + packagePath := "github.com/pingcap/tidb/executor/" + panicFPPaths := []string{ + packagePath + "testIndexMergePanicPartialIndexWorker", + packagePath + "testIndexMergePanicPartialTableWorker", + + packagePath + "testIndexMergePanicProcessWorkerUnion", + + packagePath + "testIndexMergePanicTableScanWorker", + } + for _, fp := range panicFPPaths { + fmt.Println("handling failpoint: ", fp) + if !strings.Contains(fp, "testIndexMergePanicTableScanWorker") { + // When mockSleepBeforeStartTableReader is enabled, will not read real data. This is to avoid leaking goroutines in coprocessor. + // But should disable mockSleepBeforeStartTableReader for testIndexMergePanicTableScanWorker. + // Because finalTableScanWorker need task.doneCh to pass error, so need partialIndexWorker/partialTableWorker runs normally. + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/executor/mockSleepBeforeStartTableReader", "return(1000)")) + } + for i := 0; i < 1000; i++ { + require.NoError(t, failpoint.Enable(fp, fmt.Sprintf(`panic("%s")`, fp))) + runSQL(fp) + require.NoError(t, failpoint.Disable(fp)) + } + if !strings.Contains(fp, "testIndexMergePanicTableScanWorker") { + require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/executor/mockSleepBeforeStartTableReader")) + } + } + + errFPPaths := []string{ + packagePath + "testIndexMergeErrorPartialIndexWorker", + packagePath + "testIndexMergeErrorPartialTableWorker", + } + for _, fp := range errFPPaths { + fmt.Println("handling failpoint: ", fp) + require.NoError(t, failpoint.Enable(fp, fmt.Sprintf(`return("%s")`, fp))) + for i := 0; i < 100; i++ { + runSQL(fp) + } + require.NoError(t, failpoint.Disable(fp)) + } +}