Skip to content

Commit

Permalink
planner: introduce new cost formula for MPPJoins (#35643)
Browse files Browse the repository at this point in the history
ref #35240
  • Loading branch information
qw4990 authored Jun 22, 2022
1 parent 102db05 commit 56d004e
Show file tree
Hide file tree
Showing 3 changed files with 25 additions and 15 deletions.
28 changes: 19 additions & 9 deletions planner/core/plan_cost.go
Original file line number Diff line number Diff line change
Expand Up @@ -735,7 +735,7 @@ func (p *PhysicalApply) GetPlanCost(taskType property.TaskType, costFlag uint64)
}

// GetCost computes cost of merge join operator itself.
func (p *PhysicalMergeJoin) GetCost(lCnt, rCnt float64) float64 {
func (p *PhysicalMergeJoin) GetCost(lCnt, rCnt float64, costFlag uint64) float64 {
outerCnt := lCnt
innerCnt := rCnt
innerKeys := p.RightJoinKeys
Expand Down Expand Up @@ -766,6 +766,9 @@ func (p *PhysicalMergeJoin) GetCost(lCnt, rCnt float64) float64 {
numPairs = 0
}
}
if hasCostFlag(costFlag, CostFlagUseTrueCardinality) {
numPairs = getOperatorActRows(p)
}
sessVars := p.ctx.GetSessionVars()
probeCost := numPairs * sessVars.GetCPUFactor()
// Cost of evaluating outer filters.
Expand Down Expand Up @@ -795,13 +798,13 @@ func (p *PhysicalMergeJoin) GetPlanCost(taskType property.TaskType, costFlag uin
}
p.planCost += childCost
}
p.planCost += p.GetCost(getCardinality(p.children[0], costFlag), getCardinality(p.children[1], costFlag))
p.planCost += p.GetCost(getCardinality(p.children[0], costFlag), getCardinality(p.children[1], costFlag), costFlag)
p.planCostInit = true
return p.planCost, nil
}

// GetCost computes cost of hash join operator itself.
func (p *PhysicalHashJoin) GetCost(lCnt, rCnt float64) float64 {
func (p *PhysicalHashJoin) GetCost(lCnt, rCnt float64, isMPP bool, costFlag uint64) float64 {
buildCnt, probeCnt := lCnt, rCnt
build := p.children[0]
// Taking the right as the inner for right join or using the outer to build a hash table.
Expand All @@ -815,7 +818,11 @@ func (p *PhysicalHashJoin) GetCost(lCnt, rCnt float64) float64 {
rowSize := getAvgRowSize(build.statsInfo(), build.Schema())
spill := oomUseTmpStorage && memQuota > 0 && rowSize*buildCnt > float64(memQuota) && p.storeTp != kv.TiFlash
// Cost of building hash table.
cpuCost := buildCnt * sessVars.GetCPUFactor()
cpuFactor := sessVars.GetCPUFactor()
if isMPP && p.ctx.GetSessionVars().CostModelVersion == modelVer2 {
cpuFactor = sessVars.GetTiFlashCPUFactor() // use the dedicated TiFlash CPU Factor on modelVer2
}
cpuCost := buildCnt * cpuFactor
memoryCost := buildCnt * sessVars.GetMemoryFactor()
diskCost := buildCnt * sessVars.GetDiskFactor() * rowSize
// Number of matched row pairs regarding the equal join conditions.
Expand Down Expand Up @@ -845,16 +852,19 @@ func (p *PhysicalHashJoin) GetCost(lCnt, rCnt float64) float64 {
numPairs = 0
}
}
if hasCostFlag(costFlag, CostFlagUseTrueCardinality) {
numPairs = getOperatorActRows(p)
}
// Cost of querying hash table is cheap actually, so we just compute the cost of
// evaluating `OtherConditions` and joining row pairs.
probeCost := numPairs * sessVars.GetCPUFactor()
probeCost := numPairs * cpuFactor
probeDiskCost := numPairs * sessVars.GetDiskFactor() * rowSize
// Cost of evaluating outer filter.
if len(p.LeftConditions)+len(p.RightConditions) > 0 {
// Input outer count for the above compution should be adjusted by SelectionFactor.
probeCost *= SelectionFactor
probeDiskCost *= SelectionFactor
probeCost += probeCnt * sessVars.GetCPUFactor()
probeCost += probeCnt * cpuFactor
}
diskCost += probeDiskCost
probeCost /= float64(p.Concurrency)
Expand All @@ -864,9 +874,9 @@ func (p *PhysicalHashJoin) GetCost(lCnt, rCnt float64) float64 {
if p.UseOuterToBuild {
if spill {
// It runs in sequence when build data is on disk. See handleUnmatchedRowsFromHashTableInDisk
cpuCost += buildCnt * sessVars.GetCPUFactor()
cpuCost += buildCnt * cpuFactor
} else {
cpuCost += buildCnt * sessVars.GetCPUFactor() / float64(p.Concurrency)
cpuCost += buildCnt * cpuFactor / float64(p.Concurrency)
}
diskCost += buildCnt * sessVars.GetDiskFactor() * rowSize
}
Expand All @@ -892,7 +902,7 @@ func (p *PhysicalHashJoin) GetPlanCost(taskType property.TaskType, costFlag uint
}
p.planCost += childCost
}
p.planCost += p.GetCost(getCardinality(p.children[0], costFlag), getCardinality(p.children[1], costFlag))
p.planCost += p.GetCost(getCardinality(p.children[0], costFlag), getCardinality(p.children[1], costFlag), taskType == property.MppTaskType, costFlag)
p.planCostInit = true
return p.planCost, nil
}
Expand Down
8 changes: 4 additions & 4 deletions planner/core/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -326,7 +326,7 @@ func (p *PhysicalHashJoin) attach2Task(tasks ...task) task {
p.SetChildren(lTask.plan(), rTask.plan())
task := &rootTask{
p: p,
cst: lTask.cost() + rTask.cost() + p.GetCost(lTask.count(), rTask.count()),
cst: lTask.cost() + rTask.cost() + p.GetCost(lTask.count(), rTask.count(), false, 0),
}
p.cost = task.cost()
return task
Expand Down Expand Up @@ -547,7 +547,7 @@ func (p *PhysicalHashJoin) attach2TaskForMpp(tasks ...task) task {
outerTask = rTask
}
task := &mppTask{
cst: lCost + rCost + p.GetCost(lTask.count(), rTask.count()),
cst: lCost + rCost + p.GetCost(lTask.count(), rTask.count(), false, 0),
p: p,
partTp: outerTask.partTp,
hashCols: outerTask.hashCols,
Expand Down Expand Up @@ -578,7 +578,7 @@ func (p *PhysicalHashJoin) attach2TaskForTiFlash(tasks ...task) task {
tblColHists: rTask.tblColHists,
indexPlanFinished: true,
tablePlan: p,
cst: lCost + rCost + p.GetCost(lTask.count(), rTask.count()),
cst: lCost + rCost + p.GetCost(lTask.count(), rTask.count(), false, 0),
}
p.cost = task.cst
return task
Expand All @@ -590,7 +590,7 @@ func (p *PhysicalMergeJoin) attach2Task(tasks ...task) task {
p.SetChildren(lTask.plan(), rTask.plan())
t := &rootTask{
p: p,
cst: lTask.cost() + rTask.cost() + p.GetCost(lTask.count(), rTask.count()),
cst: lTask.cost() + rTask.cost() + p.GetCost(lTask.count(), rTask.count(), 0),
}
p.cost = t.cost()
return t
Expand Down
4 changes: 2 additions & 2 deletions planner/implementation/join.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ func (impl *HashJoinImpl) CalcCost(outCount float64, children ...memo.Implementa
hashJoin := impl.plan.(*plannercore.PhysicalHashJoin)
// The children here are only used to calculate the cost.
hashJoin.SetChildren(children[0].GetPlan(), children[1].GetPlan())
selfCost := hashJoin.GetCost(children[0].GetPlan().StatsCount(), children[1].GetPlan().StatsCount())
selfCost := hashJoin.GetCost(children[0].GetPlan().StatsCount(), children[1].GetPlan().StatsCount(), false, 0)
impl.cost = selfCost + children[0].GetCost() + children[1].GetCost()
return impl.cost
}
Expand All @@ -56,7 +56,7 @@ func (impl *MergeJoinImpl) CalcCost(outCount float64, children ...memo.Implement
mergeJoin := impl.plan.(*plannercore.PhysicalMergeJoin)
// The children here are only used to calculate the cost.
mergeJoin.SetChildren(children[0].GetPlan(), children[1].GetPlan())
selfCost := mergeJoin.GetCost(children[0].GetPlan().StatsCount(), children[1].GetPlan().StatsCount())
selfCost := mergeJoin.GetCost(children[0].GetPlan().StatsCount(), children[1].GetPlan().StatsCount(), 0)
impl.cost = selfCost + children[0].GetCost() + children[1].GetCost()
return impl.cost
}
Expand Down

0 comments on commit 56d004e

Please sign in to comment.