Commit 142a3d0

cherry pick pingcap#29375 to release-5.3

Signed-off-by: ti-srebot <ti-srebot@pingcap.com>
Authored by wshwsh12, committed by ti-srebot on Nov 5, 2021
1 parent: 5fae7c5
Showing 9 changed files with 98 additions and 46 deletions.
executor/builder.go (37 additions, 21 deletions)
@@ -1205,15 +1205,6 @@ func (b *executorBuilder) buildHashJoin(v *plannercore.PhysicalHashJoin) Executor
 		}
 	}
 
-	// consider collations
-	leftTypes := make([]*types.FieldType, 0, len(retTypes(leftExec)))
-	for _, tp := range retTypes(leftExec) {
-		leftTypes = append(leftTypes, tp.Clone())
-	}
-	rightTypes := make([]*types.FieldType, 0, len(retTypes(rightExec)))
-	for _, tp := range retTypes(rightExec) {
-		rightTypes = append(rightTypes, tp.Clone())
-	}
 	leftIsBuildSide := true
 
 	e.isNullEQ = v.IsNullEQ
@@ -1256,24 +1247,32 @@ func (b *executorBuilder) buildHashJoin(v *plannercore.PhysicalHashJoin) Executor
 	}
 	executorCountHashJoinExec.Inc()
 
+	// We should use JoinKey to construct the type information used by hashing, instead of using the child's schema directly.
+	// When a hybrid type column is hashed multiple times, we need to distinguish what field types are used.
+	// For example, for the condition `enum = int and enum = string`, we should use ETInt to hash the first column,
+	// and use ETString to hash the second column, although they may be the same column.
+	leftExecTypes, rightExecTypes := retTypes(leftExec), retTypes(rightExec)
+	leftTypes, rightTypes := make([]*types.FieldType, 0, len(v.LeftJoinKeys)), make([]*types.FieldType, 0, len(v.RightJoinKeys))
+	for i, col := range v.LeftJoinKeys {
+		leftTypes = append(leftTypes, leftExecTypes[col.Index].Clone())
+		leftTypes[i].Flag = col.RetType.Flag
+	}
+	for i, col := range v.RightJoinKeys {
+		rightTypes = append(rightTypes, rightExecTypes[col.Index].Clone())
+		rightTypes[i].Flag = col.RetType.Flag
+	}
+
+	// consider collations
 	for i := range v.EqualConditions {
 		chs, coll := v.EqualConditions[i].CharsetAndCollation(e.ctx)
-		bt := leftTypes[v.LeftJoinKeys[i].Index]
-		bt.Charset, bt.Collate = chs, coll
-		pt := rightTypes[v.RightJoinKeys[i].Index]
-		pt.Charset, pt.Collate = chs, coll
+		leftTypes[i].Charset, leftTypes[i].Collate = chs, coll
+		rightTypes[i].Charset, rightTypes[i].Collate = chs, coll
 	}
 	if leftIsBuildSide {
 		e.buildTypes, e.probeTypes = leftTypes, rightTypes
 	} else {
 		e.buildTypes, e.probeTypes = rightTypes, leftTypes
 	}
-	for _, key := range e.buildKeys {
-		e.buildTypes[key.Index].Flag = key.RetType.Flag
-	}
-	for _, key := range e.probeKeys {
-		e.probeTypes[key.Index].Flag = key.RetType.Flag
-	}
 	return e
 }
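The new comment block explains the fix: a hybrid-type column such as ENUM carries both a numeric index and a string name, so the field type chosen per join key decides which representation gets hashed. A minimal standalone sketch of that effect (the enumVal type and fnv hashing are illustrative, not TiDB's codec):

package main

import (
	"encoding/binary"
	"fmt"
	"hash/fnv"
)

// enumVal mimics a hybrid type: one logical value, two representations.
type enumVal struct {
	name  string // compared against strings (ETString)
	index uint64 // compared against integers (ETInt)
}

// hashAsInt hashes the numeric representation, as `enum = int` would.
func hashAsInt(v enumVal) uint64 {
	h := fnv.New64()
	var buf [8]byte
	binary.LittleEndian.PutUint64(buf[:], v.index)
	h.Write(buf[:])
	return h.Sum64()
}

// hashAsString hashes the name, as `enum = string` would.
func hashAsString(v enumVal) uint64 {
	h := fnv.New64()
	h.Write([]byte(v.name))
	return h.Sum64()
}

func main() {
	a := enumVal{name: "a", index: 1}
	// The same value lands in different hash buckets depending on the
	// evaluation type demanded by each equality condition.
	fmt.Printf("as int:    %x\n", hashAsInt(a))
	fmt.Printf("as string: %x\n", hashAsString(a))
}

Hashing the build and probe sides under mismatched representations would place equal keys in different buckets, which is the wrong-result mode this commit fixes.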

@@ -2701,6 +2700,21 @@ func (b *executorBuilder) buildIndexLookUpJoin(v *plannercore.PhysicalIndexJoin) Executor
 		outerTypes[col.Index].Flag = col.RetType.Flag
 	}
 
+	// We should use JoinKey to construct the type information used by hashing, instead of using the child's schema directly.
+	// When a hybrid type column is hashed multiple times, we need to distinguish what field types are used.
+	// For example, for the condition `enum = int and enum = string`, we should use ETInt to hash the first column,
+	// and use ETString to hash the second column, although they may be the same column.
+	innerHashTypes := make([]*types.FieldType, len(v.InnerHashKeys))
+	outerHashTypes := make([]*types.FieldType, len(v.OuterHashKeys))
+	for i, col := range v.InnerHashKeys {
+		innerHashTypes[i] = innerTypes[col.Index].Clone()
+		innerHashTypes[i].Flag = col.RetType.Flag
+	}
+	for i, col := range v.OuterHashKeys {
+		outerHashTypes[i] = outerTypes[col.Index].Clone()
+		outerHashTypes[i].Flag = col.RetType.Flag
+	}
+
 	var (
 		outerFilter           []expression.Expression
 		leftTypes, rightTypes []*types.FieldType
@@ -2735,12 +2749,14 @@ func (b *executorBuilder) buildIndexLookUpJoin(v *plannercore.PhysicalIndexJoin) Executor
 	e := &IndexLookUpJoin{
 		baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ID(), outerExec),
 		outerCtx: outerCtx{
-			rowTypes: outerTypes,
-			filter:   outerFilter,
+			rowTypes:  outerTypes,
+			hashTypes: outerHashTypes,
+			filter:    outerFilter,
 		},
 		innerCtx: innerCtx{
 			readerBuilder: &dataReaderBuilder{Plan: innerPlan, executorBuilder: b},
 			rowTypes:      innerTypes,
+			hashTypes:     innerHashTypes,
 			colLens:       v.IdxColLens,
 			hasPrefixCol:  hasPrefixCol,
 		},
executor/hash_table.go (4 additions, 3 deletions)
@@ -34,6 +34,7 @@ import (

 // hashContext keeps the needed hash context of a db table in hash join.
 type hashContext struct {
+	// allTypes is in one-to-one correspondence with keyColIdx.
 	allTypes  []*types.FieldType
 	keyColIdx []int
 	buf       []byte
@@ -84,9 +85,9 @@ type hashRowContainer struct
 	rowContainer *chunk.RowContainer
 }
 
-func newHashRowContainer(sCtx sessionctx.Context, estCount int, hCtx *hashContext) *hashRowContainer {
+func newHashRowContainer(sCtx sessionctx.Context, estCount int, hCtx *hashContext, allTypes []*types.FieldType) *hashRowContainer {
 	maxChunkSize := sCtx.GetSessionVars().MaxChunkSize
-	rc := chunk.NewRowContainer(hCtx.allTypes, maxChunkSize)
+	rc := chunk.NewRowContainer(allTypes, maxChunkSize)
 	c := &hashRowContainer{
 		sc:   sCtx.GetSessionVars().StmtCtx,
 		hCtx: hCtx,
@@ -171,7 +172,7 @@ func (c *hashRowContainer) PutChunkSelected(chk *chunk.Chunk, selected, ignoreNulls []bool)
 	hCtx := c.hCtx
 	for keyIdx, colIdx := range c.hCtx.keyColIdx {
 		ignoreNull := len(ignoreNulls) > keyIdx && ignoreNulls[keyIdx]
-		err := codec.HashChunkSelected(c.sc, hCtx.hashVals, chk, hCtx.allTypes[colIdx], colIdx, hCtx.buf, hCtx.hasNull, selected, ignoreNull)
+		err := codec.HashChunkSelected(c.sc, hCtx.hashVals, chk, hCtx.allTypes[keyIdx], colIdx, hCtx.buf, hCtx.hasNull, selected, ignoreNull)
 		if err != nil {
 			return errors.Trace(err)
 		}
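The switch from hCtx.allTypes[colIdx] to hCtx.allTypes[keyIdx] follows from the contract documented above: allTypes now holds one field type per join key, parallel to keyColIdx, while the container's storage schema arrives separately through the new allTypes parameter of newHashRowContainer. A standalone sketch of the indexing contract (fieldType and the slices are illustrative, not TiDB code):

package main

import "fmt"

type fieldType struct{ name string }

func main() {
	// Full child schema: column 0 is an int, column 1 is an enum.
	rowTypes := []fieldType{{"int"}, {"enum"}}
	// One join key over column 1, to be hashed with string semantics.
	keyColIdx := []int{1}
	allTypes := []fieldType{{"string"}} // parallel to keyColIdx, not to rowTypes

	for keyIdx, colIdx := range keyColIdx {
		// Post-fix pattern: the type comes from the key position (keyIdx),
		// the data comes from the column offset (colIdx).
		fmt.Println("hash column", colIdx, "with type", allTypes[keyIdx].name)
		// The pre-fix pattern allTypes[colIdx] would index out of range here,
		// or silently pick the wrong type when the lengths happened to match.
	}
	_ = rowTypes // the storage schema is kept by the row container instead
}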
executor/hash_table_serial_test.go (1 addition, 1 deletion)
@@ -126,7 +126,7 @@ func testHashRowContainer(t *testing.T, hashFunc func() hash.Hash64, spill bool)
 	for i := 0; i < numRows; i++ {
 		hCtx.hashVals = append(hCtx.hashVals, hashFunc())
 	}
-	rowContainer := newHashRowContainer(sctx, 0, hCtx)
+	rowContainer := newHashRowContainer(sctx, 0, hCtx, hCtx.allTypes)
 	copiedRC = rowContainer.ShallowCopy()
 	tracker := rowContainer.GetMemTracker()
 	tracker.SetLabel(memory.LabelForBuildSideResult)
executor/index_lookup_hash_join.go (3 additions, 3 deletions)
@@ -554,7 +554,7 @@ func (iw *indexHashJoinInnerWorker) buildHashTableForOuterResult(ctx context.Context,
 		}
 	}
 	h.Reset()
-	err := codec.HashChunkRow(iw.ctx.GetSessionVars().StmtCtx, h, row, iw.outerCtx.rowTypes, hashColIdx, buf)
+	err := codec.HashChunkRow(iw.ctx.GetSessionVars().StmtCtx, h, row, iw.outerCtx.hashTypes, hashColIdx, buf)
 	failpoint.Inject("testIndexHashJoinBuildErr", func() {
 		err = errors.New("mockIndexHashJoinBuildErr")
 	})
@@ -645,7 +645,7 @@ func (iw *indexHashJoinInnerWorker) doJoinUnordered(ctx context.Context, task *indexHashJoinTask,

 func (iw *indexHashJoinInnerWorker) getMatchedOuterRows(innerRow chunk.Row, task *indexHashJoinTask, h hash.Hash64, buf []byte) (matchedRows []chunk.Row, matchedRowPtr []chunk.RowPtr, err error) {
 	h.Reset()
-	err = codec.HashChunkRow(iw.ctx.GetSessionVars().StmtCtx, h, innerRow, iw.rowTypes, iw.hashCols, buf)
+	err = codec.HashChunkRow(iw.ctx.GetSessionVars().StmtCtx, h, innerRow, iw.hashTypes, iw.hashCols, buf)
 	if err != nil {
 		return nil, nil, err
 	}
@@ -659,7 +659,7 @@ func (iw *indexHashJoinInnerWorker) getMatchedOuterRows(innerRow chunk.Row, task *indexHashJoinTask,
 	matchedRowPtr = make([]chunk.RowPtr, 0, len(iw.matchedOuterPtrs))
 	for _, ptr := range iw.matchedOuterPtrs {
 		outerRow := task.outerResult.GetRow(ptr)
-		ok, err := codec.EqualChunkRow(iw.ctx.GetSessionVars().StmtCtx, innerRow, iw.rowTypes, iw.keyCols, outerRow, iw.outerCtx.rowTypes, iw.outerCtx.hashCols)
+		ok, err := codec.EqualChunkRow(iw.ctx.GetSessionVars().StmtCtx, innerRow, iw.hashTypes, iw.hashCols, outerRow, iw.outerCtx.hashTypes, iw.outerCtx.hashCols)
 		if err != nil {
 			return nil, nil, err
 		}
executor/index_lookup_join.go (6 additions, 4 deletions)
@@ -86,17 +86,19 @@ type IndexLookUpJoin struct {
 }
 
 type outerCtx struct {
-	rowTypes []*types.FieldType
-	keyCols  []int
-	hashCols []int
-	filter   expression.CNFExprs
+	rowTypes  []*types.FieldType
+	keyCols   []int
+	hashTypes []*types.FieldType
+	hashCols  []int
+	filter    expression.CNFExprs
 }
 
 type innerCtx struct {
 	readerBuilder *dataReaderBuilder
 	rowTypes      []*types.FieldType
 	keyCols       []int
 	keyColIDs     []int64 // the original ID in its table, used by dynamic partition pruning
+	hashTypes     []*types.FieldType
 	hashCols      []int
 	colLens       []int
 	hasPrefixCol  bool
executor/join.go (4 additions, 4 deletions)
@@ -566,7 +566,7 @@ func (e *HashJoinExec) join2Chunk(workerID uint, probeSideChk *chunk.Chunk, hCtx *hashContext,
 	hCtx.initHash(probeSideChk.NumRows())
 	for keyIdx, i := range hCtx.keyColIdx {
 		ignoreNull := len(e.isNullEQ) > keyIdx && e.isNullEQ[keyIdx]
-		err = codec.HashChunkSelected(rowContainer.sc, hCtx.hashVals, probeSideChk, hCtx.allTypes[i], i, hCtx.buf, hCtx.hasNull, selected, ignoreNull)
+		err = codec.HashChunkSelected(rowContainer.sc, hCtx.hashVals, probeSideChk, hCtx.allTypes[keyIdx], i, hCtx.buf, hCtx.hasNull, selected, ignoreNull)
 		if err != nil {
 			joinResult.err = err
 			return false, joinResult
@@ -607,8 +607,8 @@ func (e *HashJoinExec) join2Chunk(workerID uint, probeSideChk *chunk.Chunk, hCtx *hashContext,
 // join2ChunkForOuterHashJoin joins chunks when using the outer side to build a hash table (refer to outer hash join)
 func (e *HashJoinExec) join2ChunkForOuterHashJoin(workerID uint, probeSideChk *chunk.Chunk, hCtx *hashContext, rowContainer *hashRowContainer, joinResult *hashjoinWorkerResult) (ok bool, _ *hashjoinWorkerResult) {
 	hCtx.initHash(probeSideChk.NumRows())
-	for _, i := range hCtx.keyColIdx {
-		err := codec.HashChunkColumns(rowContainer.sc, hCtx.hashVals, probeSideChk, hCtx.allTypes[i], i, hCtx.buf, hCtx.hasNull)
+	for keyIdx, i := range hCtx.keyColIdx {
+		err := codec.HashChunkColumns(rowContainer.sc, hCtx.hashVals, probeSideChk, hCtx.allTypes[keyIdx], i, hCtx.buf, hCtx.hasNull)
 		if err != nil {
 			joinResult.err = err
 			return false, joinResult
@@ -656,7 +656,7 @@ func (e *HashJoinExec) Next(ctx context.Context, req *chunk.Chunk) (err error) {
 		allTypes:  e.buildTypes,
 		keyColIdx: buildKeyColIdx,
 	}
-	e.rowContainer = newHashRowContainer(e.ctx, int(e.buildSideEstCount), hCtx)
+	e.rowContainer = newHashRowContainer(e.ctx, int(e.buildSideEstCount), hCtx, retTypes(e.buildSideExec))
 	// we shallow copy rowContainer for each probe worker to avoid lock contention
 	e.rowContainerForProbe = make([]*hashRowContainer, e.concurrency)
 	for i := uint(0); i < e.concurrency; i++ {
expression/integration_test.go (30 additions, 0 deletions)
@@ -10419,6 +10419,36 @@ PARTITION p20210909 VALUES LESS THAN (1631203200)
tk.MustQuery("SELECT cast(floor(hour(ts) / 4) as char) as win_start FROM perf_offline_day partition (p20210907, p20210908) GROUP BY win_start;").Check(testkit.Rows("3"))
}

+<<<<<<< HEAD
+=======
+func (s *testIntegrationSuite) TestIssue28643(c *C) {
+	tk := testkit.NewTestKit(c, s.store)
+	tk.MustExec("use test")
+	tk.MustExec("drop table if exists t")
+	tk.MustExec("create table t(a time(4));")
+	tk.MustExec("insert into t values(\"-838:59:59.000000\");")
+	tk.MustExec("insert into t values(\"838:59:59.000000\");")
+	tk.MustExec("set tidb_enable_vectorized_expression = on;")
+	tk.MustQuery("select hour(a) from t;").Check(testkit.Rows("838", "838"))
+	tk.MustExec("set tidb_enable_vectorized_expression = off;")
+	tk.MustQuery("select hour(a) from t;").Check(testkit.Rows("838", "838"))
+}
+
+func (s *testIntegrationSuite) TestIssue27831(c *C) {
+	tk := testkit.NewTestKit(c, s.store)
+	tk.MustExec("use test")
+	tk.MustExec("drop table if exists t")
+	tk.MustExec("create table t(a enum(\"a\", \"b\"), b enum(\"a\", \"b\"), c bool)")
+	tk.MustExec("insert into t values(\"a\", \"a\", 1);")
+	tk.MustQuery("select * from t t1 right join t t2 on t1.a=t2.b and t1.a= t2.c;").Check(testkit.Rows("a a 1 a a 1"))
+
+	tk.MustExec("drop table if exists t")
+	tk.MustExec("create table t(a enum(\"a\", \"b\"), b enum(\"a\", \"b\"), c bool, d int, index idx(d))")
+	tk.MustExec("insert into t values(\"a\", \"a\", 1, 1);")
+	tk.MustQuery("select /*+ inl_hash_join(t1) */ * from t t1 right join t t2 on t1.a=t2.b and t1.a= t2.c and t1.d=t2.d;").Check(testkit.Rows("a a 1 1 a a 1 1"))
+}
+
+>>>>>>> fa8cbd588... executor: fix wrong result for join with enum type (#29375)
 func (s *testIntegrationSuite) TestIssue29434(c *C) {
 	tk := testkit.NewTestKit(c, s.store)
 	tk.MustExec("use test")
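The new TestIssue27831 above pins down the bug scenario: in `t1.a=t2.b and t1.a= t2.c`, column a is compared once against an ENUM (string semantics) and once against a BOOL (int semantics), but before this fix both keys hashed t1.a under a single field type, so one condition's hash could disagree with the probe side and the join could miss the matching row. A standalone sketch of the two comparisons (illustrative values, assuming MySQL enum semantics where "a" has index 1):

package main

import "fmt"

func main() {
	// Row ("a", "a", 1): enum "a" has index 1; c is a bool stored as 0/1.
	aName, bName := "a", "a"
	var aIdx, c uint64 = 1, 1

	cond1 := aName == bName // t1.a = t2.b: enum vs enum, string semantics
	cond2 := aIdx == c      // t1.a = t2.c: enum vs bool, int semantics

	// Both conditions hold, so the right join must return the matched row
	// ("a a 1 a a 1") rather than a NULL-extended one.
	fmt.Println(cond1 && cond2)
}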
util/codec/codec.go (7 additions, 4 deletions)
@@ -665,8 +665,8 @@ func HashChunkSelected(sc *stmtctx.StatementContext, h []hash.Hash64, chk *chunk.Chunk,
 // If two rows are logically equal, it will generate the same bytes.
 func HashChunkRow(sc *stmtctx.StatementContext, w io.Writer, row chunk.Row, allTypes []*types.FieldType, colIdx []int, buf []byte) (err error) {
 	var b []byte
-	for _, idx := range colIdx {
-		buf[0], b, err = encodeHashChunkRowIdx(sc, row, allTypes[idx], idx)
+	for i, idx := range colIdx {
+		buf[0], b, err = encodeHashChunkRowIdx(sc, row, allTypes[i], idx)
 		if err != nil {
 			return errors.Trace(err)
 		}
@@ -688,13 +688,16 @@ func EqualChunkRow(sc *stmtctx.StatementContext,
 	row1 chunk.Row, allTypes1 []*types.FieldType, colIdx1 []int,
 	row2 chunk.Row, allTypes2 []*types.FieldType, colIdx2 []int,
 ) (bool, error) {
+	if len(colIdx1) != len(colIdx2) {
+		return false, errors.Errorf("Internal error: Hash columns count mismatch, col1: %d, col2: %d", len(colIdx1), len(colIdx2))
+	}
 	for i := range colIdx1 {
 		idx1, idx2 := colIdx1[i], colIdx2[i]
-		flag1, b1, err := encodeHashChunkRowIdx(sc, row1, allTypes1[idx1], idx1)
+		flag1, b1, err := encodeHashChunkRowIdx(sc, row1, allTypes1[i], idx1)
 		if err != nil {
 			return false, errors.Trace(err)
 		}
-		flag2, b2, err := encodeHashChunkRowIdx(sc, row2, allTypes2[idx2], idx2)
+		flag2, b2, err := encodeHashChunkRowIdx(sc, row2, allTypes2[i], idx2)
 		if err != nil {
 			return false, errors.Trace(err)
 		}
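After this change, HashChunkRow and EqualChunkRow expect allTypes to be parallel to colIdx: one field type per hashed key, selected by position i rather than by the raw column offset, and the new length check in EqualChunkRow guards one side of that contract. A standalone sketch of the calling convention (hashRow and fieldType are illustrative, not the codec package API):

package main

import (
	"errors"
	"fmt"
)

type fieldType struct{ name string }

// hashRow mirrors the shape of HashChunkRow's loop over colIdx.
func hashRow(allTypes []fieldType, colIdx []int) error {
	if len(allTypes) != len(colIdx) {
		return errors.New("allTypes must be one-to-one with colIdx")
	}
	for i, idx := range colIdx {
		// Type by key position i, data by column offset idx.
		fmt.Printf("encode column %d with type %s\n", idx, allTypes[i].name)
	}
	return nil
}

func main() {
	// Two keys over columns 3 and 1 of the row; the types are per key.
	if err := hashRow([]fieldType{{"string"}, {"int"}}, []int{3, 1}); err != nil {
		fmt.Println(err)
	}
}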
util/codec/codec_test.go (6 additions, 6 deletions)
@@ -1280,9 +1280,9 @@ func TestHashChunkColumns(t *testing.T) {
 	for i := 0; i < 12; i++ {
 		require.True(t, chk.GetRow(0).IsNull(i))
 		err1 := HashChunkSelected(sc, vecHash, chk, tps[i], i, buf, hasNull, sel, false)
-		err2 := HashChunkRow(sc, rowHash[0], chk.GetRow(0), tps, colIdx[i:i+1], buf)
-		err3 := HashChunkRow(sc, rowHash[1], chk.GetRow(1), tps, colIdx[i:i+1], buf)
-		err4 := HashChunkRow(sc, rowHash[2], chk.GetRow(2), tps, colIdx[i:i+1], buf)
+		err2 := HashChunkRow(sc, rowHash[0], chk.GetRow(0), tps[i:i+1], colIdx[i:i+1], buf)
+		err3 := HashChunkRow(sc, rowHash[1], chk.GetRow(1), tps[i:i+1], colIdx[i:i+1], buf)
+		err4 := HashChunkRow(sc, rowHash[2], chk.GetRow(2), tps[i:i+1], colIdx[i:i+1], buf)
 		require.NoError(t, err1)
 		require.NoError(t, err2)
 		require.NoError(t, err3)
@@ -1305,9 +1305,9 @@ func TestHashChunkColumns(t *testing.T) {
 		require.False(t, chk.GetRow(0).IsNull(i))
 
 		err1 := HashChunkSelected(sc, vecHash, chk, tps[i], i, buf, hasNull, sel, false)
-		err2 := HashChunkRow(sc, rowHash[0], chk.GetRow(0), tps, colIdx[i:i+1], buf)
-		err3 := HashChunkRow(sc, rowHash[1], chk.GetRow(1), tps, colIdx[i:i+1], buf)
-		err4 := HashChunkRow(sc, rowHash[2], chk.GetRow(2), tps, colIdx[i:i+1], buf)
+		err2 := HashChunkRow(sc, rowHash[0], chk.GetRow(0), tps[i:i+1], colIdx[i:i+1], buf)
+		err3 := HashChunkRow(sc, rowHash[1], chk.GetRow(1), tps[i:i+1], colIdx[i:i+1], buf)
+		err4 := HashChunkRow(sc, rowHash[2], chk.GetRow(2), tps[i:i+1], colIdx[i:i+1], buf)
 
 		require.NoError(t, err1)
 		require.NoError(t, err2)