Skip to content

Commit

Permalink
planner: move base physical plan into physicalop pkg. (#55131)
Browse files Browse the repository at this point in the history
ref #51664, ref #52714
  • Loading branch information
AilinKid authored Aug 9, 2024
1 parent 876268f commit 240702e
Show file tree
Hide file tree
Showing 31 changed files with 1,017 additions and 899 deletions.
2 changes: 2 additions & 0 deletions pkg/planner/core/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,7 @@ go_library(
"//pkg/planner/core/metrics",
"//pkg/planner/core/operator/baseimpl",
"//pkg/planner/core/operator/logicalop",
"//pkg/planner/core/operator/physicalop",
"//pkg/planner/core/rule",
"//pkg/planner/core/rule/util",
"//pkg/planner/funcdep",
Expand Down Expand Up @@ -290,6 +291,7 @@ go_test(
"//pkg/planner",
"//pkg/planner/core/base",
"//pkg/planner/core/operator/logicalop",
"//pkg/planner/core/operator/physicalop",
"//pkg/planner/property",
"//pkg/planner/util",
"//pkg/planner/util/coretestsdk",
Expand Down
5 changes: 3 additions & 2 deletions pkg/planner/core/common_plans.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/pingcap/tidb/pkg/parser/model"
"github.com/pingcap/tidb/pkg/parser/mysql"
"github.com/pingcap/tidb/pkg/planner/core/base"
"github.com/pingcap/tidb/pkg/planner/core/operator/physicalop"
"github.com/pingcap/tidb/pkg/planner/property"
"github.com/pingcap/tidb/pkg/planner/util"
"github.com/pingcap/tidb/pkg/planner/util/costusage"
Expand Down Expand Up @@ -307,7 +308,7 @@ func (s *Simple) MemoryUsage() (sum int64) {
//
// Used for simple statements executing in coprocessor.
type PhysicalSimpleWrapper struct {
basePhysicalPlan
physicalop.BasePhysicalPlan
Inner Simple
}

Expand All @@ -317,7 +318,7 @@ func (p *PhysicalSimpleWrapper) MemoryUsage() (sum int64) {
return
}

sum = p.basePhysicalPlan.MemoryUsage() + p.Inner.MemoryUsage()
sum = p.BasePhysicalPlan.MemoryUsage() + p.Inner.MemoryUsage()
return
}

Expand Down
6 changes: 5 additions & 1 deletion pkg/planner/core/core_init.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ func init() {
utilfuncp.GetStreamAggs = getStreamAggs
utilfuncp.GetHashAggs = getHashAggs
utilfuncp.PruneByItems = pruneByItems
utilfuncp.PushDownTopNForBaseLogicalPlan = pushDownTopNForBaseLogicalPlan
utilfuncp.FindBestTask4LogicalShow = findBestTask4LogicalShow
utilfuncp.FindBestTask4LogicalCTETable = findBestTask4LogicalCTETable
utilfuncp.FindBestTask4LogicalMemTable = findBestTask4LogicalMemTable
Expand All @@ -50,8 +51,11 @@ func init() {
utilfuncp.ExhaustPhysicalPlans4LogicalUnionScan = exhaustPhysicalPlans4LogicalUnionScan
utilfuncp.ExhaustPhysicalPlans4LogicalProjection = exhaustPhysicalPlans4LogicalProjection

utilfuncp.GetActualProbeCntFromProbeParents = getActualProbeCntFromProbeParents
utilfuncp.GetEstimatedProbeCntFromProbeParents = getEstimatedProbeCntFromProbeParents
utilfuncp.AppendCandidate4PhysicalOptimizeOp = appendCandidate4PhysicalOptimizeOp
utilfuncp.PushDownTopNForBaseLogicalPlan = pushDownTopNForBaseLogicalPlan

utilfuncp.AttachPlan2Task = attachPlan2Task

// For mv index init.
cardinality.GetTblInfoForUsedStatsByPhysicalID = getTblInfoForUsedStatsByPhysicalID
Expand Down
6 changes: 3 additions & 3 deletions pkg/planner/core/exhaust_physical_plans.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,7 @@ func GetMergeJoin(p *LogicalJoin, prop *property.PhysicalProperty, schema *expre
reqProps[0].ExpectedCnt = leftStatsInfo.RowCount * expCntScale
reqProps[1].ExpectedCnt = rightStatsInfo.RowCount * expCntScale
}
mergeJoin.childrenReqProps = reqProps
mergeJoin.SetChildrenReqProps(reqProps)
_, desc := prop.AllSameOrder()
mergeJoin.Desc = desc
joins = append(joins, mergeJoin)
Expand Down Expand Up @@ -356,7 +356,7 @@ func getEnforcedMergeJoin(p *LogicalJoin, prop *property.PhysicalProperty, schem
}
enforcedPhysicalMergeJoin := PhysicalMergeJoin{basePhysicalJoin: baseJoin, Desc: desc}.Init(p.SCtx(), statsInfo.ScaleByExpectCnt(prop.ExpectedCnt), p.QueryBlockOffset())
enforcedPhysicalMergeJoin.SetSchema(schema)
enforcedPhysicalMergeJoin.childrenReqProps = []*property.PhysicalProperty{lProp, rProp}
enforcedPhysicalMergeJoin.SetChildrenReqProps([]*property.PhysicalProperty{lProp, rProp})
enforcedPhysicalMergeJoin.initCompareFuncs()
return []base.PhysicalPlan{enforcedPhysicalMergeJoin}
}
Expand Down Expand Up @@ -2965,7 +2965,7 @@ func exhaustPhysicalPlans4LogicalCTE(p *LogicalCTE, prop *property.PhysicalPrope
}.Init(p.SCtx(), p.StatsInfo())
}
pcte.SetSchema(p.Schema())
pcte.childrenReqProps = []*property.PhysicalProperty{prop.CloneEssentialFields()}
pcte.SetChildrenReqProps([]*property.PhysicalProperty{prop.CloneEssentialFields()})
return []base.PhysicalPlan{(*PhysicalCTEStorage)(pcte)}, true, nil
}

Expand Down
13 changes: 7 additions & 6 deletions pkg/planner/core/find_best_task_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/pingcap/tidb/pkg/expression"
"github.com/pingcap/tidb/pkg/planner/core/base"
"github.com/pingcap/tidb/pkg/planner/core/operator/logicalop"
"github.com/pingcap/tidb/pkg/planner/core/operator/physicalop"
"github.com/pingcap/tidb/pkg/planner/property"
"github.com/pingcap/tidb/pkg/planner/util/optimizetrace"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -76,16 +77,16 @@ func (p mockLogicalPlan4Test) Init(ctx base.PlanContext) *mockLogicalPlan4Test {
func (p *mockLogicalPlan4Test) getPhysicalPlan1(prop *property.PhysicalProperty) base.PhysicalPlan {
physicalPlan1 := mockPhysicalPlan4Test{planType: 1}.Init(p.SCtx())
physicalPlan1.SetStats(&property.StatsInfo{RowCount: 1})
physicalPlan1.childrenReqProps = make([]*property.PhysicalProperty, 1)
physicalPlan1.childrenReqProps[0] = prop.CloneEssentialFields()
physicalPlan1.SetChildrenReqProps(make([]*property.PhysicalProperty, 1))
physicalPlan1.SetXthChildReqProps(0, prop.CloneEssentialFields())
return physicalPlan1
}

func (p *mockLogicalPlan4Test) getPhysicalPlan2(prop *property.PhysicalProperty) base.PhysicalPlan {
physicalPlan2 := mockPhysicalPlan4Test{planType: 2}.Init(p.SCtx())
physicalPlan2.SetStats(&property.StatsInfo{RowCount: 1})
physicalPlan2.childrenReqProps = make([]*property.PhysicalProperty, 1)
physicalPlan2.childrenReqProps[0] = property.NewPhysicalProperty(prop.TaskTp, nil, false, prop.ExpectedCnt, false)
physicalPlan2.SetChildrenReqProps(make([]*property.PhysicalProperty, 1))
physicalPlan2.SetXthChildReqProps(0, property.NewPhysicalProperty(prop.TaskTp, nil, false, prop.ExpectedCnt, false))
return physicalPlan2
}

Expand Down Expand Up @@ -115,14 +116,14 @@ func (p *mockLogicalPlan4Test) ExhaustPhysicalPlans(prop *property.PhysicalPrope
}

type mockPhysicalPlan4Test struct {
basePhysicalPlan
physicalop.BasePhysicalPlan
// 1 or 2 for physicalPlan1 or physicalPlan2.
// See the comment of mockLogicalPlan4Test.
planType int
}

func (p mockPhysicalPlan4Test) Init(ctx base.PlanContext) *mockPhysicalPlan4Test {
p.basePhysicalPlan = newBasePhysicalPlan(ctx, "mockPlan", &p, 0)
p.BasePhysicalPlan = physicalop.NewBasePhysicalPlan(ctx, "mockPlan", &p, 0)
return &p
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/planner/core/flat_plan.go
Original file line number Diff line number Diff line change
Expand Up @@ -372,7 +372,7 @@ func (f *FlatPhysicalPlan) flattenRecursively(p base.Plan, info *operatorCtx, ta
// We shallow copy the PhysicalCTE here because we don't want the probeParents (see comments in PhysicalPlan
// for details) to affect the row count display of the independent CTE plan tree.
copiedCTE := *plan
copiedCTE.probeParents = nil
copiedCTE.SetProbeParents(nil)
if info.isRoot {
// If it's executed in TiDB, we need to record it since we don't have producer and consumer
f.ctesToFlatten = append(f.ctesToFlatten, &copiedCTE)
Expand Down
5 changes: 3 additions & 2 deletions pkg/planner/core/foreign_key.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,14 @@ import (
"github.com/pingcap/tidb/pkg/parser/model"
"github.com/pingcap/tidb/pkg/parser/mysql"
"github.com/pingcap/tidb/pkg/planner/core/base"
"github.com/pingcap/tidb/pkg/planner/core/operator/physicalop"
"github.com/pingcap/tidb/pkg/table"
"github.com/pingcap/tidb/pkg/util/dbterror/plannererrors"
)

// FKCheck indicates the foreign key constraint checker.
type FKCheck struct {
basePhysicalPlan
physicalop.BasePhysicalPlan
FK *model.FKInfo
ReferredFK *model.ReferredFKInfo
Tbl table.Table
Expand All @@ -46,7 +47,7 @@ type FKCheck struct {

// FKCascade indicates the foreign key constraint cascade behaviour.
type FKCascade struct {
basePhysicalPlan
physicalop.BasePhysicalPlan
Tp FKCascadeType
ReferredFK *model.ReferredFKInfo
ChildTable table.Table
Expand Down
22 changes: 11 additions & 11 deletions pkg/planner/core/fragment.go
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,7 @@ func (f *Fragment) init(p base.PhysicalPlan) error {
f.TableScan = x
case *PhysicalExchangeReceiver:
// TODO: after we support partial merge, we should check whether all the target exchangeReceiver is same.
f.singleton = f.singleton || x.children[0].(*PhysicalExchangeSender).ExchangeType == tipb.ExchangeType_PassThrough
f.singleton = f.singleton || x.Children()[0].(*PhysicalExchangeSender).ExchangeType == tipb.ExchangeType_PassThrough
f.ExchangeReceivers = append(f.ExchangeReceivers, x)
case *PhysicalUnionAll:
return errors.New("unexpected union all detected")
Expand Down Expand Up @@ -284,12 +284,12 @@ func (e *mppTaskGenerator) untwistPlanAndRemoveUnionAll(stack []base.PhysicalPla
e.CTEGroups[cte.CTE.IDForStorage].CTEReader = append(e.CTEGroups[cte.CTE.IDForStorage].CTEReader, cte)
}
case *PhysicalHashJoin:
stack = append(stack, x.children[1-x.InnerChildIdx])
stack = append(stack, x.Children()[1-x.InnerChildIdx])
err := e.untwistPlanAndRemoveUnionAll(stack, forest)
stack = stack[:len(stack)-1]
return errors.Trace(err)
case *PhysicalUnionAll:
for _, ch := range x.children {
for _, ch := range x.Children() {
stack = append(stack, ch)
err := e.untwistPlanAndRemoveUnionAll(stack, forest)
stack = stack[:len(stack)-1]
Expand All @@ -298,19 +298,19 @@ func (e *mppTaskGenerator) untwistPlanAndRemoveUnionAll(stack []base.PhysicalPla
}
}
case *PhysicalSequence:
lastChildIdx := len(x.children) - 1
lastChildIdx := len(x.Children()) - 1
// except the last child, those previous ones are all cte producer.
for i := 0; i < lastChildIdx; i++ {
if e.CTEGroups == nil {
e.CTEGroups = make(map[int]*cteGroupInFragment)
}
cteStorage := x.children[i].(*PhysicalCTEStorage)
cteStorage := x.Children()[i].(*PhysicalCTEStorage)
e.CTEGroups[cteStorage.CTE.IDForStorage] = &cteGroupInFragment{
CTEStorage: cteStorage,
CTEReader: make([]*PhysicalCTE, 0, 3),
}
}
stack = append(stack, x.children[lastChildIdx])
stack = append(stack, x.Children()[lastChildIdx])
err := e.untwistPlanAndRemoveUnionAll(stack, forest)
stack = stack[:len(stack)-1]
if err != nil {
Expand Down Expand Up @@ -397,7 +397,7 @@ func (e *mppTaskGenerator) generateMPPTasksForFragment(f *Fragment) (tasks []*kv
}
cteProducerTasks := make([]*kv.MPPTask, 0)
for _, cteR := range f.CTEReaders {
child := cteR.children[0]
child := cteR.Children()[0]
if _, ok := child.(*PhysicalProjection); ok {
child = child.Children()[0]
}
Expand Down Expand Up @@ -450,7 +450,7 @@ func (f *Fragment) flipCTEReader(currentPlan base.PhysicalPlan) {
func (e *mppTaskGenerator) generateTasksForCTEReader(cteReader *PhysicalCTE) (err error) {
group := e.CTEGroups[cteReader.CTE.IDForStorage]
if group.StorageFragments == nil {
group.CTEStorage.storageSender.SetChildren(group.CTEStorage.children...)
group.CTEStorage.storageSender.SetChildren(group.CTEStorage.Children()...)
group.StorageTasks, group.StorageFragments, err = e.generateMPPTasksForExchangeSender(group.CTEStorage.storageSender)
if err != nil {
return err
Expand All @@ -460,16 +460,16 @@ func (e *mppTaskGenerator) generateTasksForCTEReader(cteReader *PhysicalCTE) (er
receiver.Tasks = group.StorageTasks
receiver.frags = group.StorageFragments
cteReader.SetChildren(receiver)
receiver.SetChildren(group.CTEStorage.children[0])
receiver.SetChildren(group.CTEStorage.Children()[0])
inconsistenceNullable := false
for i, col := range cteReader.schema.Columns {
if mysql.HasNotNullFlag(col.RetType.GetFlag()) != mysql.HasNotNullFlag(group.CTEStorage.children[0].Schema().Columns[i].RetType.GetFlag()) {
if mysql.HasNotNullFlag(col.RetType.GetFlag()) != mysql.HasNotNullFlag(group.CTEStorage.Children()[0].Schema().Columns[i].RetType.GetFlag()) {
inconsistenceNullable = true
break
}
}
if inconsistenceNullable {
cols := group.CTEStorage.children[0].Schema().Clone().Columns
cols := group.CTEStorage.Children()[0].Schema().Clone().Columns
for i, col := range cols {
col.Index = i
}
Expand Down
10 changes: 5 additions & 5 deletions pkg/planner/core/hint_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,28 +184,28 @@ func genHintsFromSingle(p base.PhysicalPlan, nodeType h.NodeType, storeType kv.S
})
}
case *PhysicalMergeJoin:
hint := genJoinMethodHintForSinglePhysicalJoin(p.SCtx(), h.HintSMJ, p.QueryBlockOffset(), nodeType, pp.children...)
hint := genJoinMethodHintForSinglePhysicalJoin(p.SCtx(), h.HintSMJ, p.QueryBlockOffset(), nodeType, pp.Children()...)
if hint != nil {
res = append(res, hint)
}
case *PhysicalHashJoin:
// TODO: support the hash_join_build and hash_join_probe hint for auto capture
hint := genJoinMethodHintForSinglePhysicalJoin(p.SCtx(), h.HintHJ, p.QueryBlockOffset(), nodeType, pp.children...)
hint := genJoinMethodHintForSinglePhysicalJoin(p.SCtx(), h.HintHJ, p.QueryBlockOffset(), nodeType, pp.Children()...)
if hint != nil {
res = append(res, hint)
}
case *PhysicalIndexJoin:
hint := genJoinMethodHintForSinglePhysicalJoin(p.SCtx(), h.HintINLJ, p.QueryBlockOffset(), nodeType, pp.children[pp.InnerChildIdx])
hint := genJoinMethodHintForSinglePhysicalJoin(p.SCtx(), h.HintINLJ, p.QueryBlockOffset(), nodeType, pp.Children()[pp.InnerChildIdx])
if hint != nil {
res = append(res, hint)
}
case *PhysicalIndexMergeJoin:
hint := genJoinMethodHintForSinglePhysicalJoin(p.SCtx(), h.HintINLMJ, p.QueryBlockOffset(), nodeType, pp.children[pp.InnerChildIdx])
hint := genJoinMethodHintForSinglePhysicalJoin(p.SCtx(), h.HintINLMJ, p.QueryBlockOffset(), nodeType, pp.Children()[pp.InnerChildIdx])
if hint != nil {
res = append(res, hint)
}
case *PhysicalIndexHashJoin:
hint := genJoinMethodHintForSinglePhysicalJoin(p.SCtx(), h.HintINLHJ, p.QueryBlockOffset(), nodeType, pp.children[pp.InnerChildIdx])
hint := genJoinMethodHintForSinglePhysicalJoin(p.SCtx(), h.HintINLHJ, p.QueryBlockOffset(), nodeType, pp.Children()[pp.InnerChildIdx])
if hint != nil {
res = append(res, hint)
}
Expand Down
Loading

0 comments on commit 240702e

Please sign in to comment.