Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

*: avoid using columnEvaluator for the Projectin build by buildProjtion4Union (#8142) #8165

Merged
merged 3 commits into from
Nov 5, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -981,7 +981,7 @@ func (b *executorBuilder) buildProjBelowAgg(aggFuncs []*aggregation.AggFuncDesc,

return &ProjectionExec{
baseExecutor: newBaseExecutor(b.ctx, expression.NewSchema(projSchemaCols...), projFromID, src),
evaluatorSuit: expression.NewEvaluatorSuit(projExprs),
evaluatorSuit: expression.NewEvaluatorSuite(projExprs, false),
}
}

Expand Down Expand Up @@ -1112,7 +1112,7 @@ func (b *executorBuilder) buildProjection(v *plannercore.PhysicalProjection) Exe
e := &ProjectionExec{
baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ExplainID(), childExec),
numWorkers: b.ctx.GetSessionVars().ProjectionConcurrency,
evaluatorSuit: expression.NewEvaluatorSuit(v.Exprs),
evaluatorSuit: expression.NewEvaluatorSuite(v.Exprs, v.AvoidColumnEvaluator),
calculateNoDelay: v.CalculateNoDelay,
}

Expand Down
7 changes: 7 additions & 0 deletions executor/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1044,6 +1044,13 @@ func (s *testSuite) TestUnion(c *C) {
tk.MustExec("create table t(a int, b int)")
tk.MustExec("insert into t value(1 ,2)")
tk.MustQuery("select a, b from (select a, 0 as d, b from t union all select a, 0 as d, b from t) test;").Check(testkit.Rows("1 2", "1 2"))

// #issue 8141
tk.MustExec("drop table if exists t1")
tk.MustExec("create table t1(a int, b int)")
tk.MustExec("insert into t1 value(1,2),(1,1),(2,2),(2,2),(3,2),(3,2)")
tk.MustExec("set @@tidb_max_chunk_size=2;")
tk.MustQuery("select count(*) from (select a as c, a as d from t1 union all select a, b from t1) t;").Check(testkit.Rows("12"))
}

func (s *testSuite) TestNeighbouringProj(c *C) {
Expand Down
4 changes: 2 additions & 2 deletions executor/projection.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ type projectionOutput struct {
type ProjectionExec struct {
baseExecutor

evaluatorSuit *expression.EvaluatorSuit
evaluatorSuit *expression.EvaluatorSuite
calculateNoDelay bool

prepared bool
Expand Down Expand Up @@ -295,7 +295,7 @@ func (f *projectionInputFetcher) run(ctx context.Context) {

type projectionWorker struct {
sctx sessionctx.Context
evaluatorSuit *expression.EvaluatorSuit
evaluatorSuit *expression.EvaluatorSuite
globalFinishCh <-chan struct{}
inputGiveBackCh chan<- *projectionInput

Expand Down
42 changes: 21 additions & 21 deletions expression/evaluator.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,36 +64,36 @@ func (e *defaultEvaluator) run(ctx sessionctx.Context, input, output *chunk.Chun
return nil
}

// EvaluatorSuit is responsible for the evaluation of a list of expressions.
// EvaluatorSuite is responsible for the evaluation of a list of expressions.
// It separates them to "column" and "other" expressions and evaluates "other"
// expressions before "column" expressions.
type EvaluatorSuit struct {
type EvaluatorSuite struct {
*columnEvaluator // Evaluator for column expressions.
*defaultEvaluator // Evaluator for other expressions.
}

// NewEvaluatorSuit creates an EvaluatorSuit to evaluate all the exprs.
func NewEvaluatorSuit(exprs []Expression) *EvaluatorSuit {
e := &EvaluatorSuit{}
// NewEvaluatorSuite creates an EvaluatorSuite to evaluate all the exprs.
// avoidColumnEvaluator can be removed after column pool is supported.
func NewEvaluatorSuite(exprs []Expression, avoidColumnEvaluator bool) *EvaluatorSuite {
e := &EvaluatorSuite{}

for i, expr := range exprs {
switch x := expr.(type) {
case *Column:
for i := 0; i < len(exprs); i++ {
if col, isCol := exprs[i].(*Column); isCol && !avoidColumnEvaluator {
if e.columnEvaluator == nil {
e.columnEvaluator = &columnEvaluator{inputIdxToOutputIdxes: make(map[int][]int)}
}
inputIdx, outputIdx := x.Index, i
inputIdx, outputIdx := col.Index, i
e.columnEvaluator.inputIdxToOutputIdxes[inputIdx] = append(e.columnEvaluator.inputIdxToOutputIdxes[inputIdx], outputIdx)
default:
if e.defaultEvaluator == nil {
e.defaultEvaluator = &defaultEvaluator{
outputIdxes: make([]int, 0, len(exprs)),
exprs: make([]Expression, 0, len(exprs)),
}
continue
}
if e.defaultEvaluator == nil {
e.defaultEvaluator = &defaultEvaluator{
outputIdxes: make([]int, 0, len(exprs)),
exprs: make([]Expression, 0, len(exprs)),
}
e.defaultEvaluator.exprs = append(e.defaultEvaluator.exprs, x)
e.defaultEvaluator.outputIdxes = append(e.defaultEvaluator.outputIdxes, i)
}
e.defaultEvaluator.exprs = append(e.defaultEvaluator.exprs, exprs[i])
e.defaultEvaluator.outputIdxes = append(e.defaultEvaluator.outputIdxes, i)
}

if e.defaultEvaluator != nil {
Expand All @@ -102,14 +102,14 @@ func NewEvaluatorSuit(exprs []Expression) *EvaluatorSuit {
return e
}

// Vectorizable checks whether this EvaluatorSuit can use vectorizd execution mode.
func (e *EvaluatorSuit) Vectorizable() bool {
// Vectorizable checks whether this EvaluatorSuite can use vectorizd execution mode.
func (e *EvaluatorSuite) Vectorizable() bool {
return e.defaultEvaluator == nil || e.defaultEvaluator.vectorizable
}

// Run evaluates all the expressions hold by this EvaluatorSuit.
// Run evaluates all the expressions hold by this EvaluatorSuite.
// NOTE: "defaultEvaluator" must be evaluated before "columnEvaluator".
func (e *EvaluatorSuit) Run(ctx sessionctx.Context, input, output *chunk.Chunk) error {
func (e *EvaluatorSuite) Run(ctx sessionctx.Context, input, output *chunk.Chunk) error {
if e.defaultEvaluator != nil {
err := e.defaultEvaluator.run(ctx, input, output)
if err != nil {
Expand Down
5 changes: 3 additions & 2 deletions planner/core/exhaust_physical_plans.go
Original file line number Diff line number Diff line change
Expand Up @@ -692,8 +692,9 @@ func (p *LogicalProjection) exhaustPhysicalPlans(prop *property.PhysicalProperty
return nil
}
proj := PhysicalProjection{
Exprs: p.Exprs,
CalculateNoDelay: p.calculateNoDelay,
Exprs: p.Exprs,
CalculateNoDelay: p.calculateNoDelay,
AvoidColumnEvaluator: p.avoidColumnEvaluator,
}.init(p.ctx, p.stats.ScaleByExpectCnt(prop.ExpectedCnt), newProp)
proj.SetSchema(p.schema)
return []PhysicalPlan{proj}
Expand Down
2 changes: 1 addition & 1 deletion planner/core/logical_plan_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -677,7 +677,7 @@ func (b *planBuilder) buildProjection4Union(u *LogicalUnionAll) {
}
}
b.optFlag |= flagEliminateProjection
proj := LogicalProjection{Exprs: exprs}.init(b.ctx)
proj := LogicalProjection{Exprs: exprs, avoidColumnEvaluator: true}.init(b.ctx)
proj.SetSchema(u.schema.Clone())
proj.SetChildren(child)
u.children[childID] = proj
Expand Down
7 changes: 7 additions & 0 deletions planner/core/logical_plans.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,13 @@ type LogicalProjection struct {
// Currently it is "true" only when the current sql query is a "DO" statement.
// See "https://dev.mysql.com/doc/refman/5.7/en/do.html" for more detail.
calculateNoDelay bool

// avoidColumnRef is a temporary variable which is ONLY used to avoid
// building columnEvaluator for the expressions of Projection which is
// built by buildProjection4Union.
// This can be removed after column pool being supported.
// Related issue: TiDB#8141(https://github.com/pingcap/tidb/issues/8141)
avoidColumnEvaluator bool
}

func (p *LogicalProjection) extractCorrelatedCols() []*expression.CorrelatedColumn {
Expand Down
5 changes: 3 additions & 2 deletions planner/core/physical_plans.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,8 +170,9 @@ func (ts *PhysicalTableScan) IsPartition() (bool, int64) {
type PhysicalProjection struct {
physicalSchemaProducer

Exprs []expression.Expression
CalculateNoDelay bool
Exprs []expression.Expression
CalculateNoDelay bool
AvoidColumnEvaluator bool
}

// PhysicalTopN is the physical operator of topN.
Expand Down