From 9fb7634acf88938ee26953f060707571b7ddcc16 Mon Sep 17 00:00:00 2001 From: ti-srebot <66930949+ti-srebot@users.noreply.github.com> Date: Fri, 7 Jan 2022 16:36:37 +0800 Subject: [PATCH] executor: make sure buildHashTable worker exits before IndexHashJoin finish (#31334) (#31420) close pingcap/tidb#31062 --- executor/executor_test.go | 1 + executor/index_lookup_hash_join.go | 15 +++++++++++++-- 2 files changed, 14 insertions(+), 2 deletions(-) diff --git a/executor/executor_test.go b/executor/executor_test.go index a16109b148434..1352f0f469d3c 100644 --- a/executor/executor_test.go +++ b/executor/executor_test.go @@ -9529,6 +9529,7 @@ func (s *testSuiteP1) TestIssue29412(c *C) { } func (s *testSerialSuite) TestIssue28650(c *C) { + defer testleak.AfterTest(c)() tk := testkit.NewTestKit(c, s.store) tk.MustExec("use test") tk.MustExec("drop table if exists t1, t2;") diff --git a/executor/index_lookup_hash_join.go b/executor/index_lookup_hash_join.go index 0beb3e59e66b1..056e321b8a294 100644 --- a/executor/index_lookup_hash_join.go +++ b/executor/index_lookup_hash_join.go @@ -561,6 +561,9 @@ func (iw *indexHashJoinInnerWorker) buildHashTableForOuterResult(ctx context.Con for chkIdx := 0; chkIdx < numChks; chkIdx++ { chk := task.outerResult.GetChunk(chkIdx) numRows := chk.NumRows() + if iw.lookup.finished.Load().(bool) { + return + } OUTER: for rowIdx := 0; rowIdx < numRows; rowIdx++ { if task.outerMatch != nil && !task.outerMatch[chkIdx][rowIdx] { @@ -597,10 +600,13 @@ func (iw *indexHashJoinInnerWorker) fetchInnerResults(ctx context.Context, task } func (iw *indexHashJoinInnerWorker) handleHashJoinInnerWorkerPanic(r interface{}) { + defer func() { + iw.wg.Done() + iw.lookup.workerWg.Done() + }() if r != nil { iw.resultCh <- &indexHashJoinResult{err: errors.Errorf("%v", r)} } - iw.wg.Done() } func (iw *indexHashJoinInnerWorker) handleTask(ctx context.Context, task *indexHashJoinTask, joinResult *indexHashJoinResult, h hash.Hash64, resultCh chan *indexHashJoinResult) error { @@ -620,7 +626,12 @@ func (iw *indexHashJoinInnerWorker) handleTask(ctx context.Context, task *indexH iw.wg = &sync.WaitGroup{} iw.wg.Add(1) // TODO(XuHuaiyu): we may always use the smaller side to build the hashtable. - go util.WithRecovery(func() { iw.buildHashTableForOuterResult(ctx, task, h) }, iw.handleHashJoinInnerWorkerPanic) + go util.WithRecovery( + func() { + iw.lookup.workerWg.Add(1) + iw.buildHashTableForOuterResult(ctx, task, h) + }, + iw.handleHashJoinInnerWorkerPanic) err := iw.fetchInnerResults(ctx, task.lookUpJoinTask) iw.wg.Wait() if err != nil {