diff --git a/executor/index_lookup_hash_join.go b/executor/index_lookup_hash_join.go index 5ffa7259cd1bc..4a6f066942efd 100644 --- a/executor/index_lookup_hash_join.go +++ b/executor/index_lookup_hash_join.go @@ -28,10 +28,8 @@ import ( "github.com/pingcap/tidb/util/chunk" "github.com/pingcap/tidb/util/codec" "github.com/pingcap/tidb/util/execdetails" - "github.com/pingcap/tidb/util/logutil" "github.com/pingcap/tidb/util/memory" "github.com/pingcap/tidb/util/ranger" - "go.uber.org/zap" ) // numResChkHold indicates the number of resource chunks that an inner worker @@ -456,7 +454,7 @@ func (iw *indexHashJoinInnerWorker) run(ctx context.Context, cancelFunc context. if task.keepOuterOrder { resultCh = task.resultCh } - err := iw.handleTask(ctx, cancelFunc, task, joinResult, h, resultCh) + err := iw.handleTask(ctx, task, joinResult, h, resultCh) if err != nil { joinResult.err = err break @@ -473,7 +471,7 @@ func (iw *indexHashJoinInnerWorker) run(ctx context.Context, cancelFunc context. } } failpoint.Inject("testIndexHashJoinInnerWorkerErr", func() { - joinResult = &indexHashJoinResult{err: errors.New("mockIndexHashJoinInnerWorkerErr")} + joinResult.err = errors.New("mockIndexHashJoinInnerWorkerErr") }) if joinResult.err != nil { resultCh <- joinResult @@ -504,7 +502,7 @@ func (iw *indexHashJoinInnerWorker) getNewJoinResult(ctx context.Context) (*inde return joinResult, ok } -func (iw *indexHashJoinInnerWorker) buildHashTableForOuterResult(ctx context.Context, cancelFunc context.CancelFunc, task *indexHashJoinTask, h hash.Hash64) { +func (iw *indexHashJoinInnerWorker) buildHashTableForOuterResult(ctx context.Context, task *indexHashJoinTask, h hash.Hash64) { buf, numChks := make([]byte, 1), task.outerResult.NumChunks() task.lookupMap = newRowHashMap(task.outerResult.Len()) for chkIdx := 0; chkIdx < numChks; chkIdx++ { @@ -524,10 +522,12 @@ func (iw *indexHashJoinInnerWorker) buildHashTableForOuterResult(ctx context.Con } h.Reset() err := codec.HashChunkRow(iw.ctx.GetSessionVars().StmtCtx, h, row, iw.outerCtx.rowTypes, keyColIdx, buf) + failpoint.Inject("testIndexHashJoinBuildErr", func() { + err = errors.New("mockIndexHashJoinBuildErr") + }) if err != nil { - cancelFunc() - logutil.Logger(ctx).Error("indexHashJoinInnerWorker.buildHashTableForOuterResult failed", zap.Error(err)) - return + // This panic will be recovered by the invoker. + panic(err.Error()) } rowPtr := chunk.RowPtr{ChkIdx: uint32(chkIdx), RowIdx: uint32(rowIdx)} task.lookupMap.Put(h.Sum64(), rowPtr) @@ -551,11 +551,11 @@ func (iw *indexHashJoinInnerWorker) handleHashJoinInnerWorkerPanic(r interface{} iw.wg.Done() } -func (iw *indexHashJoinInnerWorker) handleTask(ctx context.Context, cancelFunc context.CancelFunc, task *indexHashJoinTask, joinResult *indexHashJoinResult, h hash.Hash64, resultCh chan *indexHashJoinResult) error { +func (iw *indexHashJoinInnerWorker) handleTask(ctx context.Context, task *indexHashJoinTask, joinResult *indexHashJoinResult, h hash.Hash64, resultCh chan *indexHashJoinResult) error { iw.wg = &sync.WaitGroup{} iw.wg.Add(1) // TODO(XuHuaiyu): we may always use the smaller side to build the hashtable. - go util.WithRecovery(func() { iw.buildHashTableForOuterResult(ctx, cancelFunc, task, h) }, iw.handleHashJoinInnerWorkerPanic) + go util.WithRecovery(func() { iw.buildHashTableForOuterResult(ctx, task, h) }, iw.handleHashJoinInnerWorkerPanic) err := iw.fetchInnerResults(ctx, task.lookUpJoinTask) if err != nil { return err diff --git a/executor/index_lookup_merge_join_test.go b/executor/index_lookup_merge_join_test.go index 0cc856e663a1c..31783c6ead81e 100644 --- a/executor/index_lookup_merge_join_test.go +++ b/executor/index_lookup_merge_join_test.go @@ -1,10 +1,13 @@ package executor_test import ( + "strings" + + "context" . "github.com/pingcap/check" "github.com/pingcap/failpoint" + "github.com/pingcap/tidb/session" "github.com/pingcap/tidb/util/testkit" - "strings" ) func (s *testSuite9) TestIndexLookupMergeJoinHang(c *C) { @@ -60,7 +63,9 @@ func (s *testSuite9) TestIssue18572_1(c *C) { c.Assert(failpoint.Disable("github.com/pingcap/tidb/executor/testIndexHashJoinInnerWorkerErr"), IsNil) }() - err := tk.QueryToErr("select /*+ inl_hash_join(t1) */ * from t1 right join t1 t2 on t1.b=t2.b;") + rs, err := tk.Exec("select /*+ inl_hash_join(t1) */ * from t1 right join t1 t2 on t1.b=t2.b;") + c.Assert(err, IsNil) + _, err = session.GetRows4Test(context.Background(), nil, rs) c.Assert(strings.Contains(err.Error(), "mockIndexHashJoinInnerWorkerErr"), IsTrue) } @@ -76,6 +81,26 @@ func (s *testSuite9) TestIssue18572_2(c *C) { c.Assert(failpoint.Disable("github.com/pingcap/tidb/executor/testIndexHashJoinOuterWorkerErr"), IsNil) }() - err := tk.QueryToErr("select /*+ inl_hash_join(t1) */ * from t1 right join t1 t2 on t1.b=t2.b;") + rs, err := tk.Exec("select /*+ inl_hash_join(t1) */ * from t1 right join t1 t2 on t1.b=t2.b;") + c.Assert(err, IsNil) + _, err = session.GetRows4Test(context.Background(), nil, rs) c.Assert(strings.Contains(err.Error(), "mockIndexHashJoinOuterWorkerErr"), IsTrue) } + +func (s *testSuite9) TestIssue18572_3(c *C) { + tk := testkit.NewTestKitWithInit(c, s.store) + tk.MustExec("drop table if exists t1") + tk.MustExec("create table t1(a int, b int, index idx(b));") + tk.MustExec("insert into t1 values(1, 1);") + tk.MustExec("insert into t1 select * from t1;") + + c.Assert(failpoint.Enable("github.com/pingcap/tidb/executor/testIndexHashJoinBuildErr", "return"), IsNil) + defer func() { + c.Assert(failpoint.Disable("github.com/pingcap/tidb/executor/testIndexHashJoinBuildErr"), IsNil) + }() + + rs, err := tk.Exec("select /*+ inl_hash_join(t1) */ * from t1 right join t1 t2 on t1.b=t2.b;") + c.Assert(err, IsNil) + _, err = session.GetRows4Test(context.Background(), nil, rs) + c.Assert(strings.Contains(err.Error(), "mockIndexHashJoinBuildErr"), IsTrue) +}