-
Notifications
You must be signed in to change notification settings - Fork 5.8k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
planner, executor: support broadcast join for tiflash engine. #17232
Conversation
…into hanfei/join-merge
"StreamAgg_32 1.00 root funcs:count(Column#14)->Column#11", | ||
"└─TableReader_33 1.00 root data:StreamAgg_13", | ||
" └─StreamAgg_13 1.00 cop[tiflash] funcs:count(1)->Column#14", | ||
" └─TypeBroadcastJoin_31 8.00 cop[tiflash] ", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
TypeBroadCastJoin
-> BroadCastJoin
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
planner/util/path.go
Outdated
@@ -50,6 +50,9 @@ type AccessPath struct { | |||
|
|||
IsDNFCond bool | |||
|
|||
// IsGlobalRead indicates whether this path is a remote read path for tiflash | |||
IsGlobalRead bool |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
IsGlobalRead is not obvious, it is not related to tiflash.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done, changed to IsTiFlashGlobalRead
sessionctx/variable/varsutil.go
Outdated
@@ -451,6 +451,11 @@ func ValidateSetSystemVar(vars *SessionVars, name string, value string, scope Sc | |||
return "ON", nil | |||
} | |||
return value, ErrWrongValueForVar.GenWithStackByArgs(name, value) | |||
case TiDBOptBCJ: | |||
if (strings.EqualFold(value, "ON") || value == "1") && vars.AllowBatchCop == 0 { | |||
return value, ErrWrongValueForVar.GenWithStackByArgs("Can't set BCJ to 1 but tidb_allow_batch_cop is 0, please active batch cop at first.") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
BCJ -> BroadCastJoin is more direct.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
sessionctx/variable/varsutil.go
Outdated
@@ -575,11 +580,15 @@ func ValidateSetSystemVar(vars *SessionVars, name string, value string, scope Sc | |||
if err != nil { | |||
return value, ErrWrongTypeForVar.GenWithStackByArgs(name) | |||
} | |||
if v == 0 && vars.AllowBCJ { | |||
return value, ErrWrongValueForVar.GenWithStackByArgs("Can't set batch cop 0 but tidb_opt_broadcast_join is 1, please set bcj 0 at first") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
bcj ->
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
util/plancodec/id.go
Outdated
@@ -52,6 +52,8 @@ const ( | |||
TypeLimit = "Limit" | |||
// TypeHashJoin is the type of hash join. | |||
TypeHashJoin = "HashJoin" | |||
// TypeBroadcastJoin is the type of broad cast join. | |||
TypeBroadcastJoin = "TypeBroadcastJoin" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
TypeBroadcastJoin = "BroadcastJoin"
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
if prop.TaskTp == property.CopTiFlashGlobalReadTaskType { | ||
childrenReqProps[1-preferredGlobalIndex] = &property.PhysicalProperty{TaskTp: property.CopTiFlashGlobalReadTaskType, ExpectedCnt: math.MaxFloat64} | ||
} else { | ||
childrenReqProps[1-preferredGlobalIndex] = &property.PhysicalProperty{TaskTp: property.CopTiFlashLocalReadTaskType, ExpectedCnt: math.MaxFloat64} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This if
is not necessary depending on the prop.TaskTp would be either CopTiFlashGlobalReadTaskType
or CopTiFlashLocalReadTaskType
?
&property.PhysicalProperty{TaskTp: prop.TaskTP, ExpectedCnt: math.MaxFloat64}
is ok?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's not ok when prop.TaskTp is rootTask. When it requires rootTask, its child can return a TiFlashTask , close it and get a root task.
if prop.ExpectedCnt < p.stats.RowCount { | ||
expCntScale := prop.ExpectedCnt / p.stats.RowCount | ||
childrenReqProps[1-baseJoin.InnerChildIdx].ExpectedCnt = p.children[1-baseJoin.InnerChildIdx].statsInfo().RowCount * expCntScale | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
what is the meaning of these code? can you add some comments?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's referred from
tidb/planner/core/exhaust_physical_plans.go
Line 345 in adc2d04
if prop.ExpectedCnt < p.stats.RowCount { |
ExpectedCnt means expected returned rows before sql stops
, sometimes it's different from stats.rowCount if and only if there is a limit in parent operators which makes it stop early.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The comments have existed in ExpectedCnt
field, so it's no need to add it here
numPairs := helper.estimate() | ||
probeCost := numPairs * sessVars.CopCPUFactor | ||
// should divided by the concurrency in tiflash, which should be the number of core in tiflash nodes. | ||
probeCost /= float64(sessVars.CopTiFlashConcurrencyFactor) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
does CopTiFlashConcurrencyFactor guarentee > 0?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, it's check in varsutil.go
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM after solve comments
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
/merge |
/run-all-tests |
Signed-off-by: ti-srebot <ti-srebot@pingcap.com>
cherry pick to release-4.0 in PR #18801 |
#18801) Signed-off-by: ti-srebot <ti-srebot@pingcap.com>
What problem does this PR solve?
Support Broadcast Join for TiFlash Engine.
What is changed and how it works?
Change Details and Description can be referred from : https://docs.google.com/document/d/1RXW4kEKhxVS1hRsKA9Vlr-Mn94X8dMNexbuiJqfrt4A/edit?usp=sharing
Related changes
Tipb and parser is also changed:
Check List
Side effects
TiFlash must also be updated before update for TiDB
Release note