Skip to content

Commit

Permalink
executor: some minor refine for hash join v2 build (#54429)
Browse files Browse the repository at this point in the history
ref #53127
  • Loading branch information
windtalker authored Jul 18, 2024
1 parent a48a4a2 commit e858f55
Show file tree
Hide file tree
Showing 5 changed files with 86 additions and 53 deletions.
2 changes: 1 addition & 1 deletion pkg/executor/join/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ go_test(
],
embed = [":join"],
flaky = True,
shard_count = 45,
shard_count = 46,
deps = [
"//pkg/config",
"//pkg/domain",
Expand Down
15 changes: 9 additions & 6 deletions pkg/executor/join/hash_join_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ func (htc *hashTableContext) reset() {
htc.memoryTracker.Detach()
}

func (htc *hashTableContext) getCurrentRowSegment(workerID, partitionID int, tableMeta *TableMeta, allowCreate bool) *rowTableSegment {
func (htc *hashTableContext) getCurrentRowSegment(workerID, partitionID int, tableMeta *TableMeta, allowCreate bool, firstSegSizeHint uint) *rowTableSegment {
if htc.rowTables[workerID][partitionID] == nil {
htc.rowTables[workerID][partitionID] = newRowTable(tableMeta)
}
Expand All @@ -80,18 +80,21 @@ func (htc *hashTableContext) getCurrentRowSegment(workerID, partitionID int, tab
if !allowCreate {
panic("logical error, should not reach here")
}
seg := newRowTableSegment()
// do not pre-allocate too much memory for the first seg: for queries that only have a few rows, it wastes memory and may hurt performance in high-concurrency scenarios
rowSizeHint := maxRowTableSegmentSize
if segNum == 0 {
rowSizeHint = int(firstSegSizeHint)
}
seg := newRowTableSegment(uint(rowSizeHint))
htc.rowTables[workerID][partitionID].segments = append(htc.rowTables[workerID][partitionID].segments, seg)
segNum++
}
return htc.rowTables[workerID][partitionID].segments[segNum-1]
}

func (htc *hashTableContext) finalizeCurrentSeg(workerID, partitionID int, builder *rowTableBuilder) {
seg := htc.getCurrentRowSegment(workerID, partitionID, nil, false)
seg.rowStartOffset = append(seg.rowStartOffset, builder.startPosInRawData[partitionID]...)
builder.crrntSizeOfRowTable[partitionID] = 0
builder.startPosInRawData[partitionID] = builder.startPosInRawData[partitionID][:0]
seg := htc.getCurrentRowSegment(workerID, partitionID, nil, false, 0)
builder.rowNumberInCurrentRowTableSeg[partitionID] = 0
failpoint.Inject("finalizeCurrentSegPanic", nil)
seg.finalized = true
htc.memoryTracker.Consume(seg.totalUsedBytes())
Expand Down
2 changes: 1 addition & 1 deletion pkg/executor/join/hash_table_v2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,12 @@ func createMockRowTable(maxRowsPerSeg int, segmentCount int, fixedSize bool) *ro
meta: nil,
}
for i := 0; i < segmentCount; i++ {
rowSeg := newRowTableSegment()
// no empty segment is allowed
rows := maxRowsPerSeg
if !fixedSize {
rows = int(rand.Int31n(int32(maxRowsPerSeg)) + 1)
}
rowSeg := newRowTableSegment(uint(rows))
rowSeg.rawData = make([]byte, rows)
for j := 0; j < rows; j++ {
rowSeg.rowStartOffset = append(rowSeg.rowStartOffset, uint64(j))
Expand Down
71 changes: 27 additions & 44 deletions pkg/executor/join/join_row_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ type rowTableSegment struct {
hashValues []uint64 // the hash value of each row
rowStartOffset []uint64 // the start address of each row
validJoinKeyPos []int // the pos of rows that need to be inserted into hash table, used in hash table build
finalized bool
finalized bool // after finalized is set to true, no further modification is allowed
}

func (rts *rowTableSegment) totalUsedBytes() int64 {
Expand All @@ -84,13 +84,15 @@ func (rts *rowTableSegment) getRowPointer(index int) unsafe.Pointer {

const maxRowTableSegmentSize = 1024

func newRowTableSegment() *rowTableSegment {
// 64 MB
const maxRowTableSegmentByteSize = 64 * 1024 * 1024

func newRowTableSegment(rowSizeHint uint) *rowTableSegment {
return &rowTableSegment{
// TODO: @XuHuaiyu if joinKeyIsInlined, the cap of rawData can be calculated
rawData: make([]byte, 0),
hashValues: make([]uint64, 0, maxRowTableSegmentSize),
rowStartOffset: make([]uint64, 0, maxRowTableSegmentSize),
validJoinKeyPos: make([]int, 0, maxRowTableSegmentSize),
hashValues: make([]uint64, 0, rowSizeHint),
rowStartOffset: make([]uint64, 0, rowSizeHint),
validJoinKeyPos: make([]int, 0, rowSizeHint),
}
}

Expand Down Expand Up @@ -487,47 +489,24 @@ type rowTableBuilder struct {
selRows []int
usedRows []int
hashValue []uint64
firstSegRowSizeHint uint
// filterVector and nullKeyVector are indexed by physical row index because the return vector of VectorizedFilter is based on physical row index
filterVector []bool // if there is filter before probe, filterVector saves the filter result
nullKeyVector []bool // nullKeyVector[i] = true if any of the key is null

crrntSizeOfRowTable []int64
// store the start position of each row in the rawData,
// we'll use this temp array to get the address of each row at the end
startPosInRawData [][]uint64
rowNumberInCurrentRowTableSeg []int64
}

func createRowTableBuilder(buildKeyIndex []int, buildKeyTypes []*types.FieldType, partitionNumber int, hasNullableKey bool, hasFilter bool, keepFilteredRows bool) *rowTableBuilder {
builder := &rowTableBuilder{
buildKeyIndex: buildKeyIndex,
buildKeyTypes: buildKeyTypes,
crrntSizeOfRowTable: make([]int64, partitionNumber),
startPosInRawData: make([][]uint64, partitionNumber),
hasNullableKey: hasNullableKey,
hasFilter: hasFilter,
keepFilteredRows: keepFilteredRows,
}
builder.initBuffer()
return builder
}

func (b *rowTableBuilder) initBuffer() {
b.serializedKeyVectorBuffer = make([][]byte, chunk.InitialCapacity)
b.partIdxVector = make([]int, 0, chunk.InitialCapacity)
b.hashValue = make([]uint64, 0, chunk.InitialCapacity)
if b.hasFilter {
b.filterVector = make([]bool, 0, chunk.InitialCapacity)
}
if b.hasNullableKey {
b.nullKeyVector = make([]bool, 0, chunk.InitialCapacity)
for i := 0; i < chunk.InitialCapacity; i++ {
b.nullKeyVector = append(b.nullKeyVector, false)
}
}
b.selRows = make([]int, 0, chunk.InitialCapacity)
for i := 0; i < chunk.InitialCapacity; i++ {
b.selRows = append(b.selRows, i)
buildKeyIndex: buildKeyIndex,
buildKeyTypes: buildKeyTypes,
rowNumberInCurrentRowTableSeg: make([]int64, partitionNumber),
hasNullableKey: hasNullableKey,
hasFilter: hasFilter,
keepFilteredRows: keepFilteredRows,
}
return builder
}

func (b *rowTableBuilder) initHashValueAndPartIndexForOneChunk(partitionNumber uint64) {
Expand All @@ -550,6 +529,7 @@ func (b *rowTableBuilder) initHashValueAndPartIndexForOneChunk(partitionNumber u

func (b *rowTableBuilder) processOneChunk(chk *chunk.Chunk, typeCtx types.Context, hashJoinCtx *HashJoinCtxV2, workerID int) error {
b.ResetBuffer(chk)
b.firstSegRowSizeHint = max(uint(1), uint(float64(len(b.usedRows))/float64(hashJoinCtx.PartitionNumber)*float64(1.2)))
var err error
if b.hasFilter {
b.filterVector, err = expression.VectorizedFilter(hashJoinCtx.SessCtx.GetExprCtx().GetEvalCtx(), hashJoinCtx.SessCtx.GetSessionVars().EnableVectorizedExpression, hashJoinCtx.BuildFilter, chunk.NewIterator4Chunk(chk), b.filterVector)
Expand Down Expand Up @@ -630,8 +610,7 @@ func newRowTable(meta *TableMeta) *rowTable {

func (b *rowTableBuilder) appendRemainingRowLocations(workerID int, htCtx *hashTableContext) {
for partID := 0; partID < int(htCtx.hashTable.partitionNumber); partID++ {
startPosInRawData := b.startPosInRawData[partID]
if len(startPosInRawData) > 0 {
if b.rowNumberInCurrentRowTableSeg[partID] > 0 {
htCtx.finalizeCurrentSeg(workerID, partID, b)
}
}
Expand Down Expand Up @@ -725,20 +704,24 @@ func (b *rowTableBuilder) appendToRowTable(chk *chunk.Chunk, hashJoinCtx *HashJo
if !hasValidKey && !b.keepFilteredRows {
continue
}
// need to append the row to rowTable
var (
row = chk.GetRow(logicalRowIndex)
partIdx = b.partIdxVector[logicalRowIndex]
seg *rowTableSegment
)
if b.crrntSizeOfRowTable[partIdx] >= maxRowTableSegmentSize {
seg = hashJoinCtx.hashTableContext.getCurrentRowSegment(workerID, partIdx, hashJoinCtx.hashTableMeta, true, b.firstSegRowSizeHint)
// first check if current seg is full
if b.rowNumberInCurrentRowTableSeg[partIdx] >= maxRowTableSegmentSize || len(seg.rawData) >= maxRowTableSegmentByteSize {
// finalize current seg and create a new seg
hashJoinCtx.hashTableContext.finalizeCurrentSeg(workerID, partIdx, b)
seg = hashJoinCtx.hashTableContext.getCurrentRowSegment(workerID, partIdx, hashJoinCtx.hashTableMeta, true, b.firstSegRowSizeHint)
}
seg = hashJoinCtx.hashTableContext.getCurrentRowSegment(workerID, partIdx, hashJoinCtx.hashTableMeta, true)
if hasValidKey {
seg.validJoinKeyPos = append(seg.validJoinKeyPos, len(seg.hashValues))
}
seg.hashValues = append(seg.hashValues, b.hashValue[logicalRowIndex])
b.startPosInRawData[partIdx] = append(b.startPosInRawData[partIdx], uint64(len(seg.rawData)))
seg.rowStartOffset = append(seg.rowStartOffset, uint64(len(seg.rawData)))
rowLength := 0
// fill next_row_ptr field
rowLength += fillNextRowPtr(seg)
Expand All @@ -752,7 +735,7 @@ func (b *rowTableBuilder) appendToRowTable(chk *chunk.Chunk, hashJoinCtx *HashJo
if rowLength%8 != 0 {
seg.rawData = append(seg.rawData, fakeAddrPlaceHolder[:8-rowLength%8]...)
}
b.crrntSizeOfRowTable[partIdx]++
b.rowNumberInCurrentRowTableSeg[partIdx]++
}
return nil
}
Expand Down
49 changes: 48 additions & 1 deletion pkg/executor/join/row_table_builder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,8 +118,8 @@ func checkKeys(t *testing.T, withSelCol bool, buildFilter expression.CNFExprs, b
hashJoinCtx.initHashTableContext()
hashJoinCtx.SessCtx = mock.NewContext()
err := builder.processOneChunk(chk, hashJoinCtx.SessCtx.GetSessionVars().StmtCtx.TypeCtx(), hashJoinCtx, 0)
builder.appendRemainingRowLocations(0, hashJoinCtx.hashTableContext)
require.NoError(t, err, "processOneChunk returns error")
builder.appendRemainingRowLocations(0, hashJoinCtx.hashTableContext)
require.Equal(t, chk.NumRows(), len(builder.usedRows))
rowTables := hashJoinCtx.hashTableContext.rowTables[0]
checkRowLocationAlignment(t, rowTables)
Expand Down Expand Up @@ -159,6 +159,53 @@ func checkKeys(t *testing.T, withSelCol bool, buildFilter expression.CNFExprs, b
}
}

// TestLargeColumn feeds the row table builder a chunk whose second column is a
// 256 KiB string in every row and then verifies the size of each resulting
// row table segment: the raw data must stay below twice
// maxRowTableSegmentByteSize and the row count below maxRowTableSegmentSize.
func TestLargeColumn(t *testing.T) {
	intTp := types.NewFieldType(mysql.TypeLonglong)
	stringTp := types.NewFieldType(mysql.TypeVarString)
	buildKeyIndex := []int{0, 1}
	buildKeyTypes := []*types.FieldType{intTp, stringTp}
	buildTypes := []*types.FieldType{intTp, stringTp}
	probeKeyTypes := []*types.FieldType{intTp, stringTp}

	meta := newTableMeta(buildKeyIndex, buildTypes, buildKeyTypes, probeKeyTypes, nil, []int{1}, false)
	// partitionNumber = 1, hasNullableKey = true, hasFilter = false, keepFilteredRows = false
	builder := createRowTableBuilder(buildKeyIndex, buildKeyTypes, 1, true, false, false)

	// 2048 rows with a 256 KiB string each: roughly 512 MiB of string payload,
	// far more than a single segment is expected to hold.
	const rows = 2048
	chk := chunk.NewEmptyChunk(buildTypes)
	stringValue := make([]byte, 1024*256)
	for i := 0; i < rows; i++ {
		// first column is int, second column is the large string
		chk.AppendInt64(0, int64(i))
		chk.AppendBytes(1, stringValue)
	}

	hashJoinCtx := &HashJoinCtxV2{
		PartitionNumber: 1,
		hashTableMeta:   meta,
	}
	hashJoinCtx.Concurrency = 1
	hashJoinCtx.initHashTableContext()
	hashJoinCtx.SessCtx = mock.NewContext()

	err := builder.processOneChunk(chk, hashJoinCtx.SessCtx.GetSessionVars().StmtCtx.TypeCtx(), hashJoinCtx, 0)
	require.NoError(t, err, "processOneChunk returns error")
	// finalize whatever segment is still open after the chunk has been consumed
	builder.appendRemainingRowLocations(0, hashJoinCtx.hashTableContext)
	require.Equal(t, chk.NumRows(), len(builder.usedRows))

	rowTables := hashJoinCtx.hashTableContext.rowTables[0]
	checkRowLocationAlignment(t, rowTables)
	for _, rowTable := range rowTables {
		for _, seg := range rowTable.segments {
			// The bound is 2x the byte limit rather than the limit itself;
			// presumably because a segment may overshoot the limit by the row
			// that fills it — confirm against appendToRowTable.
			require.Less(t, len(seg.rawData), maxRowTableSegmentByteSize*2)
			require.Less(t, len(seg.hashValues), maxRowTableSegmentSize)
		}
	}
}

func TestKey(t *testing.T) {
intTp := types.NewFieldType(mysql.TypeLonglong)
uintTp := types.NewFieldType(mysql.TypeLonglong)
Expand Down

0 comments on commit e858f55

Please sign in to comment.