Skip to content

Commit

Permalink
executor: fix wrong result for join with enum type (#29375) (#29514)
Browse files Browse the repository at this point in the history
close #27831
  • Loading branch information
ti-srebot authored Jan 29, 2022
1 parent 2780dfb commit 1d6b5bb
Show file tree
Hide file tree
Showing 9 changed files with 82 additions and 46 deletions.
58 changes: 37 additions & 21 deletions executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -1178,15 +1178,6 @@ func (b *executorBuilder) buildHashJoin(v *plannercore.PhysicalHashJoin) Executo
}
}

// consider collations
leftTypes := make([]*types.FieldType, 0, len(retTypes(leftExec)))
for _, tp := range retTypes(leftExec) {
leftTypes = append(leftTypes, tp.Clone())
}
rightTypes := make([]*types.FieldType, 0, len(retTypes(rightExec)))
for _, tp := range retTypes(rightExec) {
rightTypes = append(rightTypes, tp.Clone())
}
leftIsBuildSide := true

e.isNullEQ = v.IsNullEQ
Expand Down Expand Up @@ -1229,24 +1220,32 @@ func (b *executorBuilder) buildHashJoin(v *plannercore.PhysicalHashJoin) Executo
}
executorCountHashJoinExec.Inc()

// We should use JoinKey to construct the type information using by hashing, instead of using the child's schema directly.
// When a hybrid type column is hashed multiple times, we need to distinguish what field types are used.
// For example, the condition `enum = int and enum = string`, we should use ETInt to hash the first column,
// and use ETString to hash the second column, although they may be the same column.
leftExecTypes, rightExecTypes := retTypes(leftExec), retTypes(rightExec)
leftTypes, rightTypes := make([]*types.FieldType, 0, len(v.LeftJoinKeys)), make([]*types.FieldType, 0, len(v.RightJoinKeys))
for i, col := range v.LeftJoinKeys {
leftTypes = append(leftTypes, leftExecTypes[col.Index].Clone())
leftTypes[i].Flag = col.RetType.Flag
}
for i, col := range v.RightJoinKeys {
rightTypes = append(rightTypes, rightExecTypes[col.Index].Clone())
rightTypes[i].Flag = col.RetType.Flag
}

// consider collations
for i := range v.EqualConditions {
chs, coll := v.EqualConditions[i].CharsetAndCollation(e.ctx)
bt := leftTypes[v.LeftJoinKeys[i].Index]
bt.Charset, bt.Collate = chs, coll
pt := rightTypes[v.RightJoinKeys[i].Index]
pt.Charset, pt.Collate = chs, coll
leftTypes[i].Charset, leftTypes[i].Collate = chs, coll
rightTypes[i].Charset, rightTypes[i].Collate = chs, coll
}
if leftIsBuildSide {
e.buildTypes, e.probeTypes = leftTypes, rightTypes
} else {
e.buildTypes, e.probeTypes = rightTypes, leftTypes
}
for _, key := range e.buildKeys {
e.buildTypes[key.Index].Flag = key.RetType.Flag
}
for _, key := range e.probeKeys {
e.probeTypes[key.Index].Flag = key.RetType.Flag
}
return e
}

Expand Down Expand Up @@ -2478,6 +2477,21 @@ func (b *executorBuilder) buildIndexLookUpJoin(v *plannercore.PhysicalIndexJoin)
outerTypes[col.Index].Flag = col.RetType.Flag
}

// We should use JoinKey to construct the type information using by hashing, instead of using the child's schema directly.
// When a hybrid type column is hashed multiple times, we need to distinguish what field types are used.
// For example, the condition `enum = int and enum = string`, we should use ETInt to hash the first column,
// and use ETString to hash the second column, although they may be the same column.
innerHashTypes := make([]*types.FieldType, len(v.InnerHashKeys))
outerHashTypes := make([]*types.FieldType, len(v.OuterHashKeys))
for i, col := range v.InnerHashKeys {
innerHashTypes[i] = innerTypes[col.Index].Clone()
innerHashTypes[i].Flag = col.RetType.Flag
}
for i, col := range v.OuterHashKeys {
outerHashTypes[i] = outerTypes[col.Index].Clone()
outerHashTypes[i].Flag = col.RetType.Flag
}

var (
outerFilter []expression.Expression
leftTypes, rightTypes []*types.FieldType
Expand Down Expand Up @@ -2512,12 +2526,14 @@ func (b *executorBuilder) buildIndexLookUpJoin(v *plannercore.PhysicalIndexJoin)
e := &IndexLookUpJoin{
baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ID(), outerExec),
outerCtx: outerCtx{
rowTypes: outerTypes,
filter: outerFilter,
rowTypes: outerTypes,
hashTypes: outerHashTypes,
filter: outerFilter,
},
innerCtx: innerCtx{
readerBuilder: &dataReaderBuilder{Plan: innerPlan, executorBuilder: b},
rowTypes: innerTypes,
hashTypes: innerHashTypes,
colLens: v.IdxColLens,
hasPrefixCol: hasPrefixCol,
},
Expand Down
7 changes: 4 additions & 3 deletions executor/hash_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (

// hashContext keeps the needed hash context of a db table in hash join.
type hashContext struct {
// allTypes one-to-one correspondence with keyColIdx
allTypes []*types.FieldType
keyColIdx []int
buf []byte
Expand Down Expand Up @@ -80,9 +81,9 @@ type hashRowContainer struct {
rowContainer *chunk.RowContainer
}

func newHashRowContainer(sCtx sessionctx.Context, estCount int, hCtx *hashContext) *hashRowContainer {
func newHashRowContainer(sCtx sessionctx.Context, estCount int, hCtx *hashContext, allTypes []*types.FieldType) *hashRowContainer {
maxChunkSize := sCtx.GetSessionVars().MaxChunkSize
rc := chunk.NewRowContainer(hCtx.allTypes, maxChunkSize)
rc := chunk.NewRowContainer(allTypes, maxChunkSize)
c := &hashRowContainer{
sc: sCtx.GetSessionVars().StmtCtx,
hCtx: hCtx,
Expand Down Expand Up @@ -160,7 +161,7 @@ func (c *hashRowContainer) PutChunkSelected(chk *chunk.Chunk, selected, ignoreNu
hCtx := c.hCtx
for keyIdx, colIdx := range c.hCtx.keyColIdx {
ignoreNull := len(ignoreNulls) > keyIdx && ignoreNulls[keyIdx]
err := codec.HashChunkSelected(c.sc, hCtx.hashVals, chk, hCtx.allTypes[colIdx], colIdx, hCtx.buf, hCtx.hasNull, selected, ignoreNull)
err := codec.HashChunkSelected(c.sc, hCtx.hashVals, chk, hCtx.allTypes[keyIdx], colIdx, hCtx.buf, hCtx.hasNull, selected, ignoreNull)
if err != nil {
return errors.Trace(err)
}
Expand Down
2 changes: 1 addition & 1 deletion executor/hash_table_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ func (s *pkgTestSerialSuite) testHashRowContainer(c *C, hashFunc func() hash.Has
for i := 0; i < numRows; i++ {
hCtx.hashVals = append(hCtx.hashVals, hashFunc())
}
rowContainer := newHashRowContainer(sctx, 0, hCtx)
rowContainer := newHashRowContainer(sctx, 0, hCtx, hCtx.allTypes)
tracker := rowContainer.GetMemTracker()
tracker.SetLabel(memory.LabelForBuildSideResult)
if spill {
Expand Down
6 changes: 3 additions & 3 deletions executor/index_lookup_hash_join.go
Original file line number Diff line number Diff line change
Expand Up @@ -575,7 +575,7 @@ func (iw *indexHashJoinInnerWorker) buildHashTableForOuterResult(ctx context.Con
}
}
h.Reset()
err := codec.HashChunkRow(iw.ctx.GetSessionVars().StmtCtx, h, row, iw.outerCtx.rowTypes, hashColIdx, buf)
err := codec.HashChunkRow(iw.ctx.GetSessionVars().StmtCtx, h, row, iw.outerCtx.hashTypes, hashColIdx, buf)
failpoint.Inject("testIndexHashJoinBuildErr", func() {
err = errors.New("mockIndexHashJoinBuildErr")
})
Expand Down Expand Up @@ -672,7 +672,7 @@ func (iw *indexHashJoinInnerWorker) doJoinUnordered(ctx context.Context, task *i

func (iw *indexHashJoinInnerWorker) getMatchedOuterRows(innerRow chunk.Row, task *indexHashJoinTask, h hash.Hash64, buf []byte) (matchedRows []chunk.Row, matchedRowPtr []chunk.RowPtr, err error) {
h.Reset()
err = codec.HashChunkRow(iw.ctx.GetSessionVars().StmtCtx, h, innerRow, iw.rowTypes, iw.hashCols, buf)
err = codec.HashChunkRow(iw.ctx.GetSessionVars().StmtCtx, h, innerRow, iw.hashTypes, iw.hashCols, buf)
if err != nil {
return nil, nil, err
}
Expand All @@ -686,7 +686,7 @@ func (iw *indexHashJoinInnerWorker) getMatchedOuterRows(innerRow chunk.Row, task
matchedRowPtr = make([]chunk.RowPtr, 0, len(iw.matchedOuterPtrs))
for _, ptr := range iw.matchedOuterPtrs {
outerRow := task.outerResult.GetRow(ptr)
ok, err := codec.EqualChunkRow(iw.ctx.GetSessionVars().StmtCtx, innerRow, iw.rowTypes, iw.keyCols, outerRow, iw.outerCtx.rowTypes, iw.outerCtx.hashCols)
ok, err := codec.EqualChunkRow(iw.ctx.GetSessionVars().StmtCtx, innerRow, iw.hashTypes, iw.hashCols, outerRow, iw.outerCtx.hashTypes, iw.outerCtx.hashCols)
if err != nil {
return nil, nil, err
}
Expand Down
10 changes: 6 additions & 4 deletions executor/index_lookup_join.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,17 +88,19 @@ type IndexLookUpJoin struct {
}

type outerCtx struct {
rowTypes []*types.FieldType
keyCols []int
hashCols []int
filter expression.CNFExprs
rowTypes []*types.FieldType
keyCols []int
hashTypes []*types.FieldType
hashCols []int
filter expression.CNFExprs
}

type innerCtx struct {
readerBuilder *dataReaderBuilder
rowTypes []*types.FieldType
keyCols []int
keyColIDs []int64 // the original ID in its table, used by dynamic partition pruning
hashTypes []*types.FieldType
hashCols []int
colLens []int
hasPrefixCol bool
Expand Down
8 changes: 4 additions & 4 deletions executor/join.go
Original file line number Diff line number Diff line change
Expand Up @@ -571,7 +571,7 @@ func (e *HashJoinExec) join2Chunk(workerID uint, probeSideChk *chunk.Chunk, hCtx
hCtx.initHash(probeSideChk.NumRows())
for keyIdx, i := range hCtx.keyColIdx {
ignoreNull := len(e.isNullEQ) > keyIdx && e.isNullEQ[keyIdx]
err = codec.HashChunkSelected(e.rowContainer.sc, hCtx.hashVals, probeSideChk, hCtx.allTypes[i], i, hCtx.buf, hCtx.hasNull, selected, ignoreNull)
err = codec.HashChunkSelected(e.rowContainer.sc, hCtx.hashVals, probeSideChk, hCtx.allTypes[keyIdx], i, hCtx.buf, hCtx.hasNull, selected, ignoreNull)
if err != nil {
joinResult.err = err
return false, joinResult
Expand Down Expand Up @@ -612,8 +612,8 @@ func (e *HashJoinExec) join2Chunk(workerID uint, probeSideChk *chunk.Chunk, hCtx
// join2ChunkForOuterHashJoin joins chunks when using the outer to build a hash table (refer to outer hash join)
func (e *HashJoinExec) join2ChunkForOuterHashJoin(workerID uint, probeSideChk *chunk.Chunk, hCtx *hashContext, joinResult *hashjoinWorkerResult) (ok bool, _ *hashjoinWorkerResult) {
hCtx.initHash(probeSideChk.NumRows())
for _, i := range hCtx.keyColIdx {
err := codec.HashChunkColumns(e.rowContainer.sc, hCtx.hashVals, probeSideChk, hCtx.allTypes[i], i, hCtx.buf, hCtx.hasNull)
for keyIdx, i := range hCtx.keyColIdx {
err := codec.HashChunkColumns(e.rowContainer.sc, hCtx.hashVals, probeSideChk, hCtx.allTypes[keyIdx], i, hCtx.buf, hCtx.hasNull)
if err != nil {
joinResult.err = err
return false, joinResult
Expand Down Expand Up @@ -740,7 +740,7 @@ func (e *HashJoinExec) buildHashTableForList(buildSideResultCh <-chan *chunk.Chu
}
var err error
var selected []bool
e.rowContainer = newHashRowContainer(e.ctx, int(e.buildSideEstCount), hCtx)
e.rowContainer = newHashRowContainer(e.ctx, int(e.buildSideEstCount), hCtx, retTypes(e.buildSideExec))
e.rowContainer.GetMemTracker().AttachTo(e.memTracker)
e.rowContainer.GetMemTracker().SetLabel(memory.LabelForBuildSideResult)
e.rowContainer.GetDiskTracker().AttachTo(e.diskTracker)
Expand Down
14 changes: 14 additions & 0 deletions expression/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9904,3 +9904,17 @@ func (s *testIntegrationSuite) TestIssue28643(c *C) {
tk.MustExec("set tidb_enable_vectorized_expression = off;")
tk.MustQuery("select hour(a) from t;").Check(testkit.Rows("838", "838"))
}

func (s *testIntegrationSuite) TestIssue27831(c *C) {
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("use test")
tk.MustExec("drop table if exists t")
tk.MustExec("create table t(a enum(\"a\", \"b\"), b enum(\"a\", \"b\"), c bool)")
tk.MustExec("insert into t values(\"a\", \"a\", 1);")
tk.MustQuery("select * from t t1 right join t t2 on t1.a=t2.b and t1.a= t2.c;").Check(testkit.Rows("a a 1 a a 1"))

tk.MustExec("drop table if exists t")
tk.MustExec("create table t(a enum(\"a\", \"b\"), b enum(\"a\", \"b\"), c bool, d int, index idx(d))")
tk.MustExec("insert into t values(\"a\", \"a\", 1, 1);")
tk.MustQuery("select /*+ inl_hash_join(t1) */ * from t t1 right join t t2 on t1.a=t2.b and t1.a= t2.c and t1.d=t2.d;").Check(testkit.Rows("a a 1 1 a a 1 1"))
}
11 changes: 7 additions & 4 deletions util/codec/codec.go
Original file line number Diff line number Diff line change
Expand Up @@ -664,8 +664,8 @@ func HashChunkSelected(sc *stmtctx.StatementContext, h []hash.Hash64, chk *chunk
// If two rows are logically equal, it will generate the same bytes.
func HashChunkRow(sc *stmtctx.StatementContext, w io.Writer, row chunk.Row, allTypes []*types.FieldType, colIdx []int, buf []byte) (err error) {
var b []byte
for _, idx := range colIdx {
buf[0], b, err = encodeHashChunkRowIdx(sc, row, allTypes[idx], idx)
for i, idx := range colIdx {
buf[0], b, err = encodeHashChunkRowIdx(sc, row, allTypes[i], idx)
if err != nil {
return errors.Trace(err)
}
Expand All @@ -687,13 +687,16 @@ func EqualChunkRow(sc *stmtctx.StatementContext,
row1 chunk.Row, allTypes1 []*types.FieldType, colIdx1 []int,
row2 chunk.Row, allTypes2 []*types.FieldType, colIdx2 []int,
) (bool, error) {
if len(colIdx1) != len(colIdx2) {
return false, errors.Errorf("Internal error: Hash columns count mismatch, col1: %d, col2: %d", len(colIdx1), len(colIdx2))
}
for i := range colIdx1 {
idx1, idx2 := colIdx1[i], colIdx2[i]
flag1, b1, err := encodeHashChunkRowIdx(sc, row1, allTypes1[idx1], idx1)
flag1, b1, err := encodeHashChunkRowIdx(sc, row1, allTypes1[i], idx1)
if err != nil {
return false, errors.Trace(err)
}
flag2, b2, err := encodeHashChunkRowIdx(sc, row2, allTypes2[idx2], idx2)
flag2, b2, err := encodeHashChunkRowIdx(sc, row2, allTypes2[i], idx2)
if err != nil {
return false, errors.Trace(err)
}
Expand Down
12 changes: 6 additions & 6 deletions util/codec/codec_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1247,9 +1247,9 @@ func (s *testCodecSuite) TestHashChunkColumns(c *C) {
for i := 0; i < 12; i++ {
c.Assert(chk.GetRow(0).IsNull(i), Equals, true)
err1 := HashChunkSelected(sc, vecHash, chk, tps[i], i, buf, hasNull, sel, false)
err2 := HashChunkRow(sc, rowHash[0], chk.GetRow(0), tps, colIdx[i:i+1], buf)
err3 := HashChunkRow(sc, rowHash[1], chk.GetRow(1), tps, colIdx[i:i+1], buf)
err4 := HashChunkRow(sc, rowHash[2], chk.GetRow(2), tps, colIdx[i:i+1], buf)
err2 := HashChunkRow(sc, rowHash[0], chk.GetRow(0), tps[i:i+1], colIdx[i:i+1], buf)
err3 := HashChunkRow(sc, rowHash[1], chk.GetRow(1), tps[i:i+1], colIdx[i:i+1], buf)
err4 := HashChunkRow(sc, rowHash[2], chk.GetRow(2), tps[i:i+1], colIdx[i:i+1], buf)
c.Assert(err1, IsNil)
c.Assert(err2, IsNil)
c.Assert(err3, IsNil)
Expand All @@ -1272,9 +1272,9 @@ func (s *testCodecSuite) TestHashChunkColumns(c *C) {

c.Assert(chk.GetRow(0).IsNull(i), Equals, false)
err1 := HashChunkSelected(sc, vecHash, chk, tps[i], i, buf, hasNull, sel, false)
err2 := HashChunkRow(sc, rowHash[0], chk.GetRow(0), tps, colIdx[i:i+1], buf)
err3 := HashChunkRow(sc, rowHash[1], chk.GetRow(1), tps, colIdx[i:i+1], buf)
err4 := HashChunkRow(sc, rowHash[2], chk.GetRow(2), tps, colIdx[i:i+1], buf)
err2 := HashChunkRow(sc, rowHash[0], chk.GetRow(0), tps[i:i+1], colIdx[i:i+1], buf)
err3 := HashChunkRow(sc, rowHash[1], chk.GetRow(1), tps[i:i+1], colIdx[i:i+1], buf)
err4 := HashChunkRow(sc, rowHash[2], chk.GetRow(2), tps[i:i+1], colIdx[i:i+1], buf)
c.Assert(err1, IsNil)
c.Assert(err2, IsNil)
c.Assert(err3, IsNil)
Expand Down

0 comments on commit 1d6b5bb

Please sign in to comment.