Skip to content

Commit

Permalink
fix ci
Browse files Browse the repository at this point in the history
  • Loading branch information
XuHuaiyu committed Jul 15, 2020
1 parent a84b516 commit 2949841
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 13 deletions.
20 changes: 10 additions & 10 deletions executor/index_lookup_hash_join.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,8 @@ import (
"github.com/pingcap/tidb/util/chunk"
"github.com/pingcap/tidb/util/codec"
"github.com/pingcap/tidb/util/execdetails"
"github.com/pingcap/tidb/util/logutil"
"github.com/pingcap/tidb/util/memory"
"github.com/pingcap/tidb/util/ranger"
"go.uber.org/zap"
)

// numResChkHold indicates the number of resource chunks that an inner worker
Expand Down Expand Up @@ -456,7 +454,7 @@ func (iw *indexHashJoinInnerWorker) run(ctx context.Context, cancelFunc context.
if task.keepOuterOrder {
resultCh = task.resultCh
}
err := iw.handleTask(ctx, cancelFunc, task, joinResult, h, resultCh)
err := iw.handleTask(ctx, task, joinResult, h, resultCh)
if err != nil {
joinResult.err = err
break
Expand All @@ -473,7 +471,7 @@ func (iw *indexHashJoinInnerWorker) run(ctx context.Context, cancelFunc context.
}
}
failpoint.Inject("testIndexHashJoinInnerWorkerErr", func() {
joinResult = &indexHashJoinResult{err: errors.New("mockIndexHashJoinInnerWorkerErr")}
joinResult.err = errors.New("mockIndexHashJoinInnerWorkerErr")
})
if joinResult.err != nil {
resultCh <- joinResult
Expand Down Expand Up @@ -504,7 +502,7 @@ func (iw *indexHashJoinInnerWorker) getNewJoinResult(ctx context.Context) (*inde
return joinResult, ok
}

func (iw *indexHashJoinInnerWorker) buildHashTableForOuterResult(ctx context.Context, cancelFunc context.CancelFunc, task *indexHashJoinTask, h hash.Hash64) {
func (iw *indexHashJoinInnerWorker) buildHashTableForOuterResult(ctx context.Context, task *indexHashJoinTask, h hash.Hash64) {
buf, numChks := make([]byte, 1), task.outerResult.NumChunks()
task.lookupMap = newRowHashMap(task.outerResult.Len())
for chkIdx := 0; chkIdx < numChks; chkIdx++ {
Expand All @@ -524,10 +522,12 @@ func (iw *indexHashJoinInnerWorker) buildHashTableForOuterResult(ctx context.Con
}
h.Reset()
err := codec.HashChunkRow(iw.ctx.GetSessionVars().StmtCtx, h, row, iw.outerCtx.rowTypes, keyColIdx, buf)
failpoint.Inject("testIndexHashJoinBuildErr", func() {
err = errors.New("mockIndexHashJoinBuildErr")
})
if err != nil {
cancelFunc()
logutil.Logger(ctx).Error("indexHashJoinInnerWorker.buildHashTableForOuterResult failed", zap.Error(err))
return
// This panic will be recovered by the invoker.
panic(err.Error())
}
rowPtr := chunk.RowPtr{ChkIdx: uint32(chkIdx), RowIdx: uint32(rowIdx)}
task.lookupMap.Put(h.Sum64(), rowPtr)
Expand All @@ -551,11 +551,11 @@ func (iw *indexHashJoinInnerWorker) handleHashJoinInnerWorkerPanic(r interface{}
iw.wg.Done()
}

func (iw *indexHashJoinInnerWorker) handleTask(ctx context.Context, cancelFunc context.CancelFunc, task *indexHashJoinTask, joinResult *indexHashJoinResult, h hash.Hash64, resultCh chan *indexHashJoinResult) error {
func (iw *indexHashJoinInnerWorker) handleTask(ctx context.Context, task *indexHashJoinTask, joinResult *indexHashJoinResult, h hash.Hash64, resultCh chan *indexHashJoinResult) error {
iw.wg = &sync.WaitGroup{}
iw.wg.Add(1)
// TODO(XuHuaiyu): we may always use the smaller side to build the hashtable.
go util.WithRecovery(func() { iw.buildHashTableForOuterResult(ctx, cancelFunc, task, h) }, iw.handleHashJoinInnerWorkerPanic)
go util.WithRecovery(func() { iw.buildHashTableForOuterResult(ctx, task, h) }, iw.handleHashJoinInnerWorkerPanic)
err := iw.fetchInnerResults(ctx, task.lookUpJoinTask)
if err != nil {
return err
Expand Down
31 changes: 28 additions & 3 deletions executor/index_lookup_merge_join_test.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
package executor_test

import (
"strings"

"context"
. "github.com/pingcap/check"
"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/session"
"github.com/pingcap/tidb/util/testkit"
"strings"
)

func (s *testSuite9) TestIndexLookupMergeJoinHang(c *C) {
Expand Down Expand Up @@ -60,7 +63,9 @@ func (s *testSuite9) TestIssue18572_1(c *C) {
c.Assert(failpoint.Disable("github.com/pingcap/tidb/executor/testIndexHashJoinInnerWorkerErr"), IsNil)
}()

err := tk.QueryToErr("select /*+ inl_hash_join(t1) */ * from t1 right join t1 t2 on t1.b=t2.b;")
rs, err := tk.Exec("select /*+ inl_hash_join(t1) */ * from t1 right join t1 t2 on t1.b=t2.b;")
c.Assert(err, IsNil)
_, err = session.GetRows4Test(context.Background(), nil, rs)
c.Assert(strings.Contains(err.Error(), "mockIndexHashJoinInnerWorkerErr"), IsTrue)
}

Expand All @@ -76,6 +81,26 @@ func (s *testSuite9) TestIssue18572_2(c *C) {
c.Assert(failpoint.Disable("github.com/pingcap/tidb/executor/testIndexHashJoinOuterWorkerErr"), IsNil)
}()

err := tk.QueryToErr("select /*+ inl_hash_join(t1) */ * from t1 right join t1 t2 on t1.b=t2.b;")
rs, err := tk.Exec("select /*+ inl_hash_join(t1) */ * from t1 right join t1 t2 on t1.b=t2.b;")
c.Assert(err, IsNil)
_, err = session.GetRows4Test(context.Background(), nil, rs)
c.Assert(strings.Contains(err.Error(), "mockIndexHashJoinOuterWorkerErr"), IsTrue)
}

func (s *testSuite9) TestIssue18572_3(c *C) {
tk := testkit.NewTestKitWithInit(c, s.store)
tk.MustExec("drop table if exists t1")
tk.MustExec("create table t1(a int, b int, index idx(b));")
tk.MustExec("insert into t1 values(1, 1);")
tk.MustExec("insert into t1 select * from t1;")

c.Assert(failpoint.Enable("github.com/pingcap/tidb/executor/testIndexHashJoinBuildErr", "return"), IsNil)
defer func() {
c.Assert(failpoint.Disable("github.com/pingcap/tidb/executor/testIndexHashJoinBuildErr"), IsNil)
}()

rs, err := tk.Exec("select /*+ inl_hash_join(t1) */ * from t1 right join t1 t2 on t1.b=t2.b;")
c.Assert(err, IsNil)
_, err = session.GetRows4Test(context.Background(), nil, rs)
c.Assert(strings.Contains(err.Error(), "mockIndexHashJoinBuildErr"), IsTrue)
}

0 comments on commit 2949841

Please sign in to comment.