From a84b516427e691d52d08d97111e2533069112d00 Mon Sep 17 00:00:00 2001 From: xuhuaiyu <391585975@qq.com> Date: Wed, 15 Jul 2020 13:13:16 +0800 Subject: [PATCH 1/4] executor: return error from indexHashJoin worker to the main thread --- executor/index_lookup_hash_join.go | 25 ++++++++++++------ executor/index_lookup_merge_join_test.go | 33 ++++++++++++++++++++++++ 2 files changed, 50 insertions(+), 8 deletions(-) diff --git a/executor/index_lookup_hash_join.go b/executor/index_lookup_hash_join.go index b490ed51eb59f..5ffa7259cd1bc 100644 --- a/executor/index_lookup_hash_join.go +++ b/executor/index_lookup_hash_join.go @@ -21,6 +21,7 @@ import ( "sync" "github.com/pingcap/errors" + "github.com/pingcap/failpoint" "github.com/pingcap/tidb/expression" plannercore "github.com/pingcap/tidb/planner/core" "github.com/pingcap/tidb/util" @@ -158,7 +159,7 @@ func (e *IndexNestedLoopHashJoin) startWorkers(ctx context.Context) { } e.workerWg.Add(1) ow := e.newOuterWorker(innerCh) - go util.WithRecovery(func() { ow.run(workerCtx, cancelFunc) }, e.finishJoinWorkers) + go util.WithRecovery(func() { ow.run(workerCtx) }, e.finishJoinWorkers) if !e.keepOuterOrder { e.resultCh = make(chan *indexHashJoinResult, concurrency) @@ -308,16 +309,22 @@ func (e *IndexNestedLoopHashJoin) Close() error { return e.baseExecutor.Close() } -func (ow *indexHashJoinOuterWorker) run(ctx context.Context, cancelFunc context.CancelFunc) { +func (ow *indexHashJoinOuterWorker) run(ctx context.Context) { defer close(ow.innerCh) for { task, err := ow.buildTask(ctx) - if task == nil { + failpoint.Inject("testIndexHashJoinOuterWorkerErr", func() { + err = errors.New("mockIndexHashJoinOuterWorkerErr") + }) + if err != nil { + task = &indexHashJoinTask{err: err} + ow.pushToChan(ctx, task, ow.innerCh) + if ow.keepOuterOrder { + ow.pushToChan(ctx, task, ow.taskCh) + } return } - if err != nil { - cancelFunc() - logutil.Logger(ctx).Error("indexHashJoinOuterWorker.run failed", zap.Error(err)) + if task == nil { return } if finished := ow.pushToChan(ctx, task, ow.innerCh); finished { @@ -465,9 +472,11 @@ func (iw *indexHashJoinInnerWorker) run(ctx context.Context, cancelFunc context. } } } + failpoint.Inject("testIndexHashJoinInnerWorkerErr", func() { + joinResult = &indexHashJoinResult{err: errors.New("mockIndexHashJoinInnerWorkerErr")} + }) if joinResult.err != nil { - cancelFunc() - logutil.Logger(ctx).Error("indexHashJoinInnerWorker.run failed", zap.Error(joinResult.err)) + resultCh <- joinResult return } // When task.keepOuterOrder is TRUE(resultCh != iw.resultCh), the last diff --git a/executor/index_lookup_merge_join_test.go b/executor/index_lookup_merge_join_test.go index 32df311a27433..0cc856e663a1c 100644 --- a/executor/index_lookup_merge_join_test.go +++ b/executor/index_lookup_merge_join_test.go @@ -4,6 +4,7 @@ import ( . "github.com/pingcap/check" "github.com/pingcap/failpoint" "github.com/pingcap/tidb/util/testkit" + "strings" ) func (s *testSuite9) TestIndexLookupMergeJoinHang(c *C) { @@ -46,3 +47,35 @@ func (s *testSuite9) TestIssue18068(c *C) { tk.MustExec("select /*+ inl_merge_join(s)*/ 1 from t join s on t.a = s.a limit 1") tk.MustExec("select /*+ inl_merge_join(s)*/ 1 from t join s on t.a = s.a limit 1") } + +func (s *testSuite9) TestIssue18572_1(c *C) { + tk := testkit.NewTestKitWithInit(c, s.store) + tk.MustExec("drop table if exists t1") + tk.MustExec("create table t1(a int, b int, index idx(b));") + tk.MustExec("insert into t1 values(1, 1);") + tk.MustExec("insert into t1 select * from t1;") + + c.Assert(failpoint.Enable("github.com/pingcap/tidb/executor/testIndexHashJoinInnerWorkerErr", "return"), IsNil) + defer func() { + c.Assert(failpoint.Disable("github.com/pingcap/tidb/executor/testIndexHashJoinInnerWorkerErr"), IsNil) + }() + + err := tk.QueryToErr("select /*+ inl_hash_join(t1) */ * from t1 right join t1 t2 on t1.b=t2.b;") + c.Assert(strings.Contains(err.Error(), "mockIndexHashJoinInnerWorkerErr"), IsTrue) +} + +func (s *testSuite9) TestIssue18572_2(c *C) { + tk := testkit.NewTestKitWithInit(c, s.store) + tk.MustExec("drop table if exists t1") + tk.MustExec("create table t1(a int, b int, index idx(b));") + tk.MustExec("insert into t1 values(1, 1);") + tk.MustExec("insert into t1 select * from t1;") + + c.Assert(failpoint.Enable("github.com/pingcap/tidb/executor/testIndexHashJoinOuterWorkerErr", "return"), IsNil) + defer func() { + c.Assert(failpoint.Disable("github.com/pingcap/tidb/executor/testIndexHashJoinOuterWorkerErr"), IsNil) + }() + + err := tk.QueryToErr("select /*+ inl_hash_join(t1) */ * from t1 right join t1 t2 on t1.b=t2.b;") + c.Assert(strings.Contains(err.Error(), "mockIndexHashJoinOuterWorkerErr"), IsTrue) +} From 2949841b6a6c39719947895cd483e690d874c458 Mon Sep 17 00:00:00 2001 From: xuhuaiyu <391585975@qq.com> Date: Wed, 15 Jul 2020 14:13:49 +0800 Subject: [PATCH 2/4] fix ci --- executor/index_lookup_hash_join.go | 20 +++++++-------- executor/index_lookup_merge_join_test.go | 31 +++++++++++++++++++++--- 2 files changed, 38 insertions(+), 13 deletions(-) diff --git a/executor/index_lookup_hash_join.go b/executor/index_lookup_hash_join.go index 5ffa7259cd1bc..4a6f066942efd 100644 --- a/executor/index_lookup_hash_join.go +++ b/executor/index_lookup_hash_join.go @@ -28,10 +28,8 @@ import ( "github.com/pingcap/tidb/util/chunk" "github.com/pingcap/tidb/util/codec" "github.com/pingcap/tidb/util/execdetails" - "github.com/pingcap/tidb/util/logutil" "github.com/pingcap/tidb/util/memory" "github.com/pingcap/tidb/util/ranger" - "go.uber.org/zap" ) // numResChkHold indicates the number of resource chunks that an inner worker @@ -456,7 +454,7 @@ func (iw *indexHashJoinInnerWorker) run(ctx context.Context, cancelFunc context. if task.keepOuterOrder { resultCh = task.resultCh } - err := iw.handleTask(ctx, cancelFunc, task, joinResult, h, resultCh) + err := iw.handleTask(ctx, task, joinResult, h, resultCh) if err != nil { joinResult.err = err break @@ -473,7 +471,7 @@ func (iw *indexHashJoinInnerWorker) run(ctx context.Context, cancelFunc context. } } failpoint.Inject("testIndexHashJoinInnerWorkerErr", func() { - joinResult = &indexHashJoinResult{err: errors.New("mockIndexHashJoinInnerWorkerErr")} + joinResult.err = errors.New("mockIndexHashJoinInnerWorkerErr") }) if joinResult.err != nil { resultCh <- joinResult @@ -504,7 +502,7 @@ func (iw *indexHashJoinInnerWorker) getNewJoinResult(ctx context.Context) (*inde return joinResult, ok } -func (iw *indexHashJoinInnerWorker) buildHashTableForOuterResult(ctx context.Context, cancelFunc context.CancelFunc, task *indexHashJoinTask, h hash.Hash64) { +func (iw *indexHashJoinInnerWorker) buildHashTableForOuterResult(ctx context.Context, task *indexHashJoinTask, h hash.Hash64) { buf, numChks := make([]byte, 1), task.outerResult.NumChunks() task.lookupMap = newRowHashMap(task.outerResult.Len()) for chkIdx := 0; chkIdx < numChks; chkIdx++ { @@ -524,10 +522,12 @@ func (iw *indexHashJoinInnerWorker) buildHashTableForOuterResult(ctx context.Con } h.Reset() err := codec.HashChunkRow(iw.ctx.GetSessionVars().StmtCtx, h, row, iw.outerCtx.rowTypes, keyColIdx, buf) + failpoint.Inject("testIndexHashJoinBuildErr", func() { + err = errors.New("mockIndexHashJoinBuildErr") + }) if err != nil { - cancelFunc() - logutil.Logger(ctx).Error("indexHashJoinInnerWorker.buildHashTableForOuterResult failed", zap.Error(err)) - return + // This panic will be recovered by the invoker. + panic(err.Error()) } rowPtr := chunk.RowPtr{ChkIdx: uint32(chkIdx), RowIdx: uint32(rowIdx)} task.lookupMap.Put(h.Sum64(), rowPtr) @@ -551,11 +551,11 @@ func (iw *indexHashJoinInnerWorker) handleHashJoinInnerWorkerPanic(r interface{} iw.wg.Done() } -func (iw *indexHashJoinInnerWorker) handleTask(ctx context.Context, cancelFunc context.CancelFunc, task *indexHashJoinTask, joinResult *indexHashJoinResult, h hash.Hash64, resultCh chan *indexHashJoinResult) error { +func (iw *indexHashJoinInnerWorker) handleTask(ctx context.Context, task *indexHashJoinTask, joinResult *indexHashJoinResult, h hash.Hash64, resultCh chan *indexHashJoinResult) error { iw.wg = &sync.WaitGroup{} iw.wg.Add(1) // TODO(XuHuaiyu): we may always use the smaller side to build the hashtable. - go util.WithRecovery(func() { iw.buildHashTableForOuterResult(ctx, cancelFunc, task, h) }, iw.handleHashJoinInnerWorkerPanic) + go util.WithRecovery(func() { iw.buildHashTableForOuterResult(ctx, task, h) }, iw.handleHashJoinInnerWorkerPanic) err := iw.fetchInnerResults(ctx, task.lookUpJoinTask) if err != nil { return err diff --git a/executor/index_lookup_merge_join_test.go b/executor/index_lookup_merge_join_test.go index 0cc856e663a1c..31783c6ead81e 100644 --- a/executor/index_lookup_merge_join_test.go +++ b/executor/index_lookup_merge_join_test.go @@ -1,10 +1,13 @@ package executor_test import ( + "strings" + + "context" . "github.com/pingcap/check" "github.com/pingcap/failpoint" + "github.com/pingcap/tidb/session" "github.com/pingcap/tidb/util/testkit" - "strings" ) func (s *testSuite9) TestIndexLookupMergeJoinHang(c *C) { @@ -60,7 +63,9 @@ func (s *testSuite9) TestIssue18572_1(c *C) { c.Assert(failpoint.Disable("github.com/pingcap/tidb/executor/testIndexHashJoinInnerWorkerErr"), IsNil) }() - err := tk.QueryToErr("select /*+ inl_hash_join(t1) */ * from t1 right join t1 t2 on t1.b=t2.b;") + rs, err := tk.Exec("select /*+ inl_hash_join(t1) */ * from t1 right join t1 t2 on t1.b=t2.b;") + c.Assert(err, IsNil) + _, err = session.GetRows4Test(context.Background(), nil, rs) c.Assert(strings.Contains(err.Error(), "mockIndexHashJoinInnerWorkerErr"), IsTrue) } @@ -76,6 +81,26 @@ func (s *testSuite9) TestIssue18572_2(c *C) { c.Assert(failpoint.Disable("github.com/pingcap/tidb/executor/testIndexHashJoinOuterWorkerErr"), IsNil) }() - err := tk.QueryToErr("select /*+ inl_hash_join(t1) */ * from t1 right join t1 t2 on t1.b=t2.b;") + rs, err := tk.Exec("select /*+ inl_hash_join(t1) */ * from t1 right join t1 t2 on t1.b=t2.b;") + c.Assert(err, IsNil) + _, err = session.GetRows4Test(context.Background(), nil, rs) c.Assert(strings.Contains(err.Error(), "mockIndexHashJoinOuterWorkerErr"), IsTrue) } + +func (s *testSuite9) TestIssue18572_3(c *C) { + tk := testkit.NewTestKitWithInit(c, s.store) + tk.MustExec("drop table if exists t1") + tk.MustExec("create table t1(a int, b int, index idx(b));") + tk.MustExec("insert into t1 values(1, 1);") + tk.MustExec("insert into t1 select * from t1;") + + c.Assert(failpoint.Enable("github.com/pingcap/tidb/executor/testIndexHashJoinBuildErr", "return"), IsNil) + defer func() { + c.Assert(failpoint.Disable("github.com/pingcap/tidb/executor/testIndexHashJoinBuildErr"), IsNil) + }() + + rs, err := tk.Exec("select /*+ inl_hash_join(t1) */ * from t1 right join t1 t2 on t1.b=t2.b;") + c.Assert(err, IsNil) + _, err = session.GetRows4Test(context.Background(), nil, rs) + c.Assert(strings.Contains(err.Error(), "mockIndexHashJoinBuildErr"), IsTrue) +} From 53841b191150f19a8121c96ae505692d7ff6c606 Mon Sep 17 00:00:00 2001 From: xuhuaiyu <391585975@qq.com> Date: Wed, 15 Jul 2020 14:28:57 +0800 Subject: [PATCH 3/4] make check --- executor/index_lookup_merge_join_test.go | 58 ------------------------ executor/join_test.go | 54 ++++++++++++++++++++++ 2 files changed, 54 insertions(+), 58 deletions(-) diff --git a/executor/index_lookup_merge_join_test.go b/executor/index_lookup_merge_join_test.go index 31783c6ead81e..32df311a27433 100644 --- a/executor/index_lookup_merge_join_test.go +++ b/executor/index_lookup_merge_join_test.go @@ -1,12 +1,8 @@ package executor_test import ( - "strings" - - "context" . "github.com/pingcap/check" "github.com/pingcap/failpoint" - "github.com/pingcap/tidb/session" "github.com/pingcap/tidb/util/testkit" ) @@ -50,57 +46,3 @@ func (s *testSuite9) TestIssue18068(c *C) { tk.MustExec("select /*+ inl_merge_join(s)*/ 1 from t join s on t.a = s.a limit 1") tk.MustExec("select /*+ inl_merge_join(s)*/ 1 from t join s on t.a = s.a limit 1") } - -func (s *testSuite9) TestIssue18572_1(c *C) { - tk := testkit.NewTestKitWithInit(c, s.store) - tk.MustExec("drop table if exists t1") - tk.MustExec("create table t1(a int, b int, index idx(b));") - tk.MustExec("insert into t1 values(1, 1);") - tk.MustExec("insert into t1 select * from t1;") - - c.Assert(failpoint.Enable("github.com/pingcap/tidb/executor/testIndexHashJoinInnerWorkerErr", "return"), IsNil) - defer func() { - c.Assert(failpoint.Disable("github.com/pingcap/tidb/executor/testIndexHashJoinInnerWorkerErr"), IsNil) - }() - - rs, err := tk.Exec("select /*+ inl_hash_join(t1) */ * from t1 right join t1 t2 on t1.b=t2.b;") - c.Assert(err, IsNil) - _, err = session.GetRows4Test(context.Background(), nil, rs) - c.Assert(strings.Contains(err.Error(), "mockIndexHashJoinInnerWorkerErr"), IsTrue) -} - -func (s *testSuite9) TestIssue18572_2(c *C) { - tk := testkit.NewTestKitWithInit(c, s.store) - tk.MustExec("drop table if exists t1") - tk.MustExec("create table t1(a int, b int, index idx(b));") - tk.MustExec("insert into t1 values(1, 1);") - tk.MustExec("insert into t1 select * from t1;") - - c.Assert(failpoint.Enable("github.com/pingcap/tidb/executor/testIndexHashJoinOuterWorkerErr", "return"), IsNil) - defer func() { - c.Assert(failpoint.Disable("github.com/pingcap/tidb/executor/testIndexHashJoinOuterWorkerErr"), IsNil) - }() - - rs, err := tk.Exec("select /*+ inl_hash_join(t1) */ * from t1 right join t1 t2 on t1.b=t2.b;") - c.Assert(err, IsNil) - _, err = session.GetRows4Test(context.Background(), nil, rs) - c.Assert(strings.Contains(err.Error(), "mockIndexHashJoinOuterWorkerErr"), IsTrue) -} - -func (s *testSuite9) TestIssue18572_3(c *C) { - tk := testkit.NewTestKitWithInit(c, s.store) - tk.MustExec("drop table if exists t1") - tk.MustExec("create table t1(a int, b int, index idx(b));") - tk.MustExec("insert into t1 values(1, 1);") - tk.MustExec("insert into t1 select * from t1;") - - c.Assert(failpoint.Enable("github.com/pingcap/tidb/executor/testIndexHashJoinBuildErr", "return"), IsNil) - defer func() { - c.Assert(failpoint.Disable("github.com/pingcap/tidb/executor/testIndexHashJoinBuildErr"), IsNil) - }() - - rs, err := tk.Exec("select /*+ inl_hash_join(t1) */ * from t1 right join t1 t2 on t1.b=t2.b;") - c.Assert(err, IsNil) - _, err = session.GetRows4Test(context.Background(), nil, rs) - c.Assert(strings.Contains(err.Error(), "mockIndexHashJoinBuildErr"), IsTrue) -} diff --git a/executor/join_test.go b/executor/join_test.go index 86577c42dc9e3..d2d2af884077b 100644 --- a/executor/join_test.go +++ b/executor/join_test.go @@ -2090,3 +2090,57 @@ func (s *testSuiteJoinSerial) TestIssue18070(c *C) { err = tk.QueryToErr("select /*+ inl_merge_join(t1)*/ * from t1 join t2 on t1.a = t2.a;") c.Assert(strings.Contains(err.Error(), "Out Of Memory Quota!"), IsTrue) } + +func (s *testSuite9) TestIssue18572_1(c *C) { + tk := testkit.NewTestKitWithInit(c, s.store) + tk.MustExec("drop table if exists t1") + tk.MustExec("create table t1(a int, b int, index idx(b));") + tk.MustExec("insert into t1 values(1, 1);") + tk.MustExec("insert into t1 select * from t1;") + + c.Assert(failpoint.Enable("github.com/pingcap/tidb/executor/testIndexHashJoinInnerWorkerErr", "return"), IsNil) + defer func() { + c.Assert(failpoint.Disable("github.com/pingcap/tidb/executor/testIndexHashJoinInnerWorkerErr"), IsNil) + }() + + rs, err := tk.Exec("select /*+ inl_hash_join(t1) */ * from t1 right join t1 t2 on t1.b=t2.b;") + c.Assert(err, IsNil) + _, err = session.GetRows4Test(context.Background(), nil, rs) + c.Assert(strings.Contains(err.Error(), "mockIndexHashJoinInnerWorkerErr"), IsTrue) +} + +func (s *testSuite9) TestIssue18572_2(c *C) { + tk := testkit.NewTestKitWithInit(c, s.store) + tk.MustExec("drop table if exists t1") + tk.MustExec("create table t1(a int, b int, index idx(b));") + tk.MustExec("insert into t1 values(1, 1);") + tk.MustExec("insert into t1 select * from t1;") + + c.Assert(failpoint.Enable("github.com/pingcap/tidb/executor/testIndexHashJoinOuterWorkerErr", "return"), IsNil) + defer func() { + c.Assert(failpoint.Disable("github.com/pingcap/tidb/executor/testIndexHashJoinOuterWorkerErr"), IsNil) + }() + + rs, err := tk.Exec("select /*+ inl_hash_join(t1) */ * from t1 right join t1 t2 on t1.b=t2.b;") + c.Assert(err, IsNil) + _, err = session.GetRows4Test(context.Background(), nil, rs) + c.Assert(strings.Contains(err.Error(), "mockIndexHashJoinOuterWorkerErr"), IsTrue) +} + +func (s *testSuite9) TestIssue18572_3(c *C) { + tk := testkit.NewTestKitWithInit(c, s.store) + tk.MustExec("drop table if exists t1") + tk.MustExec("create table t1(a int, b int, index idx(b));") + tk.MustExec("insert into t1 values(1, 1);") + tk.MustExec("insert into t1 select * from t1;") + + c.Assert(failpoint.Enable("github.com/pingcap/tidb/executor/testIndexHashJoinBuildErr", "return"), IsNil) + defer func() { + c.Assert(failpoint.Disable("github.com/pingcap/tidb/executor/testIndexHashJoinBuildErr"), IsNil) + }() + + rs, err := tk.Exec("select /*+ inl_hash_join(t1) */ * from t1 right join t1 t2 on t1.b=t2.b;") + c.Assert(err, IsNil) + _, err = session.GetRows4Test(context.Background(), nil, rs) + c.Assert(strings.Contains(err.Error(), "mockIndexHashJoinBuildErr"), IsTrue) +} From 71d334431166204f6f7da06750a4b8d7ec522bca Mon Sep 17 00:00:00 2001 From: xuhuaiyu <391585975@qq.com> Date: Wed, 15 Jul 2020 16:01:34 +0800 Subject: [PATCH 4/4] address comment --- executor/index_lookup_hash_join.go | 4 ++-- executor/index_lookup_merge_join.go | 2 +- executor/index_merge_reader.go | 4 ++-- 3 files changed, 5 insertions(+), 5 deletions(-) diff --git a/executor/index_lookup_hash_join.go b/executor/index_lookup_hash_join.go index 4a6f066942efd..f1f5799129ca8 100644 --- a/executor/index_lookup_hash_join.go +++ b/executor/index_lookup_hash_join.go @@ -229,7 +229,7 @@ func (e *IndexNestedLoopHashJoin) Next(ctx context.Context, req *chunk.Chunk) er return result.err } case <-ctx.Done(): - return nil + return ctx.Err() } req.SwapColumns(result.chk) result.src <- result.chk @@ -255,7 +255,7 @@ func (e *IndexNestedLoopHashJoin) runInOrder(ctx context.Context, req *chunk.Chu return result.err } case <-ctx.Done(): - return nil + return ctx.Err() } req.SwapColumns(result.chk) result.src <- result.chk diff --git a/executor/index_lookup_merge_join.go b/executor/index_lookup_merge_join.go index 8d25b7803e2ca..ab9183aa84b83 100644 --- a/executor/index_lookup_merge_join.go +++ b/executor/index_lookup_merge_join.go @@ -281,7 +281,7 @@ func (e *IndexLookUpMergeJoin) Next(ctx context.Context, req *chunk.Chunk) error result.src <- result.chk return nil case <-ctx.Done(): - return nil + return ctx.Err() } } diff --git a/executor/index_merge_reader.go b/executor/index_merge_reader.go index af6ec3729d556..b3e2c6529f7ed 100644 --- a/executor/index_merge_reader.go +++ b/executor/index_merge_reader.go @@ -336,7 +336,7 @@ func (w *partialTableWorker) fetchHandles(ctx context.Context, exitCh <-chan str task := w.buildTableTask(handles, retChunk) select { case <-ctx.Done(): - return count, nil + return count, ctx.Err() case <-exitCh: return count, nil case <-finished: @@ -581,7 +581,7 @@ func (w *partialIndexWorker) fetchHandles(ctx context.Context, result distsql.Se task := w.buildTableTask(handles, retChunk) select { case <-ctx.Done(): - return count, nil + return count, ctx.Err() case <-exitCh: return count, nil case <-finished: