Skip to content

Commit

Permalink
executor: prevent sending cop request for show columns (#36613) (#36820)
Browse files Browse the repository at this point in the history
close #36426, ref #36496
  • Loading branch information
ti-srebot authored Aug 10, 2022
1 parent f5a0bbe commit 14066b2
Show file tree
Hide file tree
Showing 12 changed files with 62 additions and 35 deletions.
2 changes: 1 addition & 1 deletion executor/prepared.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ func (e *PrepareExec) Next(ctx context.Context, req *chunk.Chunk) error {
var p plannercore.Plan
e.ctx.GetSessionVars().PlanID = 0
e.ctx.GetSessionVars().PlanColumnID = 0
destBuilder, _ := plannercore.NewPlanBuilder(e.ctx, e.is, &hint.BlockHintProcessor{})
destBuilder, _ := plannercore.NewPlanBuilder().Init(e.ctx, e.is, &hint.BlockHintProcessor{})
p, err = destBuilder.Build(ctx, stmt)
if err != nil {
return err
Expand Down
3 changes: 2 additions & 1 deletion executor/show.go
Original file line number Diff line number Diff line change
Expand Up @@ -1693,7 +1693,8 @@ func (e *ShowExec) fetchShowBuiltins() error {
func tryFillViewColumnType(ctx context.Context, sctx sessionctx.Context, is infoschema.InfoSchema, dbName model.CIStr, tbl *model.TableInfo) error {
if tbl.IsView() {
// Retrieve view columns info.
planBuilder, _ := plannercore.NewPlanBuilder(sctx, is, &hint.BlockHintProcessor{})
planBuilder, _ := plannercore.NewPlanBuilder(
plannercore.PlanBuilderOptNoExecution{}).Init(sctx, is, &hint.BlockHintProcessor{})
if viewLogicalPlan, err := planBuilder.BuildDataSourceFromView(ctx, dbName, tbl); err == nil {
viewSchema := viewLogicalPlan.Schema()
viewOutputNames := viewLogicalPlan.OutputNames()
Expand Down
6 changes: 3 additions & 3 deletions planner/core/expression_rewriter.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ func rewriteAstExpr(sctx sessionctx.Context, expr ast.ExprNode, schema *expressi
if sctx.GetSessionVars().TxnCtx.InfoSchema != nil {
is = sctx.GetSessionVars().TxnCtx.InfoSchema.(infoschema.InfoSchema)
}
b, savedBlockNames := NewPlanBuilder(sctx, is, &hint.BlockHintProcessor{})
b, savedBlockNames := NewPlanBuilder().Init(sctx, is, &hint.BlockHintProcessor{})
fakePlan := LogicalTableDual{}.Init(sctx, 0)
if schema != nil {
fakePlan.schema = schema
Expand Down Expand Up @@ -780,7 +780,7 @@ func (er *expressionRewriter) handleExistSubquery(ctx context.Context, v *ast.Ex
return v, true
}
np = er.popExistsSubPlan(np)
if len(ExtractCorrelatedCols4LogicalPlan(np)) > 0 {
if er.b.disableSubQueryPreprocessing || len(ExtractCorrelatedCols4LogicalPlan(np)) > 0 {
er.p, er.err = er.b.buildSemiApply(er.p, np, nil, er.asScalar, v.Not)
if er.err != nil || !er.asScalar {
return v, true
Expand Down Expand Up @@ -949,7 +949,7 @@ func (er *expressionRewriter) handleScalarSubquery(ctx context.Context, v *ast.S
return v, true
}
np = er.b.buildMaxOneRow(np)
if len(ExtractCorrelatedCols4LogicalPlan(np)) > 0 {
if er.b.disableSubQueryPreprocessing || len(ExtractCorrelatedCols4LogicalPlan(np)) > 0 {
er.p = er.b.buildApplyWithJoinType(er.p, np, LeftOuterJoin)
if np.Schema().Len() > 1 {
newCols := make([]expression.Expression, 0, np.Schema().Len())
Expand Down
2 changes: 1 addition & 1 deletion planner/core/indexmerge_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ func (s *testIndexMergeSuite) TestIndexMergePathGeneration(c *C) {
stmt, err := s.ParseOneStmt(tc, "", "")
c.Assert(err, IsNil, comment)
Preprocess(s.ctx, stmt, s.is)
builder, _ := NewPlanBuilder(MockContext(), s.is, &hint.BlockHintProcessor{})
builder, _ := NewPlanBuilder().Init(MockContext(), s.is, &hint.BlockHintProcessor{})
p, err := builder.Build(ctx, stmt)
if err != nil {
s.testdata.OnRecord(func() {
Expand Down
20 changes: 10 additions & 10 deletions planner/core/logical_plan_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1163,7 +1163,7 @@ func (s *testPlanSuite) TestVisitInfo(c *C) {
stmt, err := s.ParseOneStmt(tt.sql, "", "")
c.Assert(err, IsNil, comment)
Preprocess(s.ctx, stmt, s.is)
builder, _ := NewPlanBuilder(MockContext(), s.is, &hint.BlockHintProcessor{})
builder, _ := NewPlanBuilder().Init(MockContext(), s.is, &hint.BlockHintProcessor{})
builder.ctx.GetSessionVars().SetHashJoinConcurrency(1)
_, err = builder.Build(context.TODO(), stmt)
c.Assert(err, IsNil, comment)
Expand Down Expand Up @@ -1243,7 +1243,7 @@ func (s *testPlanSuite) TestUnion(c *C) {
stmt, err := s.ParseOneStmt(tt, "", "")
c.Assert(err, IsNil, comment)
Preprocess(s.ctx, stmt, s.is)
builder, _ := NewPlanBuilder(MockContext(), s.is, &hint.BlockHintProcessor{})
builder, _ := NewPlanBuilder().Init(MockContext(), s.is, &hint.BlockHintProcessor{})
plan, err := builder.Build(ctx, stmt)
s.testData.OnRecord(func() {
output[i].Err = err != nil
Expand Down Expand Up @@ -1275,7 +1275,7 @@ func (s *testPlanSuite) TestTopNPushDown(c *C) {
stmt, err := s.ParseOneStmt(tt, "", "")
c.Assert(err, IsNil, comment)
Preprocess(s.ctx, stmt, s.is)
builder, _ := NewPlanBuilder(MockContext(), s.is, &hint.BlockHintProcessor{})
builder, _ := NewPlanBuilder().Init(MockContext(), s.is, &hint.BlockHintProcessor{})
p, err := builder.Build(ctx, stmt)
c.Assert(err, IsNil)
p, err = logicalOptimize(ctx, builder.optFlag, p.(LogicalPlan))
Expand Down Expand Up @@ -1349,7 +1349,7 @@ func (s *testPlanSuite) TestOuterJoinEliminator(c *C) {
stmt, err := s.ParseOneStmt(tt, "", "")
c.Assert(err, IsNil, comment)
Preprocess(s.ctx, stmt, s.is)
builder, _ := NewPlanBuilder(MockContext(), s.is, &hint.BlockHintProcessor{})
builder, _ := NewPlanBuilder().Init(MockContext(), s.is, &hint.BlockHintProcessor{})
p, err := builder.Build(ctx, stmt)
c.Assert(err, IsNil)
p, err = logicalOptimize(ctx, builder.optFlag, p.(LogicalPlan))
Expand Down Expand Up @@ -1385,7 +1385,7 @@ func (s *testPlanSuite) TestSelectView(c *C) {
stmt, err := s.ParseOneStmt(tt.sql, "", "")
c.Assert(err, IsNil, comment)
Preprocess(s.ctx, stmt, s.is)
builder, _ := NewPlanBuilder(MockContext(), s.is, &hint.BlockHintProcessor{})
builder, _ := NewPlanBuilder().Init(MockContext(), s.is, &hint.BlockHintProcessor{})
p, err := builder.Build(ctx, stmt)
c.Assert(err, IsNil)
p, err = logicalOptimize(ctx, builder.optFlag, p.(LogicalPlan))
Expand Down Expand Up @@ -1466,7 +1466,7 @@ func (s *testPlanSuite) optimize(ctx context.Context, sql string) (PhysicalPlan,
return nil, nil, err
}
}
builder, _ := NewPlanBuilder(sctx, s.is, &hint.BlockHintProcessor{})
builder, _ := NewPlanBuilder().Init(sctx, s.is, &hint.BlockHintProcessor{})
p, err := builder.Build(ctx, stmt)
if err != nil {
return nil, nil, err
Expand Down Expand Up @@ -1548,7 +1548,7 @@ func (s *testPlanSuite) TestSkylinePruning(c *C) {
stmt, err := s.ParseOneStmt(tt.sql, "", "")
c.Assert(err, IsNil, comment)
Preprocess(s.ctx, stmt, s.is)
builder, _ := NewPlanBuilder(MockContext(), s.is, &hint.BlockHintProcessor{})
builder, _ := NewPlanBuilder().Init(MockContext(), s.is, &hint.BlockHintProcessor{})
p, err := builder.Build(ctx, stmt)
if err != nil {
c.Assert(err.Error(), Equals, tt.result, comment)
Expand Down Expand Up @@ -1649,7 +1649,7 @@ func (s *testPlanSuite) TestUpdateEQCond(c *C) {
stmt, err := s.ParseOneStmt(tt.sql, "", "")
c.Assert(err, IsNil, comment)
Preprocess(s.ctx, stmt, s.is)
builder, _ := NewPlanBuilder(MockContext(), s.is, &hint.BlockHintProcessor{})
builder, _ := NewPlanBuilder().Init(MockContext(), s.is, &hint.BlockHintProcessor{})
p, err := builder.Build(ctx, stmt)
c.Assert(err, IsNil)
p, err = logicalOptimize(ctx, builder.optFlag, p.(LogicalPlan))
Expand All @@ -1665,7 +1665,7 @@ func (s *testPlanSuite) TestConflictedJoinTypeHints(c *C) {
stmt, err := s.ParseOneStmt(sql, "", "")
c.Assert(err, IsNil)
Preprocess(s.ctx, stmt, s.is)
builder, _ := NewPlanBuilder(MockContext(), s.is, &hint.BlockHintProcessor{})
builder, _ := NewPlanBuilder().Init(MockContext(), s.is, &hint.BlockHintProcessor{})
p, err := builder.Build(ctx, stmt)
c.Assert(err, IsNil)
p, err = logicalOptimize(ctx, builder.optFlag, p.(LogicalPlan))
Expand All @@ -1685,7 +1685,7 @@ func (s *testPlanSuite) TestSimplyOuterJoinWithOnlyOuterExpr(c *C) {
stmt, err := s.ParseOneStmt(sql, "", "")
c.Assert(err, IsNil)
Preprocess(s.ctx, stmt, s.is)
builder, _ := NewPlanBuilder(MockContext(), s.is, &hint.BlockHintProcessor{})
builder, _ := NewPlanBuilder().Init(MockContext(), s.is, &hint.BlockHintProcessor{})
p, err := builder.Build(ctx, stmt)
c.Assert(err, IsNil)
p, err = logicalOptimize(ctx, builder.optFlag, p.(LogicalPlan))
Expand Down
2 changes: 1 addition & 1 deletion planner/core/memtable_predicate_extractor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ func (s *extractorSuite) getLogicalMemTable(c *C, se session.Session, parser *pa
c.Assert(err, IsNil)

ctx := context.Background()
builder, _ := plannercore.NewPlanBuilder(se, s.dom.InfoSchema(), &hint.BlockHintProcessor{})
builder, _ := plannercore.NewPlanBuilder().Init(se, s.dom.InfoSchema(), &hint.BlockHintProcessor{})
plan, err := builder.Build(ctx, stmt)
c.Assert(err, IsNil)

Expand Down
2 changes: 1 addition & 1 deletion planner/core/optimizer.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ type logicalOptRule interface {
func BuildLogicalPlan(ctx context.Context, sctx sessionctx.Context, node ast.Node, is infoschema.InfoSchema) (Plan, types.NameSlice, error) {
sctx.GetSessionVars().PlanID = 0
sctx.GetSessionVars().PlanColumnID = 0
builder, _ := NewPlanBuilder(sctx, is, &utilhint.BlockHintProcessor{})
builder, _ := NewPlanBuilder().Init(sctx, is, &utilhint.BlockHintProcessor{})
p, err := builder.Build(ctx, node)
if err != nil {
return nil, nil, err
Expand Down
48 changes: 37 additions & 11 deletions planner/core/planbuilder.go
Original file line number Diff line number Diff line change
Expand Up @@ -471,6 +471,8 @@ type PlanBuilder struct {
// 1. use `inside insert`, `update`, `delete` or `select for update` statement
// 2. isolation level is RC
isForUpdateRead bool
// disableSubQueryPreprocessing indicates whether to pre-process uncorrelated sub-queries in rewriting stage.
disableSubQueryPreprocessing bool
}

type handleColHelper struct {
Expand Down Expand Up @@ -571,24 +573,48 @@ func (b *PlanBuilder) popSelectOffset() {
b.selectOffset = b.selectOffset[:len(b.selectOffset)-1]
}

// NewPlanBuilder creates a new PlanBuilder. Return the original PlannerSelectBlockAsName as well, callers decide if
// PlanBuilderOpt is used to adjust the plan builder.
type PlanBuilderOpt interface {
Apply(builder *PlanBuilder)
}

// PlanBuilderOptNoExecution means the plan builder should not run any executor during Build().
type PlanBuilderOptNoExecution struct{}

// Apply implements the interface PlanBuilderOpt.
func (p PlanBuilderOptNoExecution) Apply(builder *PlanBuilder) {
builder.disableSubQueryPreprocessing = true
}

// NewPlanBuilder creates a new PlanBuilder.
func NewPlanBuilder(opts ...PlanBuilderOpt) *PlanBuilder {
builder := &PlanBuilder{
colMapper: make(map[*ast.ColumnNameExpr]int),
handleHelper: &handleColHelper{id2HandleMapStack: make([]map[int64][]HandleCols, 0)},
correlatedAggMapper: make(map[*ast.AggregateFuncExpr]*expression.CorrelatedColumn),
}
for _, opt := range opts {
opt.Apply(builder)
}
return builder
}

// Init initialize a PlanBuilder.
// Return the original PlannerSelectBlockAsName as well, callers decide if
// PlannerSelectBlockAsName should be restored after using this builder.
func NewPlanBuilder(sctx sessionctx.Context, is infoschema.InfoSchema, processor *hint.BlockHintProcessor) (*PlanBuilder, []ast.HintTable) {
func (b *PlanBuilder) Init(sctx sessionctx.Context, is infoschema.InfoSchema, processor *hint.BlockHintProcessor) (*PlanBuilder, []ast.HintTable) {
savedBlockNames := sctx.GetSessionVars().PlannerSelectBlockAsName
if processor == nil {
sctx.GetSessionVars().PlannerSelectBlockAsName = nil
} else {
sctx.GetSessionVars().PlannerSelectBlockAsName = make([]ast.HintTable, processor.MaxSelectStmtOffset()+1)
}
return &PlanBuilder{
ctx: sctx,
is: is,
colMapper: make(map[*ast.ColumnNameExpr]int),
handleHelper: &handleColHelper{id2HandleMapStack: make([]map[int64][]HandleCols, 0)},
hintProcessor: processor,
correlatedAggMapper: make(map[*ast.AggregateFuncExpr]*expression.CorrelatedColumn),
isForUpdateRead: sctx.GetSessionVars().IsPessimisticReadConsistency(),
}, savedBlockNames

b.ctx = sctx
b.is = is
b.hintProcessor = processor
b.isForUpdateRead = sctx.GetSessionVars().IsPessimisticReadConsistency()
return b, savedBlockNames
}

// Build builds the ast node to a Plan.
Expand Down
4 changes: 2 additions & 2 deletions planner/core/planbuilder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ func (s *testPlanBuilderSuite) TestGetPathByIndexName(c *C) {
}

func (s *testPlanBuilderSuite) TestRewriterPool(c *C) {
builder, _ := NewPlanBuilder(MockContext(), nil, &hint.BlockHintProcessor{})
builder, _ := NewPlanBuilder().Init(MockContext(), nil, &hint.BlockHintProcessor{})

// Make sure PlanBuilder.getExpressionRewriter() provides clean rewriter from pool.
// First, pick one rewriter from the pool and make it dirty.
Expand Down Expand Up @@ -168,7 +168,7 @@ func (s *testPlanBuilderSuite) TestDisableFold(c *C) {
stmt := st.(*ast.SelectStmt)
expr := stmt.Fields.Fields[0].Expr

builder, _ := NewPlanBuilder(ctx, nil, &hint.BlockHintProcessor{})
builder, _ := NewPlanBuilder().Init(ctx, nil, &hint.BlockHintProcessor{})
builder.rewriterCounter++
rewriter := builder.getExpressionRewriter(context.TODO(), nil)
c.Assert(rewriter, NotNil)
Expand Down
2 changes: 1 addition & 1 deletion planner/core/prepare_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ func (s *testPlanSerialSuite) TestPrepareCacheDeferredFunction(c *C) {
stmt, err := s.ParseOneStmt(sql1, "", "")
c.Check(err, IsNil)
is := tk.Se.GetSessionVars().TxnCtx.InfoSchema.(infoschema.InfoSchema)
builder, _ := core.NewPlanBuilder(tk.Se, is, &hint.BlockHintProcessor{})
builder, _ := core.NewPlanBuilder().Init(tk.Se, is, &hint.BlockHintProcessor{})
p, err := builder.Build(ctx, stmt)
c.Check(err, IsNil)
execPlan, ok := p.(*core.Execute)
Expand Down
2 changes: 1 addition & 1 deletion planner/core/stats_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ func (s *testStatsSuite) TestGroupNDVs(c *C) {
stmt, err := s.ParseOneStmt(tt, "", "")
c.Assert(err, IsNil, comment)
core.Preprocess(tk.Se, stmt, is)
builder, _ := core.NewPlanBuilder(tk.Se, is, &hint.BlockHintProcessor{})
builder, _ := core.NewPlanBuilder().Init(tk.Se, is, &hint.BlockHintProcessor{})
p, err := builder.Build(ctx, stmt)
c.Assert(err, IsNil, comment)
p, err = core.LogicalOptimize(ctx, builder.GetOptFlag(), p.(core.LogicalPlan))
Expand Down
4 changes: 2 additions & 2 deletions planner/optimize.go
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,7 @@ func optimize(ctx context.Context, sctx sessionctx.Context, node ast.Node, is in
sctx.GetSessionVars().PlanColumnID = 0
hintProcessor := &hint.BlockHintProcessor{Ctx: sctx}
node.Accept(hintProcessor)
builder, _ := plannercore.NewPlanBuilder(sctx, is, hintProcessor)
builder, _ := plannercore.NewPlanBuilder().Init(sctx, is, hintProcessor)

// reset fields about rewrite
sctx.GetSessionVars().RewritePhaseInfo.Reset()
Expand Down Expand Up @@ -420,7 +420,7 @@ func OptimizeExecStmt(ctx context.Context, sctx sessionctx.Context,
execAst *ast.ExecuteStmt, is infoschema.InfoSchema) (plannercore.Plan, error) {
defer trace.StartRegion(ctx, "Optimize").End()
var err error
builder, _ := plannercore.NewPlanBuilder(sctx, is, nil)
builder, _ := plannercore.NewPlanBuilder().Init(sctx, is, nil)
p, err := builder.Build(ctx, execAst)
if err != nil {
return nil, err
Expand Down

0 comments on commit 14066b2

Please sign in to comment.