diff --git a/executor/index_lookup_hash_join.go b/executor/index_lookup_hash_join.go index d9d12dde14c30..4fb2abf4509aa 100644 --- a/executor/index_lookup_hash_join.go +++ b/executor/index_lookup_hash_join.go @@ -337,6 +337,7 @@ func (ow *indexHashJoinOuterWorker) run(ctx context.Context) { defer trace.StartRegion(ctx, "IndexHashJoinOuterWorker").End() defer close(ow.innerCh) for { + failpoint.Inject("TestIssue30211", nil) task, err := ow.buildTask(ctx) failpoint.Inject("testIndexHashJoinOuterWorkerErr", func() { err = errors.New("mockIndexHashJoinOuterWorkerErr") diff --git a/executor/index_lookup_join.go b/executor/index_lookup_join.go index 306527e08e1eb..aa155b0d4c610 100644 --- a/executor/index_lookup_join.go +++ b/executor/index_lookup_join.go @@ -27,6 +27,7 @@ import ( "unsafe" "github.com/pingcap/errors" + "github.com/pingcap/failpoint" "github.com/pingcap/tidb/expression" "github.com/pingcap/tidb/parser/mysql" "github.com/pingcap/tidb/parser/terror" @@ -362,6 +363,7 @@ func (ow *outerWorker) run(ctx context.Context, wg *sync.WaitGroup) { task := &lookUpJoinTask{doneCh: make(chan error, 1)} err := errors.Errorf("%v", r) task.doneCh <- err + ow.pushToChan(ctx, task, ow.resultCh) ow.lookup.ctxCancelReason.Store(err) ow.lookup.cancelFunc() } @@ -370,6 +372,7 @@ func (ow *outerWorker) run(ctx context.Context, wg *sync.WaitGroup) { wg.Done() }() for { + failpoint.Inject("TestIssue30211", nil) task, err := ow.buildTask(ctx) if err != nil { task.doneCh <- err diff --git a/executor/join_test.go b/executor/join_test.go index 35d5a914d6470..e55708bfebd8a 100644 --- a/executor/join_test.go +++ b/executor/join_test.go @@ -2610,3 +2610,39 @@ func (s *testSuiteJoinSerial) TestIssue25902(c *C) { tk.MustQuery("select * from tt1 where ts in (select ts from tt2);").Check(testkit.Rows()) tk.MustExec("set @@session.time_zone = @tmp;") } + +func (s *testSuiteJoinSerial) TestIssue30211(c *C) { + tk := testkit.NewTestKit(c, s.store) + tk.MustExec("use test") + tk.MustExec("drop table if exists t1, t2;") + tk.MustExec("create table t1(a int, index(a));") + tk.MustExec("create table t2(a int, index(a));") + func() { + fpName := "github.com/pingcap/tidb/executor/TestIssue30211" + c.Assert(failpoint.Enable(fpName, `panic("TestIssue30211 IndexJoinPanic")`), IsNil) + defer func() { + c.Assert(failpoint.Disable(fpName), IsNil) + }() + err := tk.QueryToErr("select /*+ inl_join(t1) */ * from t1 join t2 on t1.a = t2.a;").Error() + c.Assert(err, Matches, "failpoint panic: TestIssue30211 IndexJoinPanic") + + err = tk.QueryToErr("select /*+ inl_hash_join(t1) */ * from t1 join t2 on t1.a = t2.a;").Error() + c.Assert(err, Matches, "failpoint panic: TestIssue30211 IndexJoinPanic") + }() + tk.MustExec("insert into t1 values(1),(2);") + tk.MustExec("insert into t2 values(1),(1),(2),(2);") + tk.MustExec("set @@tidb_mem_quota_query=8000;") + tk.MustExec("set tidb_index_join_batch_size = 1;") + config.UpdateGlobal(func(conf *config.Config) { + conf.OOMAction = config.OOMActionCancel + }) + defer func() { + config.UpdateGlobal(func(conf *config.Config) { + conf.OOMAction = config.OOMActionLog + }) + }() + err := tk.QueryToErr("select /*+ inl_join(t1) */ * from t1 join t2 on t1.a = t2.a;").Error() + c.Assert(strings.Contains(err, "Out Of Memory Quota"), IsTrue) + err = tk.QueryToErr("select /*+ inl_hash_join(t1) */ * from t1 join t2 on t1.a = t2.a;").Error() + c.Assert(strings.Contains(err, "Out Of Memory Quota"), IsTrue) +}