Skip to content

Commit

Permalink
planner: classify logical union all into a separate file for later pk…
Browse files Browse the repository at this point in the history
…g move (#54299)

ref #51664, ref #52714
  • Loading branch information
AilinKid authored Jun 29, 2024
1 parent f0a5b1f commit 7df4f66
Show file tree
Hide file tree
Showing 9 changed files with 197 additions and 120 deletions.
1 change: 1 addition & 0 deletions pkg/planner/core/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ go_library(
"logical_plans.go",
"logical_sort.go",
"logical_top_n.go",
"logical_union_all.go",
"memtable_predicate_extractor.go",
"mock.go",
"optimizer.go",
Expand Down
3 changes: 1 addition & 2 deletions pkg/planner/core/exhaust_physical_plans.go
Original file line number Diff line number Diff line change
Expand Up @@ -3589,8 +3589,7 @@ func getLockPhysicalPlans(p *LogicalLock, prop *property.PhysicalProperty) ([]ba
return []base.PhysicalPlan{lock}, true, nil
}

// ExhaustPhysicalPlans implements LogicalPlan interface.
func (p *LogicalUnionAll) ExhaustPhysicalPlans(prop *property.PhysicalProperty) ([]base.PhysicalPlan, bool, error) {
func exhaustUnionAllPhysicalPlans(p *LogicalUnionAll, prop *property.PhysicalProperty) ([]base.PhysicalPlan, bool, error) {
// TODO: UnionAll can not pass any order, but we can change it to sort merge to keep order.
if !prop.IsSortItemEmpty() || (prop.IsFlashProp() && prop.TaskTp != property.MppTaskType) {
return nil, true, nil
Expand Down
6 changes: 0 additions & 6 deletions pkg/planner/core/logical_initialize.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,12 +80,6 @@ func (p LogicalExpand) Init(ctx base.PlanContext, offset int) *LogicalExpand {
return &p
}

// Init initializes LogicalUnionAll.
func (p LogicalUnionAll) Init(ctx base.PlanContext, offset int) *LogicalUnionAll {
p.BaseLogicalPlan = logicalop.NewBaseLogicalPlan(ctx, plancodec.TypeUnion, &p, offset)
return &p
}

// Init initializes LogicalPartitionUnionAll.
func (p LogicalPartitionUnionAll) Init(ctx base.PlanContext, offset int) *LogicalPartitionUnionAll {
p.BaseLogicalPlan = logicalop.NewBaseLogicalPlan(ctx, plancodec.TypePartitionUnion, &p, offset)
Expand Down
5 changes: 0 additions & 5 deletions pkg/planner/core/logical_plans.go
Original file line number Diff line number Diff line change
Expand Up @@ -1644,11 +1644,6 @@ func (p *LogicalIndexScan) getPKIsHandleCol(schema *expression.Schema) *expressi
return getPKIsHandleColFromSchema(p.Columns, schema, p.Source.TableInfo.PKIsHandle)
}

// LogicalUnionAll represents LogicalUnionAll plan.
type LogicalUnionAll struct {
logicalop.LogicalSchemaProducer
}

// LogicalPartitionUnionAll represents the LogicalUnionAll plan is for partition table.
type LogicalPartitionUnionAll struct {
LogicalUnionAll
Expand Down
195 changes: 195 additions & 0 deletions pkg/planner/core/logical_union_all.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,195 @@
// Copyright 2024 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package core

import (
"github.com/pingcap/tidb/pkg/expression"
"github.com/pingcap/tidb/pkg/planner/core/base"
"github.com/pingcap/tidb/pkg/planner/core/operator/logicalop"
"github.com/pingcap/tidb/pkg/planner/property"
"github.com/pingcap/tidb/pkg/planner/util"
"github.com/pingcap/tidb/pkg/planner/util/optimizetrace"
"github.com/pingcap/tidb/pkg/planner/util/optimizetrace/logicaltrace"
"github.com/pingcap/tidb/pkg/planner/util/utilfuncp"
"github.com/pingcap/tidb/pkg/util/plancodec"
)

// LogicalUnionAll represents LogicalUnionAll plan.
type LogicalUnionAll struct {
logicalop.LogicalSchemaProducer
}

// Init initializes LogicalUnionAll.
func (p LogicalUnionAll) Init(ctx base.PlanContext, offset int) *LogicalUnionAll {
p.BaseLogicalPlan = logicalop.NewBaseLogicalPlan(ctx, plancodec.TypeUnion, &p, offset)
return &p
}

// *************************** start implementation of logicalPlan interface ***************************

// HashCode inherits BaseLogicalPlan.LogicalPlan.<0th> implementation.

// PredicatePushDown implements base.LogicalPlan.<1st> interface.
func (p *LogicalUnionAll) PredicatePushDown(predicates []expression.Expression, opt *optimizetrace.LogicalOptimizeOp) (ret []expression.Expression, retPlan base.LogicalPlan) {
for i, proj := range p.Children() {
newExprs := make([]expression.Expression, 0, len(predicates))
newExprs = append(newExprs, predicates...)
retCond, newChild := proj.PredicatePushDown(newExprs, opt)
utilfuncp.AddSelection(p, newChild, retCond, i, opt)
}
return nil, p
}

// PruneColumns implements base.LogicalPlan.<2nd> interface.
func (p *LogicalUnionAll) PruneColumns(parentUsedCols []*expression.Column, opt *optimizetrace.LogicalOptimizeOp) (base.LogicalPlan, error) {
eCtx := p.SCtx().GetExprCtx().GetEvalCtx()
used := expression.GetUsedList(eCtx, parentUsedCols, p.Schema())
hasBeenUsed := false
for i := range used {
hasBeenUsed = hasBeenUsed || used[i]
if hasBeenUsed {
break
}
}
if !hasBeenUsed {
parentUsedCols = make([]*expression.Column, len(p.Schema().Columns))
copy(parentUsedCols, p.Schema().Columns)
for i := range used {
used[i] = true
}
}
var err error
for i, child := range p.Children() {
p.Children()[i], err = child.PruneColumns(parentUsedCols, opt)
if err != nil {
return nil, err
}
}

prunedColumns := make([]*expression.Column, 0)
for i := len(used) - 1; i >= 0; i-- {
if !used[i] {
prunedColumns = append(prunedColumns, p.Schema().Columns[i])
p.Schema().Columns = append(p.Schema().Columns[:i], p.Schema().Columns[i+1:]...)
}
}
logicaltrace.AppendColumnPruneTraceStep(p, prunedColumns, opt)
if hasBeenUsed {
// It's possible that the child operator adds extra columns to the schema.
// Currently, (*LogicalAggregation).PruneColumns() might do this.
// But we don't need such columns, so we add an extra Projection to prune this column when this happened.
for i, child := range p.Children() {
if p.Schema().Len() < child.Schema().Len() {
schema := p.Schema().Clone()
exprs := make([]expression.Expression, len(p.Schema().Columns))
for j, col := range schema.Columns {
exprs[j] = col
}
proj := LogicalProjection{Exprs: exprs, AvoidColumnEvaluator: true}.Init(p.SCtx(), p.QueryBlockOffset())
proj.SetSchema(schema)

proj.SetChildren(child)
p.Children()[i] = proj
}
}
}
return p, nil
}

// FindBestTask inherits BaseLogicalPlan.LogicalPlan.<3rd> implementation.

// BuildKeyInfo inherits BaseLogicalPlan.LogicalPlan.<4th> implementation.

// PushDownTopN implements the base.LogicalPlan.<5th> interface.
func (p *LogicalUnionAll) PushDownTopN(topNLogicalPlan base.LogicalPlan, opt *optimizetrace.LogicalOptimizeOp) base.LogicalPlan {
var topN *LogicalTopN
if topNLogicalPlan != nil {
topN = topNLogicalPlan.(*LogicalTopN)
}
for i, child := range p.Children() {
var newTopN *LogicalTopN
if topN != nil {
newTopN = LogicalTopN{Count: topN.Count + topN.Offset, PreferLimitToCop: topN.PreferLimitToCop}.Init(p.SCtx(), topN.QueryBlockOffset())
for _, by := range topN.ByItems {
newTopN.ByItems = append(newTopN.ByItems, &util.ByItems{Expr: by.Expr, Desc: by.Desc})
}
// newTopN to push down Union's child
appendNewTopNTraceStep(topN, p, opt)
}
p.Children()[i] = child.PushDownTopN(newTopN, opt)
}
if topN != nil {
return topN.AttachChild(p, opt)
}
return p
}

// DeriveTopN inherits BaseLogicalPlan.LogicalPlan.<6th> implementation.

// PredicateSimplification inherits BaseLogicalPlan.LogicalPlan.<7th> implementation.

// ConstantPropagation inherits BaseLogicalPlan.LogicalPlan.<8th> implementation.

// PullUpConstantPredicates inherits BaseLogicalPlan.LogicalPlan.<9th> implementation.

// RecursiveDeriveStats inherits BaseLogicalPlan.LogicalPlan.<10th> implementation.

// DeriveStats implement base.LogicalPlan.<11th> interface.
func (p *LogicalUnionAll) DeriveStats(childStats []*property.StatsInfo, selfSchema *expression.Schema, _ []*expression.Schema, _ [][]*expression.Column) (*property.StatsInfo, error) {
if p.StatsInfo() != nil {
return p.StatsInfo(), nil
}
p.SetStats(&property.StatsInfo{
ColNDVs: make(map[int64]float64, selfSchema.Len()),
})
for _, childProfile := range childStats {
p.StatsInfo().RowCount += childProfile.RowCount
for _, col := range selfSchema.Columns {
p.StatsInfo().ColNDVs[col.UniqueID] += childProfile.ColNDVs[col.UniqueID]
}
}
return p.StatsInfo(), nil
}

// ExtractColGroups inherits BaseLogicalPlan.LogicalPlan.<12th> implementation.

// PreparePossibleProperties implements base.LogicalPlan.<13th> interface.

// ExhaustPhysicalPlans implements base.LogicalPlan.<14th> interface.
func (p *LogicalUnionAll) ExhaustPhysicalPlans(prop *property.PhysicalProperty) ([]base.PhysicalPlan, bool, error) {
return exhaustUnionAllPhysicalPlans(p, prop)
}

// ExtractCorrelatedCols inherits BaseLogicalPlan.LogicalPlan.<15th> implementation.

// MaxOneRow inherits BaseLogicalPlan.LogicalPlan.<16th> implementation.

// Children inherits BaseLogicalPlan.LogicalPlan.<17th> implementation.

// SetChildren inherits BaseLogicalPlan.LogicalPlan.<18th> implementation.

// SetChild inherits BaseLogicalPlan.LogicalPlan.<19th> implementation.

// RollBackTaskMap inherits BaseLogicalPlan.LogicalPlan.<20th> implementation.

// CanPushToCop inherits BaseLogicalPlan.LogicalPlan.<21st> implementation.

// ExtractFD inherits BaseLogicalPlan.LogicalPlan.<22nd> implementation.

// GetBaseLogicalPlan inherits BaseLogicalPlan.LogicalPlan.<23rd> implementation.

// ConvertOuterToInnerJoin inherits BaseLogicalPlan.LogicalPlan.<24th> implementation.

// *************************** end implementation of logicalPlan interface ***************************
55 changes: 0 additions & 55 deletions pkg/planner/core/rule_column_pruning.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,61 +140,6 @@ func pruneByItems(p base.LogicalPlan, old []*util.ByItems, opt *optimizetrace.Lo
return
}

// PruneColumns implements base.LogicalPlan interface.
func (p *LogicalUnionAll) PruneColumns(parentUsedCols []*expression.Column, opt *optimizetrace.LogicalOptimizeOp) (base.LogicalPlan, error) {
used := expression.GetUsedList(p.SCtx().GetExprCtx().GetEvalCtx(), parentUsedCols, p.Schema())
hasBeenUsed := false
for i := range used {
hasBeenUsed = hasBeenUsed || used[i]
if hasBeenUsed {
break
}
}
if !hasBeenUsed {
parentUsedCols = make([]*expression.Column, len(p.Schema().Columns))
copy(parentUsedCols, p.Schema().Columns)
for i := range used {
used[i] = true
}
}
var err error
for i, child := range p.Children() {
p.Children()[i], err = child.PruneColumns(parentUsedCols, opt)
if err != nil {
return nil, err
}
}

prunedColumns := make([]*expression.Column, 0)
for i := len(used) - 1; i >= 0; i-- {
if !used[i] {
prunedColumns = append(prunedColumns, p.Schema().Columns[i])
p.Schema().Columns = append(p.Schema().Columns[:i], p.Schema().Columns[i+1:]...)
}
}
logicaltrace.AppendColumnPruneTraceStep(p, prunedColumns, opt)
if hasBeenUsed {
// It's possible that the child operator adds extra columns to the schema.
// Currently, (*LogicalAggregation).PruneColumns() might do this.
// But we don't need such columns, so we add an extra Projection to prune this column when this happened.
for i, child := range p.Children() {
if p.Schema().Len() < child.Schema().Len() {
schema := p.Schema().Clone()
exprs := make([]expression.Expression, len(p.Schema().Columns))
for j, col := range schema.Columns {
exprs[j] = col
}
proj := LogicalProjection{Exprs: exprs, AvoidColumnEvaluator: true}.Init(p.SCtx(), p.QueryBlockOffset())
proj.SetSchema(schema)

proj.SetChildren(child)
p.Children()[i] = proj
}
}
}
return p, nil
}

// PruneColumns implements base.LogicalPlan interface.
func (p *LogicalUnionScan) PruneColumns(parentUsedCols []*expression.Column, opt *optimizetrace.LogicalOptimizeOp) (base.LogicalPlan, error) {
for i := 0; i < p.handleCols.NumCols(); i++ {
Expand Down
11 changes: 0 additions & 11 deletions pkg/planner/core/rule_predicate_push_down.go
Original file line number Diff line number Diff line change
Expand Up @@ -420,17 +420,6 @@ func (p *LogicalProjection) PredicatePushDown(predicates []expression.Expression
return append(remained, canNotBePushed...), child
}

// PredicatePushDown implements base.LogicalPlan PredicatePushDown interface.
func (p *LogicalUnionAll) PredicatePushDown(predicates []expression.Expression, opt *optimizetrace.LogicalOptimizeOp) (ret []expression.Expression, retPlan base.LogicalPlan) {
for i, proj := range p.Children() {
newExprs := make([]expression.Expression, 0, len(predicates))
newExprs = append(newExprs, predicates...)
retCond, newChild := proj.PredicatePushDown(newExprs, opt)
utilfuncp.AddSelection(p, newChild, retCond, i, opt)
}
return nil, p
}

// PredicatePushDown implements base.LogicalPlan PredicatePushDown interface.
func (p *LogicalMaxOneRow) PredicatePushDown(predicates []expression.Expression, opt *optimizetrace.LogicalOptimizeOp) ([]expression.Expression, base.LogicalPlan) {
// MaxOneRow forbids any condition to push down.
Expand Down
24 changes: 0 additions & 24 deletions pkg/planner/core/rule_topn_push_down.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,30 +65,6 @@ func (p *LogicalCTE) PushDownTopN(topNLogicalPlan base.LogicalPlan, opt *optimiz
return p
}

// PushDownTopN implements the LogicalPlan interface.
func (p *LogicalUnionAll) PushDownTopN(topNLogicalPlan base.LogicalPlan, opt *optimizetrace.LogicalOptimizeOp) base.LogicalPlan {
var topN *LogicalTopN
if topNLogicalPlan != nil {
topN = topNLogicalPlan.(*LogicalTopN)
}
for i, child := range p.Children() {
var newTopN *LogicalTopN
if topN != nil {
newTopN = LogicalTopN{Count: topN.Count + topN.Offset, PreferLimitToCop: topN.PreferLimitToCop}.Init(p.SCtx(), topN.QueryBlockOffset())
for _, by := range topN.ByItems {
newTopN.ByItems = append(newTopN.ByItems, &util.ByItems{Expr: by.Expr, Desc: by.Desc})
}
// newTopN to push down Union's child
appendNewTopNTraceStep(topN, p, opt)
}
p.Children()[i] = child.PushDownTopN(newTopN, opt)
}
if topN != nil {
return topN.AttachChild(p, opt)
}
return p
}

// PushDownTopN implements LogicalPlan interface.
func (p *LogicalProjection) PushDownTopN(topNLogicalPlan base.LogicalPlan, opt *optimizetrace.LogicalOptimizeOp) base.LogicalPlan {
var topN *LogicalTopN
Expand Down
17 changes: 0 additions & 17 deletions pkg/planner/core/stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -555,23 +555,6 @@ func (p *LogicalSelection) DeriveStats(childStats []*property.StatsInfo, _ *expr
return p.StatsInfo(), nil
}

// DeriveStats implement LogicalPlan DeriveStats interface.
func (p *LogicalUnionAll) DeriveStats(childStats []*property.StatsInfo, selfSchema *expression.Schema, _ []*expression.Schema, _ [][]*expression.Column) (*property.StatsInfo, error) {
if p.StatsInfo() != nil {
return p.StatsInfo(), nil
}
p.SetStats(&property.StatsInfo{
ColNDVs: make(map[int64]float64, selfSchema.Len()),
})
for _, childProfile := range childStats {
p.StatsInfo().RowCount += childProfile.RowCount
for _, col := range selfSchema.Columns {
p.StatsInfo().ColNDVs[col.UniqueID] += childProfile.ColNDVs[col.UniqueID]
}
}
return p.StatsInfo(), nil
}

func deriveLimitStats(childProfile *property.StatsInfo, limitCount float64) *property.StatsInfo {
stats := &property.StatsInfo{
RowCount: math.Min(limitCount, childProfile.RowCount),
Expand Down

0 comments on commit 7df4f66

Please sign in to comment.