Skip to content

Commit

Permalink
executor: make sure buildHashTable worker exits before IndexHashJoin …
Browse files Browse the repository at this point in the history
…finish (#31334) (#31419)

close #31062
  • Loading branch information
ti-srebot authored Feb 22, 2022
1 parent 912bfea commit 95ad27d
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 2 deletions.
2 changes: 2 additions & 0 deletions executor/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ import (
"github.com/pingcap/tidb/util/mock"
"github.com/pingcap/tidb/util/rowcodec"
"github.com/pingcap/tidb/util/testkit"
"github.com/pingcap/tidb/util/testleak"
"github.com/pingcap/tidb/util/testutil"
"github.com/pingcap/tidb/util/timeutil"
"github.com/pingcap/tipb/go-tipb"
Expand Down Expand Up @@ -9367,6 +9368,7 @@ func (s *testSerialSuite) TestIndexJoin31494(c *C) {
}

func (s *testSerialSuite) TestIssue28650(c *C) {
defer testleak.AfterTest(c)()
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("use test")
tk.MustExec("drop table if exists t1, t2;")
Expand Down
15 changes: 13 additions & 2 deletions executor/index_lookup_hash_join.go
Original file line number Diff line number Diff line change
Expand Up @@ -556,6 +556,9 @@ func (iw *indexHashJoinInnerWorker) buildHashTableForOuterResult(ctx context.Con
for chkIdx := 0; chkIdx < numChks; chkIdx++ {
chk := task.outerResult.GetChunk(chkIdx)
numRows := chk.NumRows()
if iw.lookup.finished.Load().(bool) {
return
}
OUTER:
for rowIdx := 0; rowIdx < numRows; rowIdx++ {
if task.outerMatch != nil && !task.outerMatch[chkIdx][rowIdx] {
Expand Down Expand Up @@ -592,10 +595,13 @@ func (iw *indexHashJoinInnerWorker) fetchInnerResults(ctx context.Context, task
}

func (iw *indexHashJoinInnerWorker) handleHashJoinInnerWorkerPanic(r interface{}) {
defer func() {
iw.wg.Done()
iw.lookup.workerWg.Done()
}()
if r != nil {
iw.resultCh <- &indexHashJoinResult{err: errors.Errorf("%v", r)}
}
iw.wg.Done()
}

func (iw *indexHashJoinInnerWorker) handleTask(ctx context.Context, task *indexHashJoinTask, joinResult *indexHashJoinResult, h hash.Hash64, resultCh chan *indexHashJoinResult) error {
Expand All @@ -615,7 +621,12 @@ func (iw *indexHashJoinInnerWorker) handleTask(ctx context.Context, task *indexH
iw.wg = &sync.WaitGroup{}
iw.wg.Add(1)
// TODO(XuHuaiyu): we may always use the smaller side to build the hashtable.
go util.WithRecovery(func() { iw.buildHashTableForOuterResult(ctx, task, h) }, iw.handleHashJoinInnerWorkerPanic)
go util.WithRecovery(
func() {
iw.lookup.workerWg.Add(1)
iw.buildHashTableForOuterResult(ctx, task, h)
},
iw.handleHashJoinInnerWorkerPanic)
err := iw.fetchInnerResults(ctx, task.lookUpJoinTask)
iw.wg.Wait()
if err != nil {
Expand Down

0 comments on commit 95ad27d

Please sign in to comment.