Skip to content

Commit

Permalink
expression: remove providing EnableVectorizedExpression in `EvalCon…
Browse files Browse the repository at this point in the history
…text` (#51567)

close #51563
  • Loading branch information
lcwangchao authored Mar 7, 2024
1 parent f94a6ba commit 87bcf32
Show file tree
Hide file tree
Showing 21 changed files with 67 additions and 65 deletions.
2 changes: 1 addition & 1 deletion pkg/executor/aggregate/agg_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ func GetGroupKey(ctx sessionctx.Context, input *chunk.Chunk, groupKey [][]byte,
tp = &newTp
}

if err := expression.EvalExpr(exprCtx, item, tp.EvalType(), input, buf); err != nil {
if err := expression.EvalExpr(exprCtx, ctx.GetSessionVars().EnableVectorizedExpression, item, tp.EvalType(), input, buf); err != nil {
expression.PutColumn(buf)
return nil, err
}
Expand Down
6 changes: 3 additions & 3 deletions pkg/executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -1709,7 +1709,7 @@ func (b *executorBuilder) buildStreamAgg(v *plannercore.PhysicalStreamAgg) exec.
exprCtx := b.ctx.GetExprCtx()
e := &aggregate.StreamAggExec{
BaseExecutor: exec.NewBaseExecutor(b.ctx, v.Schema(), v.ID(), src),
GroupChecker: vecgroupchecker.NewVecGroupChecker(exprCtx, v.GroupByItems),
GroupChecker: vecgroupchecker.NewVecGroupChecker(exprCtx, b.ctx.GetSessionVars().EnableVectorizedExpression, v.GroupByItems),
AggFuncs: make([]aggfuncs.AggFunc, 0, len(v.AggFuncs)),
}

Expand Down Expand Up @@ -4680,7 +4680,7 @@ func (b *executorBuilder) buildWindow(v *plannercore.PhysicalWindow) exec.Execut
if b.ctx.GetSessionVars().EnablePipelinedWindowExec {
exec := &PipelinedWindowExec{
BaseExecutor: base,
groupChecker: vecgroupchecker.NewVecGroupChecker(b.ctx.GetExprCtx(), groupByItems),
groupChecker: vecgroupchecker.NewVecGroupChecker(b.ctx.GetExprCtx(), b.ctx.GetSessionVars().EnableVectorizedExpression, groupByItems),
numWindowFuncs: len(v.WindowFuncDescs),
}

Expand Down Expand Up @@ -4758,7 +4758,7 @@ func (b *executorBuilder) buildWindow(v *plannercore.PhysicalWindow) exec.Execut
}
return &WindowExec{BaseExecutor: base,
processor: processor,
groupChecker: vecgroupchecker.NewVecGroupChecker(b.ctx.GetExprCtx(), groupByItems),
groupChecker: vecgroupchecker.NewVecGroupChecker(b.ctx.GetExprCtx(), b.ctx.GetSessionVars().EnableVectorizedExpression, groupByItems),
numWindowFuncs: len(v.WindowFuncDescs),
}
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/executor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -1621,7 +1621,7 @@ func (e *SelectionExec) Next(ctx context.Context, req *chunk.Chunk) error {
if e.childResult.NumRows() == 0 {
return nil
}
e.selected, err = expression.VectorizedFilter(e.Ctx().GetExprCtx(), e.filters, e.inputIter, e.selected)
e.selected, err = expression.VectorizedFilter(e.Ctx().GetExprCtx(), e.Ctx().GetSessionVars().EnableVectorizedExpression, e.filters, e.inputIter, e.selected)
if err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/executor/index_lookup_join.go
Original file line number Diff line number Diff line change
Expand Up @@ -461,7 +461,7 @@ func (ow *outerWorker) buildTask(ctx context.Context) (*lookUpJoinTask, error) {
chk := task.outerResult.GetChunk(i)
outerMatch := make([]bool, 0, chk.NumRows())
task.memTracker.Consume(int64(cap(outerMatch)))
task.outerMatch[i], err = expression.VectorizedFilter(exprCtx, ow.filter, chunk.NewIterator4Chunk(chk), outerMatch)
task.outerMatch[i], err = expression.VectorizedFilter(exprCtx, ow.ctx.GetSessionVars().EnableVectorizedExpression, ow.filter, chunk.NewIterator4Chunk(chk), outerMatch)
if err != nil {
return task, err
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/executor/index_lookup_merge_join.go
Original file line number Diff line number Diff line change
Expand Up @@ -426,7 +426,7 @@ func (imw *innerMergeWorker) handleTask(ctx context.Context, task *lookUpMergeJo
for i := 0; i < numOuterChks; i++ {
chk := task.outerResult.GetChunk(i)
task.outerMatch[i] = make([]bool, chk.NumRows())
task.outerMatch[i], err = expression.VectorizedFilter(exprCtx, imw.outerMergeCtx.filter, chunk.NewIterator4Chunk(chk), task.outerMatch[i])
task.outerMatch[i], err = expression.VectorizedFilter(exprCtx, imw.ctx.GetSessionVars().EnableVectorizedExpression, imw.outerMergeCtx.filter, chunk.NewIterator4Chunk(chk), task.outerMatch[i])
if err != nil {
return err
}
Expand Down
12 changes: 8 additions & 4 deletions pkg/executor/internal/vecgroupchecker/vec_group_checker.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,12 +57,16 @@ type VecGroupChecker struct {

// groupCount is the count of groups in the current chunk
groupCount int

// vecEnabled indicates whether to use vectorized evaluation or not.
vecEnabled bool
}

// NewVecGroupChecker creates a new VecGroupChecker
func NewVecGroupChecker(ctx expression.EvalContext, items []expression.Expression) *VecGroupChecker {
func NewVecGroupChecker(ctx expression.EvalContext, vecEnabled bool, items []expression.Expression) *VecGroupChecker {
return &VecGroupChecker{
ctx: ctx,
vecEnabled: vecEnabled,
GroupByItems: items,
groupCount: 0,
nextGroupID: 0,
Expand Down Expand Up @@ -137,7 +141,7 @@ func (e *VecGroupChecker) SplitIntoGroups(chk *chunk.Chunk) (isFirstGroupSameAsP
}

for _, item := range e.GroupByItems {
err = e.evalGroupItemsAndResolveGroups(item, chk, numRows)
err = e.evalGroupItemsAndResolveGroups(item, e.vecEnabled, chk, numRows)
if err != nil {
return false, err
}
Expand Down Expand Up @@ -323,7 +327,7 @@ func (e *VecGroupChecker) getFirstAndLastRowDatum(
// evalGroupItemsAndResolveGroups evaluates the chunk according to the expression item.
// And resolve the rows into groups according to the evaluation results
func (e *VecGroupChecker) evalGroupItemsAndResolveGroups(
item expression.Expression, chk *chunk.Chunk, numRows int) (err error) {
item expression.Expression, vecEnabled bool, chk *chunk.Chunk, numRows int) (err error) {
tp := item.GetType()
eType := tp.EvalType()
if e.allocateBuffer == nil {
Expand All @@ -337,7 +341,7 @@ func (e *VecGroupChecker) evalGroupItemsAndResolveGroups(
return err
}
defer e.releaseBuffer(col)
err = expression.EvalExpr(e.ctx, item, eType, chk, col)
err = expression.EvalExpr(e.ctx, vecEnabled, item, eType, chk, col)
if err != nil {
return err
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ func TestVecGroupCheckerDATARACE(t *testing.T) {
RetType: types.NewFieldTypeBuilder().SetType(mType).BuildP(),
Index: 0,
}
vgc := NewVecGroupChecker(ctx, exprs)
vgc := NewVecGroupChecker(ctx, ctx.GetSessionVars().EnableVectorizedExpression, exprs)

fts := []*types.FieldType{types.NewFieldType(mType)}
chk := chunk.New(fts, 1, 1)
Expand Down Expand Up @@ -186,7 +186,7 @@ func TestVecGroupChecker4GroupCount(t *testing.T) {
ctx := mock.NewContext()
for _, testCase := range testCases {
expr, inputChks := genTestChunk4VecGroupChecker(testCase.chunkRows, testCase.sameNum)
groupChecker := NewVecGroupChecker(ctx, expr)
groupChecker := NewVecGroupChecker(ctx, ctx.GetSessionVars().EnableVectorizedExpression, expr)
groupNum := 0
for i, inputChk := range inputChks {
flag, err := groupChecker.SplitIntoGroups(inputChk)
Expand All @@ -209,7 +209,7 @@ func TestVecGroupChecker(t *testing.T) {
Index: 0,
}
ctx := mock.NewContext()
groupChecker := NewVecGroupChecker(ctx, []expression.Expression{col0})
groupChecker := NewVecGroupChecker(ctx, ctx.GetSessionVars().EnableVectorizedExpression, []expression.Expression{col0})

chk := chunk.New([]*types.FieldType{tp}, 6, 6)
chk.Reset()
Expand Down
8 changes: 4 additions & 4 deletions pkg/executor/join.go
Original file line number Diff line number Diff line change
Expand Up @@ -987,7 +987,7 @@ func (w *probeWorker) getNewJoinResult() (bool, *hashjoinWorkerResult) {
func (w *probeWorker) join2Chunk(probeSideChk *chunk.Chunk, hCtx *hashContext, joinResult *hashjoinWorkerResult,
selected []bool) (ok bool, _ *hashjoinWorkerResult) {
var err error
selected, err = expression.VectorizedFilter(w.hashJoinCtx.sessCtx.GetExprCtx(), w.hashJoinCtx.outerFilter, chunk.NewIterator4Chunk(probeSideChk), selected)
selected, err = expression.VectorizedFilter(w.hashJoinCtx.sessCtx.GetExprCtx(), w.hashJoinCtx.sessCtx.GetSessionVars().EnableVectorizedExpression, w.hashJoinCtx.outerFilter, chunk.NewIterator4Chunk(probeSideChk), selected)
if err != nil {
joinResult.err = err
return false, joinResult
Expand Down Expand Up @@ -1249,7 +1249,7 @@ func (w *buildWorker) buildHashTableForList(buildSideResultCh <-chan *chunk.Chun
if len(w.hashJoinCtx.outerFilter) == 0 {
err = w.hashJoinCtx.rowContainer.PutChunk(chk, w.hashJoinCtx.isNullEQ)
} else {
selected, err = expression.VectorizedFilter(w.hashJoinCtx.sessCtx.GetExprCtx(), w.hashJoinCtx.outerFilter, chunk.NewIterator4Chunk(chk), selected)
selected, err = expression.VectorizedFilter(w.hashJoinCtx.sessCtx.GetExprCtx(), w.hashJoinCtx.sessCtx.GetSessionVars().EnableVectorizedExpression, w.hashJoinCtx.outerFilter, chunk.NewIterator4Chunk(chk), selected)
if err != nil {
return err
}
Expand Down Expand Up @@ -1396,7 +1396,7 @@ func (e *NestedLoopApplyExec) fetchSelectedOuterRow(ctx context.Context, chk *ch
if e.outerChunk.NumRows() == 0 {
return nil, nil
}
e.outerSelected, err = expression.VectorizedFilter(e.ctx.GetExprCtx(), e.outerFilter, outerIter, e.outerSelected)
e.outerSelected, err = expression.VectorizedFilter(e.ctx.GetExprCtx(), e.ctx.GetSessionVars().EnableVectorizedExpression, e.outerFilter, outerIter, e.outerSelected)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -1447,7 +1447,7 @@ func (e *NestedLoopApplyExec) fetchAllInners(ctx context.Context) error {
return nil
}

e.innerSelected, err = expression.VectorizedFilter(e.ctx.GetExprCtx(), e.innerFilter, innerIter, e.innerSelected)
e.innerSelected, err = expression.VectorizedFilter(e.ctx.GetExprCtx(), e.ctx.GetSessionVars().EnableVectorizedExpression, e.innerFilter, innerIter, e.innerSelected)
if err != nil {
return err
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/executor/joiner.go
Original file line number Diff line number Diff line change
Expand Up @@ -261,7 +261,7 @@ func (j *baseJoiner) makeShallowJoinRow(isRightJoin bool, inner, outer chunk.Row
// indicates whether the outer row matches any inner rows.
func (j *baseJoiner) filter(input, output *chunk.Chunk, outerColLen int, lUsed, rUsed []int) (bool, error) {
var err error
j.selected, err = expression.VectorizedFilter(j.ctx.GetExprCtx(), j.conditions, chunk.NewIterator4Chunk(input), j.selected)
j.selected, err = expression.VectorizedFilter(j.ctx.GetExprCtx(), j.ctx.GetSessionVars().EnableVectorizedExpression, j.conditions, chunk.NewIterator4Chunk(input), j.selected)
if err != nil {
return false, err
}
Expand Down Expand Up @@ -301,7 +301,7 @@ func (j *baseJoiner) filterAndCheckOuterRowStatus(
input, output *chunk.Chunk, innerColsLen int, outerRowStatus []outerRowStatusFlag,
lUsed, rUsed []int) ([]outerRowStatusFlag, error) {
var err error
j.selected, j.isNull, err = expression.VectorizedFilterConsiderNull(j.ctx.GetExprCtx(), j.conditions, chunk.NewIterator4Chunk(input), j.selected, j.isNull)
j.selected, j.isNull, err = expression.VectorizedFilterConsiderNull(j.ctx.GetExprCtx(), j.ctx.GetSessionVars().EnableVectorizedExpression, j.conditions, chunk.NewIterator4Chunk(input), j.selected, j.isNull)
if err != nil {
return nil, err
}
Expand Down
5 changes: 3 additions & 2 deletions pkg/executor/merge_join.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,8 @@ func (t *mergeJoinTable) init(executor *MergeJoinExec) {
for _, col := range t.joinKeys {
items = append(items, col)
}
t.groupChecker = vecgroupchecker.NewVecGroupChecker(executor.Ctx().GetExprCtx(), items)
vecEnabled := executor.Ctx().GetSessionVars().EnableVectorizedExpression
t.groupChecker = vecgroupchecker.NewVecGroupChecker(executor.Ctx().GetExprCtx(), vecEnabled, items)
t.groupRowsIter = chunk.NewIterator4Chunk(t.childChunk)

if t.isInner {
Expand Down Expand Up @@ -257,7 +258,7 @@ func (t *mergeJoinTable) fetchNextOuterGroup(ctx context.Context, exec *MergeJoi
}

t.childChunkIter.Begin()
t.filtersSelected, err = expression.VectorizedFilter(exec.Ctx().GetExprCtx(), t.filters, t.childChunkIter, t.filtersSelected)
t.filtersSelected, err = expression.VectorizedFilter(exec.Ctx().GetExprCtx(), exec.Ctx().GetSessionVars().EnableVectorizedExpression, t.filters, t.childChunkIter, t.filtersSelected)
if err != nil {
return err
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/executor/parallel_apply.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,7 @@ func (e *ParallelNestedLoopApplyExec) outerWorker(ctx context.Context) {
}
e.outerList.Add(chk)
outerIter := chunk.NewIterator4Chunk(chk)
selected, err = expression.VectorizedFilter(e.Ctx().GetExprCtx(), e.outerFilter, outerIter, selected)
selected, err = expression.VectorizedFilter(e.Ctx().GetExprCtx(), e.Ctx().GetSessionVars().EnableVectorizedExpression, e.outerFilter, outerIter, selected)
if err != nil {
e.putResult(nil, err)
return
Expand Down Expand Up @@ -326,7 +326,7 @@ func (e *ParallelNestedLoopApplyExec) fetchAllInners(ctx context.Context, id int
break
}

e.innerSelected[id], err = expression.VectorizedFilter(e.Ctx().GetExprCtx(), e.innerFilter[id], innerIter, e.innerSelected[id])
e.innerSelected[id], err = expression.VectorizedFilter(e.Ctx().GetExprCtx(), e.Ctx().GetSessionVars().EnableVectorizedExpression, e.innerFilter[id], innerIter, e.innerSelected[id])
if err != nil {
return err
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/executor/projection.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,7 @@ func (e *ProjectionExec) unParallelExecute(ctx context.Context, chk *chunk.Chunk
if e.childResult.NumRows() == 0 {
return nil
}
err = e.evaluatorSuit.Run(e.Ctx().GetExprCtx(), e.childResult, chk)
err = e.evaluatorSuit.Run(e.Ctx().GetExprCtx(), e.Ctx().GetSessionVars().EnableVectorizedExpression, e.childResult, chk)
return err
}

Expand Down Expand Up @@ -448,7 +448,7 @@ func (w *projectionWorker) run(ctx context.Context) {
}

mSize := output.chk.MemoryUsage() + input.chk.MemoryUsage()
err := w.evaluatorSuit.Run(w.sctx.GetExprCtx(), input.chk, output.chk)
err := w.evaluatorSuit.Run(w.sctx.GetExprCtx(), w.sctx.GetSessionVars().EnableVectorizedExpression, input.chk, output.chk)
failpoint.Inject("ConsumeRandomPanic", nil)
w.proj.memTracker.Consume(output.chk.MemoryUsage() + input.chk.MemoryUsage() - mSize)
output.done <- err
Expand Down
2 changes: 1 addition & 1 deletion pkg/executor/shuffle.go
Original file line number Diff line number Diff line change
Expand Up @@ -465,7 +465,7 @@ func buildPartitionRangeSplitter(ctx sessionctx.Context, concurrency int, byItem
return &partitionRangeSplitter{
byItems: byItems,
numWorkers: concurrency,
groupChecker: vecgroupchecker.NewVecGroupChecker(ctx.GetExprCtx(), byItems),
groupChecker: vecgroupchecker.NewVecGroupChecker(ctx.GetExprCtx(), ctx.GetSessionVars().EnableVectorizedExpression, byItems),
idx: 0,
}
}
Expand Down
7 changes: 4 additions & 3 deletions pkg/expression/bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2012,6 +2012,7 @@ func BenchmarkVecEvalBool(b *testing.B) {
nulls := make([]bool, 0, 1024)
eTypes := []types.EvalType{types.ETInt, types.ETReal, types.ETDecimal, types.ETString, types.ETTimestamp, types.ETDatetime, types.ETDuration}
tNames := []string{"int", "real", "decimal", "string", "timestamp", "datetime", "duration"}
vecEnabled := ctx.GetSessionVars().EnableVectorizedExpression
for numCols := 1; numCols <= 2; numCols++ {
typeCombination := make([]types.EvalType, numCols)
var combFunc func(nCols int)
Expand All @@ -2029,7 +2030,7 @@ func BenchmarkVecEvalBool(b *testing.B) {
b.Run("Vec-"+name, func(b *testing.B) {
b.ResetTimer()
for i := 0; i < b.N; i++ {
_, _, err := VecEvalBool(ctx, exprs, input, selected, nulls)
_, _, err := VecEvalBool(ctx, vecEnabled, exprs, input, selected, nulls)
if err != nil {
b.Fatal(err)
}
Expand Down Expand Up @@ -2083,7 +2084,7 @@ func BenchmarkRowBasedFilterAndVectorizedFilter(b *testing.B) {
b.Run("Vec-"+name, func(b *testing.B) {
b.ResetTimer()
for i := 0; i < b.N; i++ {
_, _, err := vectorizedFilter(ctx, exprs, it, selected, nulls)
_, _, err := vectorizedFilter(ctx, ctx.GetSessionVars().EnableVectorizedExpression, exprs, it, selected, nulls)
if err != nil {
b.Fatal(err)
}
Expand Down Expand Up @@ -2118,7 +2119,7 @@ func BenchmarkRowBasedFilterAndVectorizedFilter(b *testing.B) {
b.Run("Vec-special case", func(b *testing.B) {
b.ResetTimer()
for i := 0; i < b.N; i++ {
_, _, err := vectorizedFilter(ctx, []Expression{expr}, it, selected, nulls)
_, _, err := vectorizedFilter(ctx, ctx.GetSessionVars().EnableVectorizedExpression, []Expression{expr}, it, selected, nulls)
if err != nil {
panic(err)
}
Expand Down
15 changes: 6 additions & 9 deletions pkg/expression/builtin_vectorized_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -831,7 +831,7 @@ func TestVecEvalBool(t *testing.T) {
for numCols := 1; numCols <= 5; numCols++ {
for round := 0; round < 16; round++ {
exprs, input := genVecEvalBool(numCols, nil, eTypes)
selected, nulls, err := VecEvalBool(ctx, exprs, input, nil, nil)
selected, nulls, err := VecEvalBool(ctx, ctx.GetSessionVars().EnableVectorizedExpression, exprs, input, nil, nil)
require.NoError(t, err)
it := chunk.NewIterator4Chunk(input)
i := 0
Expand All @@ -856,7 +856,7 @@ func TestRowBasedFilterAndVectorizedFilter(t *testing.T) {
isNull := make([]bool, it.Len())
selected, nulls, err := rowBasedFilter(ctx, exprs, it, nil, isNull)
require.NoError(t, err)
selected2, nulls2, err2 := vectorizedFilter(ctx, exprs, it, nil, isNull)
selected2, nulls2, err2 := vectorizedFilter(ctx, ctx.GetSessionVars().EnableVectorizedExpression, exprs, it, nil, isNull)
require.NoError(t, err2)
length := it.Len()
for i := 0; i < length; i++ {
Expand All @@ -876,11 +876,9 @@ func TestVectorizedFilterConsiderNull(t *testing.T) {
exprs, input := genVecEvalBool(numCols, nil, eTypes)
it := chunk.NewIterator4Chunk(input)
isNull := make([]bool, it.Len())
ctx.GetSessionVars().EnableVectorizedExpression = false
selected, nulls, err := VectorizedFilterConsiderNull(ctx, exprs, it, nil, isNull)
selected, nulls, err := VectorizedFilterConsiderNull(ctx, false, exprs, it, nil, isNull)
require.NoError(t, err)
ctx.GetSessionVars().EnableVectorizedExpression = true
selected2, nulls2, err2 := VectorizedFilterConsiderNull(ctx, exprs, it, nil, isNull)
selected2, nulls2, err2 := VectorizedFilterConsiderNull(ctx, true, exprs, it, nil, isNull)
require.NoError(t, err2)
length := it.Len()
for i := 0; i < length; i++ {
Expand All @@ -893,11 +891,10 @@ func TestVectorizedFilterConsiderNull(t *testing.T) {
input.SetSel(randomSel)
it2 := chunk.NewIterator4Chunk(input)
isNull = isNull[:0]
ctx.GetSessionVars().EnableVectorizedExpression = false
selected3, nulls, err := VectorizedFilterConsiderNull(ctx, exprs, it2, nil, isNull)
selected3, nulls, err := VectorizedFilterConsiderNull(ctx, false, exprs, it2, nil, isNull)
require.NoError(t, err)
ctx.GetSessionVars().EnableVectorizedExpression = true
selected4, nulls2, err2 := VectorizedFilterConsiderNull(ctx, exprs, it2, nil, isNull)
selected4, nulls2, err2 := VectorizedFilterConsiderNull(ctx, true, exprs, it2, nil, isNull)
require.NoError(t, err2)
for i := 0; i < length; i++ {
require.Equal(t, nulls[i], nulls2[i])
Expand Down
Loading

0 comments on commit 87bcf32

Please sign in to comment.