diff --git a/executor/index_lookup_join.go b/executor/index_lookup_join.go index 270036ac91b12..f57079d2972b1 100644 --- a/executor/index_lookup_join.go +++ b/executor/index_lookup_join.go @@ -532,6 +532,10 @@ func (iw *innerWorker) buildLookUpMap(task *lookUpJoinTask) error { chk := task.innerResult.GetChunk(i) for j := 0; j < chk.NumRows(); j++ { innerRow := chk.GetRow(j) + if iw.hasNullInJoinKey(innerRow) { + continue + } + keyBuf = keyBuf[:0] for _, keyCol := range iw.keyCols { d := innerRow.GetDatum(keyCol, iw.rowTypes[keyCol]) @@ -549,6 +553,15 @@ func (iw *innerWorker) buildLookUpMap(task *lookUpJoinTask) error { return nil } +func (iw *innerWorker) hasNullInJoinKey(row chunk.Row) bool { + for _, ordinal := range iw.keyCols { + if row.IsNull(ordinal) { + return true + } + } + return false +} + // Close implements the Executor interface. func (e *IndexLookUpJoin) Close() error { if e.cancelFunc != nil { diff --git a/executor/join_test.go b/executor/join_test.go index 9a697e7633f33..0b36b2b9ad2ec 100644 --- a/executor/join_test.go +++ b/executor/join_test.go @@ -845,6 +845,14 @@ func (s *testSuite) TestIndexLookupJoin(c *C) { tk.MustExec("CREATE TABLE t(a BIGINT PRIMARY KEY, b BIGINT);") tk.MustExec("INSERT INTO t VALUES(1, 2);") tk.MustQuery("SELECT /*+ TIDB_INLJ(t1, t2) */ * FROM t t1 JOIN t t2 ON t1.a=t2.a UNION ALL SELECT /*+ TIDB_INLJ(t1, t2) */ * FROM t t1 JOIN t t2 ON t1.a=t2.a;").Check(testkit.Rows("1 2 1 2", "1 2 1 2")) + + tk.MustExec(`drop table if exists t;`) + tk.MustExec(`create table t(a decimal(6,2), index idx(a));`) + tk.MustExec(`insert into t values(1.01), (2.02), (NULL);`) + tk.MustQuery(`select /*+ TIDB_INLJ(t1) */ t1.a from t t1 join t t2 on t1.a=t2.a order by t1.a;`).Check(testkit.Rows( + `1.01`, + `2.02`, + )) } func (s *testSuite) TestMergejoinOrder(c *C) { @@ -882,4 +890,12 @@ func (s *testSuite) TestMergejoinOrder(c *C) { `2 1 2 1`, `2 2 2 2`, )) + + tk.MustExec(`drop table if exists t;`) + tk.MustExec(`create table t(a decimal(6,2), index idx(a));`) + tk.MustExec(`insert into t values(1.01), (2.02), (NULL);`) + tk.MustQuery(`select /*+ TIDB_SMJ(t1) */ t1.a from t t1 join t t2 on t1.a=t2.a order by t1.a;`).Check(testkit.Rows( + `1.01`, + `2.02`, + )) } diff --git a/executor/merge_join.go b/executor/merge_join.go index b869ce2449b1f..eaf39429f2dca 100644 --- a/executor/merge_join.go +++ b/executor/merge_join.go @@ -129,23 +129,39 @@ func (t *mergeJoinInnerTable) rowsWithSameKey() ([]chunk.Row, error) { } func (t *mergeJoinInnerTable) nextRow() (chunk.Row, error) { - if t.curRow == t.curIter.End() { - t.reallocReaderResult() - oldMemUsage := t.curResult.MemoryUsage() - err := t.reader.Next(t.ctx, t.curResult) - // error happens or no more data. - if err != nil || t.curResult.NumRows() == 0 { - t.curRow = t.curIter.End() - return t.curRow, errors.Trace(err) + for { + if t.curRow == t.curIter.End() { + t.reallocReaderResult() + oldMemUsage := t.curResult.MemoryUsage() + err := t.reader.Next(t.ctx, t.curResult) + // error happens or no more data. + if err != nil || t.curResult.NumRows() == 0 { + t.curRow = t.curIter.End() + return t.curRow, errors.Trace(err) + } + newMemUsage := t.curResult.MemoryUsage() + t.memTracker.Consume(newMemUsage - oldMemUsage) + t.curRow = t.curIter.Begin() + } + + result := t.curRow + t.curResultInUse = true + t.curRow = t.curIter.Next() + + if !t.hasNullInJoinKey(result) { + return result, nil + } + } +} + +func (t *mergeJoinInnerTable) hasNullInJoinKey(row chunk.Row) bool { + for _, col := range t.joinKeys { + ordinal := col.Index + if row.IsNull(ordinal) { + return true } - newMemUsage := t.curResult.MemoryUsage() - t.memTracker.Consume(newMemUsage - oldMemUsage) - t.curRow = t.curIter.Begin() } - result := t.curRow - t.curResultInUse = true - t.curRow = t.curIter.Next() - return result, nil + return false } // reallocReaderResult resets "t.curResult" to an empty Chunk to buffer the result of "t.reader".