Skip to content

Commit

Permalink
executor: make sure buildHashTable worker exits before IndexHashJoin …
Browse files Browse the repository at this point in the history
…finish (#31334) (#31420)

close #31062
  • Loading branch information
ti-srebot authored Jan 7, 2022
1 parent 2850c4e commit 9fb7634
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 2 deletions.
1 change: 1 addition & 0 deletions executor/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9529,6 +9529,7 @@ func (s *testSuiteP1) TestIssue29412(c *C) {
}

func (s *testSerialSuite) TestIssue28650(c *C) {
defer testleak.AfterTest(c)()
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("use test")
tk.MustExec("drop table if exists t1, t2;")
Expand Down
15 changes: 13 additions & 2 deletions executor/index_lookup_hash_join.go
Original file line number Diff line number Diff line change
Expand Up @@ -561,6 +561,9 @@ func (iw *indexHashJoinInnerWorker) buildHashTableForOuterResult(ctx context.Con
for chkIdx := 0; chkIdx < numChks; chkIdx++ {
chk := task.outerResult.GetChunk(chkIdx)
numRows := chk.NumRows()
if iw.lookup.finished.Load().(bool) {
return
}
OUTER:
for rowIdx := 0; rowIdx < numRows; rowIdx++ {
if task.outerMatch != nil && !task.outerMatch[chkIdx][rowIdx] {
Expand Down Expand Up @@ -597,10 +600,13 @@ func (iw *indexHashJoinInnerWorker) fetchInnerResults(ctx context.Context, task
}

func (iw *indexHashJoinInnerWorker) handleHashJoinInnerWorkerPanic(r interface{}) {
defer func() {
iw.wg.Done()
iw.lookup.workerWg.Done()
}()
if r != nil {
iw.resultCh <- &indexHashJoinResult{err: errors.Errorf("%v", r)}
}
iw.wg.Done()
}

func (iw *indexHashJoinInnerWorker) handleTask(ctx context.Context, task *indexHashJoinTask, joinResult *indexHashJoinResult, h hash.Hash64, resultCh chan *indexHashJoinResult) error {
Expand All @@ -620,7 +626,12 @@ func (iw *indexHashJoinInnerWorker) handleTask(ctx context.Context, task *indexH
iw.wg = &sync.WaitGroup{}
iw.wg.Add(1)
// TODO(XuHuaiyu): we may always use the smaller side to build the hashtable.
go util.WithRecovery(func() { iw.buildHashTableForOuterResult(ctx, task, h) }, iw.handleHashJoinInnerWorkerPanic)
go util.WithRecovery(
func() {
iw.lookup.workerWg.Add(1)
iw.buildHashTableForOuterResult(ctx, task, h)
},
iw.handleHashJoinInnerWorkerPanic)
err := iw.fetchInnerResults(ctx, task.lookUpJoinTask)
iw.wg.Wait()
if err != nil {
Expand Down

0 comments on commit 9fb7634

Please sign in to comment.