From 64ab333a72ce82b6b35e46054442213267a23046 Mon Sep 17 00:00:00 2001 From: HuaiyuXu <391585975@qq.com> Date: Wed, 15 Jul 2020 16:41:24 +0800 Subject: [PATCH] cherry pick #18573 to release-4.0 Signed-off-by: ti-srebot --- executor/index_lookup_hash_join.go | 47 +++++++++++++++---------- executor/index_lookup_merge_join.go | 2 +- executor/index_merge_reader.go | 4 +-- executor/join_test.go | 54 +++++++++++++++++++++++++++++ 4 files changed, 85 insertions(+), 22 deletions(-) diff --git a/executor/index_lookup_hash_join.go b/executor/index_lookup_hash_join.go index e66c912160153..4dc8109feef63 100644 --- a/executor/index_lookup_hash_join.go +++ b/executor/index_lookup_hash_join.go @@ -21,16 +21,15 @@ import ( "sync" "github.com/pingcap/errors" + "github.com/pingcap/failpoint" "github.com/pingcap/tidb/expression" plannercore "github.com/pingcap/tidb/planner/core" "github.com/pingcap/tidb/util" "github.com/pingcap/tidb/util/chunk" "github.com/pingcap/tidb/util/codec" "github.com/pingcap/tidb/util/execdetails" - "github.com/pingcap/tidb/util/logutil" "github.com/pingcap/tidb/util/memory" "github.com/pingcap/tidb/util/ranger" - "go.uber.org/zap" ) // numResChkHold indicates the number of resource chunks that an inner worker @@ -158,7 +157,7 @@ func (e *IndexNestedLoopHashJoin) startWorkers(ctx context.Context) { } e.workerWg.Add(1) ow := e.newOuterWorker(innerCh) - go util.WithRecovery(func() { ow.run(workerCtx, cancelFunc) }, e.finishJoinWorkers) + go util.WithRecovery(func() { ow.run(workerCtx) }, e.finishJoinWorkers) if !e.keepOuterOrder { e.resultCh = make(chan *indexHashJoinResult, concurrency) @@ -230,7 +229,7 @@ func (e *IndexNestedLoopHashJoin) Next(ctx context.Context, req *chunk.Chunk) er return result.err } case <-ctx.Done(): - return nil + return ctx.Err() } req.SwapColumns(result.chk) result.src <- result.chk @@ -256,7 +255,7 @@ func (e *IndexNestedLoopHashJoin) runInOrder(ctx context.Context, req *chunk.Chu return result.err } case <-ctx.Done(): - return nil + return ctx.Err() } req.SwapColumns(result.chk) result.src <- result.chk @@ -308,16 +307,22 @@ func (e *IndexNestedLoopHashJoin) Close() error { return e.baseExecutor.Close() } -func (ow *indexHashJoinOuterWorker) run(ctx context.Context, cancelFunc context.CancelFunc) { +func (ow *indexHashJoinOuterWorker) run(ctx context.Context) { defer close(ow.innerCh) for { task, err := ow.buildTask(ctx) - if task == nil { + failpoint.Inject("testIndexHashJoinOuterWorkerErr", func() { + err = errors.New("mockIndexHashJoinOuterWorkerErr") + }) + if err != nil { + task = &indexHashJoinTask{err: err} + ow.pushToChan(ctx, task, ow.innerCh) + if ow.keepOuterOrder { + ow.pushToChan(ctx, task, ow.taskCh) + } return } - if err != nil { - cancelFunc() - logutil.Logger(ctx).Error("indexHashJoinOuterWorker.run failed", zap.Error(err)) + if task == nil { return } if finished := ow.pushToChan(ctx, task, ow.innerCh); finished { @@ -449,7 +454,7 @@ func (iw *indexHashJoinInnerWorker) run(ctx context.Context, cancelFunc context. if task.keepOuterOrder { resultCh = task.resultCh } - err := iw.handleTask(ctx, cancelFunc, task, joinResult, h, resultCh) + err := iw.handleTask(ctx, task, joinResult, h, resultCh) if err != nil { joinResult.err = err break @@ -465,9 +470,11 @@ func (iw *indexHashJoinInnerWorker) run(ctx context.Context, cancelFunc context. } } } + failpoint.Inject("testIndexHashJoinInnerWorkerErr", func() { + joinResult.err = errors.New("mockIndexHashJoinInnerWorkerErr") + }) if joinResult.err != nil { - cancelFunc() - logutil.Logger(ctx).Error("indexHashJoinInnerWorker.run failed", zap.Error(joinResult.err)) + resultCh <- joinResult return } // When task.keepOuterOrder is TRUE(resultCh != iw.resultCh), the last @@ -495,7 +502,7 @@ func (iw *indexHashJoinInnerWorker) getNewJoinResult(ctx context.Context) (*inde return joinResult, ok } -func (iw *indexHashJoinInnerWorker) buildHashTableForOuterResult(ctx context.Context, cancelFunc context.CancelFunc, task *indexHashJoinTask, h hash.Hash64) { +func (iw *indexHashJoinInnerWorker) buildHashTableForOuterResult(ctx context.Context, task *indexHashJoinTask, h hash.Hash64) { buf, numChks := make([]byte, 1), task.outerResult.NumChunks() task.lookupMap = newRowHashMap(task.outerResult.Len()) for chkIdx := 0; chkIdx < numChks; chkIdx++ { @@ -515,10 +522,12 @@ func (iw *indexHashJoinInnerWorker) buildHashTableForOuterResult(ctx context.Con } h.Reset() err := codec.HashChunkRow(iw.ctx.GetSessionVars().StmtCtx, h, row, iw.outerCtx.rowTypes, keyColIdx, buf) + failpoint.Inject("testIndexHashJoinBuildErr", func() { + err = errors.New("mockIndexHashJoinBuildErr") + }) if err != nil { - cancelFunc() - logutil.Logger(ctx).Error("indexHashJoinInnerWorker.buildHashTableForOuterResult failed", zap.Error(err)) - return + // This panic will be recovered by the invoker. + panic(err.Error()) } rowPtr := chunk.RowPtr{ChkIdx: uint32(chkIdx), RowIdx: uint32(rowIdx)} task.lookupMap.Put(h.Sum64(), rowPtr) @@ -542,11 +551,11 @@ func (iw *indexHashJoinInnerWorker) handleHashJoinInnerWorkerPanic(r interface{} iw.wg.Done() } -func (iw *indexHashJoinInnerWorker) handleTask(ctx context.Context, cancelFunc context.CancelFunc, task *indexHashJoinTask, joinResult *indexHashJoinResult, h hash.Hash64, resultCh chan *indexHashJoinResult) error { +func (iw *indexHashJoinInnerWorker) handleTask(ctx context.Context, task *indexHashJoinTask, joinResult *indexHashJoinResult, h hash.Hash64, resultCh chan *indexHashJoinResult) error { iw.wg = &sync.WaitGroup{} iw.wg.Add(1) // TODO(XuHuaiyu): we may always use the smaller side to build the hashtable. - go util.WithRecovery(func() { iw.buildHashTableForOuterResult(ctx, cancelFunc, task, h) }, iw.handleHashJoinInnerWorkerPanic) + go util.WithRecovery(func() { iw.buildHashTableForOuterResult(ctx, task, h) }, iw.handleHashJoinInnerWorkerPanic) err := iw.fetchInnerResults(ctx, task.lookUpJoinTask) if err != nil { return err diff --git a/executor/index_lookup_merge_join.go b/executor/index_lookup_merge_join.go index cf944637ab5b3..eb9ab4e1d6342 100644 --- a/executor/index_lookup_merge_join.go +++ b/executor/index_lookup_merge_join.go @@ -281,7 +281,7 @@ func (e *IndexLookUpMergeJoin) Next(ctx context.Context, req *chunk.Chunk) error result.src <- result.chk return nil case <-ctx.Done(): - return nil + return ctx.Err() } } diff --git a/executor/index_merge_reader.go b/executor/index_merge_reader.go index 443c8a84e8857..497e25b7b379c 100644 --- a/executor/index_merge_reader.go +++ b/executor/index_merge_reader.go @@ -337,7 +337,7 @@ func (w *partialTableWorker) fetchHandles(ctx context.Context, exitCh <-chan str task := w.buildTableTask(handles, retChunk) select { case <-ctx.Done(): - return count, nil + return count, ctx.Err() case <-exitCh: return count, nil case <-finished: @@ -586,7 +586,7 @@ func (w *partialIndexWorker) fetchHandles(ctx context.Context, result distsql.Se task := w.buildTableTask(handles, retChunk) select { case <-ctx.Done(): - return count, nil + return count, ctx.Err() case <-exitCh: return count, nil case <-finished: diff --git a/executor/join_test.go b/executor/join_test.go index 5faacb2ce1cb7..6e577b1a73603 100644 --- a/executor/join_test.go +++ b/executor/join_test.go @@ -2088,3 +2088,57 @@ func (s *testSuiteJoinSerial) TestIssue18070(c *C) { err = tk.QueryToErr("select /*+ inl_merge_join(t1)*/ * from t1 join t2 on t1.a = t2.a;") c.Assert(strings.Contains(err.Error(), "Out Of Memory Quota!"), IsTrue) } + +func (s *testSuite9) TestIssue18572_1(c *C) { + tk := testkit.NewTestKitWithInit(c, s.store) + tk.MustExec("drop table if exists t1") + tk.MustExec("create table t1(a int, b int, index idx(b));") + tk.MustExec("insert into t1 values(1, 1);") + tk.MustExec("insert into t1 select * from t1;") + + c.Assert(failpoint.Enable("github.com/pingcap/tidb/executor/testIndexHashJoinInnerWorkerErr", "return"), IsNil) + defer func() { + c.Assert(failpoint.Disable("github.com/pingcap/tidb/executor/testIndexHashJoinInnerWorkerErr"), IsNil) + }() + + rs, err := tk.Exec("select /*+ inl_hash_join(t1) */ * from t1 right join t1 t2 on t1.b=t2.b;") + c.Assert(err, IsNil) + _, err = session.GetRows4Test(context.Background(), nil, rs) + c.Assert(strings.Contains(err.Error(), "mockIndexHashJoinInnerWorkerErr"), IsTrue) +} + +func (s *testSuite9) TestIssue18572_2(c *C) { + tk := testkit.NewTestKitWithInit(c, s.store) + tk.MustExec("drop table if exists t1") + tk.MustExec("create table t1(a int, b int, index idx(b));") + tk.MustExec("insert into t1 values(1, 1);") + tk.MustExec("insert into t1 select * from t1;") + + c.Assert(failpoint.Enable("github.com/pingcap/tidb/executor/testIndexHashJoinOuterWorkerErr", "return"), IsNil) + defer func() { + c.Assert(failpoint.Disable("github.com/pingcap/tidb/executor/testIndexHashJoinOuterWorkerErr"), IsNil) + }() + + rs, err := tk.Exec("select /*+ inl_hash_join(t1) */ * from t1 right join t1 t2 on t1.b=t2.b;") + c.Assert(err, IsNil) + _, err = session.GetRows4Test(context.Background(), nil, rs) + c.Assert(strings.Contains(err.Error(), "mockIndexHashJoinOuterWorkerErr"), IsTrue) +} + +func (s *testSuite9) TestIssue18572_3(c *C) { + tk := testkit.NewTestKitWithInit(c, s.store) + tk.MustExec("drop table if exists t1") + tk.MustExec("create table t1(a int, b int, index idx(b));") + tk.MustExec("insert into t1 values(1, 1);") + tk.MustExec("insert into t1 select * from t1;") + + c.Assert(failpoint.Enable("github.com/pingcap/tidb/executor/testIndexHashJoinBuildErr", "return"), IsNil) + defer func() { + c.Assert(failpoint.Disable("github.com/pingcap/tidb/executor/testIndexHashJoinBuildErr"), IsNil) + }() + + rs, err := tk.Exec("select /*+ inl_hash_join(t1) */ * from t1 right join t1 t2 on t1.b=t2.b;") + c.Assert(err, IsNil) + _, err = session.GetRows4Test(context.Background(), nil, rs) + c.Assert(strings.Contains(err.Error(), "mockIndexHashJoinBuildErr"), IsTrue) +}