Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

planner: support out join in broadcast join (#18988) #19664

Merged
merged 11 commits into from
Sep 3, 2020
Merged
17 changes: 13 additions & 4 deletions planner/core/exhaust_physical_plans.go
Original file line number Diff line number Diff line change
Expand Up @@ -1492,16 +1492,25 @@ func (p *LogicalJoin) tryToGetBroadCastJoin(prop *property.PhysicalProperty) []P
return nil
}

if p.JoinType != InnerJoin || len(p.LeftConditions) != 0 || len(p.RightConditions) != 0 || len(p.OtherConditions) != 0 || len(p.EqualConditions) == 0 {
// for left join the global idx must be 1, and for right join the global idx must be 0
if (p.JoinType != InnerJoin && p.JoinType != LeftOuterJoin && p.JoinType != RightOuterJoin) || len(p.LeftConditions) != 0 || len(p.RightConditions) != 0 || len(p.OtherConditions) != 0 || len(p.EqualConditions) == 0 {
return nil
}

if hasPrefer, idx := p.getPreferredBCJLocalIndex(); hasPrefer {
if (idx == 0 && p.JoinType == RightOuterJoin) || (idx == 1 && p.JoinType == LeftOuterJoin) {
return nil
}
return p.tryToGetBroadCastJoinByPreferGlobalIdx(prop, 1-idx)
}
results := p.tryToGetBroadCastJoinByPreferGlobalIdx(prop, 0)
results = append(results, p.tryToGetBroadCastJoinByPreferGlobalIdx(prop, 1)...)
return results
if p.JoinType == InnerJoin {
results := p.tryToGetBroadCastJoinByPreferGlobalIdx(prop, 0)
results = append(results, p.tryToGetBroadCastJoinByPreferGlobalIdx(prop, 1)...)
return results
} else if p.JoinType == LeftOuterJoin {
return p.tryToGetBroadCastJoinByPreferGlobalIdx(prop, 1)
}
return p.tryToGetBroadCastJoinByPreferGlobalIdx(prop, 0)
}

func (p *LogicalJoin) tryToGetBroadCastJoinByPreferGlobalIdx(prop *property.PhysicalProperty, preferredGlobalIndex int) []PhysicalPlan {
Expand Down
26 changes: 26 additions & 0 deletions planner/core/explain.go
Original file line number Diff line number Diff line change
Expand Up @@ -481,6 +481,32 @@ func (p *PhysicalMergeJoin) ExplainNormalizedInfo() string {
return p.explainInfo(true)
}

// ExplainInfo implements Plan interface.
// It returns the text built by explainInfo: the join type followed by the
// left and right join keys when they are present.
func (p *PhysicalBroadCastJoin) ExplainInfo() string {
return p.explainInfo()
}

// ExplainNormalizedInfo implements Plan interface.
// It delegates to the same explainInfo helper as ExplainInfo, so the
// normalized text is identical to the regular explain text for this operator.
func (p *PhysicalBroadCastJoin) ExplainNormalizedInfo() string {
return p.explainInfo()
}

// explainInfo renders the operator description for a broadcast join: the join
// type first, then ", left key:<cols>" and ", right key:<cols>" for whichever
// side has at least one join key.
func (p *PhysicalBroadCastJoin) explainInfo() string {
	var out bytes.Buffer
	out.WriteString(p.JoinType.String())

	// Each key list is optional; a side without keys contributes nothing.
	if leftKeys := p.LeftJoinKeys; len(leftKeys) != 0 {
		fmt.Fprintf(&out, ", left key:%s", expression.ExplainColumnList(leftKeys))
	}
	if rightKeys := p.RightJoinKeys; len(rightKeys) != 0 {
		fmt.Fprintf(&out, ", right key:%s", expression.ExplainColumnList(rightKeys))
	}

	return out.String()
}

// ExplainInfo implements Plan interface.
func (p *PhysicalTopN) ExplainInfo() string {
buffer := bytes.NewBufferString("")
Expand Down
4 changes: 2 additions & 2 deletions planner/core/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -401,8 +401,8 @@ func (s *testIntegrationSerialSuite) TestBroadcastJoin(c *C) {
res.Check(testkit.Rows(output[i].Plan...))
}

// out join not supported
_, err := tk.Exec("explain select /*+ broadcast_join(fact_t, d1_t) */ count(*) from fact_t left join d1_t on fact_t.d1_k = d1_t.d1_k")
// outer table of outer join should not be global
_, err := tk.Exec("explain select /*+ broadcast_join(fact_t, d1_t), broadcast_join_local(d1_t) */ count(*) from fact_t left join d1_t on fact_t.d1_k = d1_t.d1_k")
c.Assert(err, NotNil)
c.Assert(err.Error(), Equals, "[planner:1815]Internal : Can't find a proper physical plan for this query")
// join with non-equal condition not supported
Expand Down
9 changes: 8 additions & 1 deletion planner/core/plan_to_pb.go
Original file line number Diff line number Diff line change
Expand Up @@ -251,8 +251,15 @@ func (p *PhysicalBroadCastJoin) ToPB(ctx sessionctx.Context, storeType kv.StoreT
if err != nil {
return nil, err
}
pbJoinType := tipb.JoinType_TypeInnerJoin
switch p.JoinType {
case LeftOuterJoin:
pbJoinType = tipb.JoinType_TypeLeftOuterJoin
case RightOuterJoin:
pbJoinType = tipb.JoinType_TypeRightOuterJoin
}
join := &tipb.Join{
JoinType: tipb.JoinType_TypeInnerJoin,
JoinType: pbJoinType,
JoinExecType: tipb.JoinExecType_TypeHashJoin,
InnerIdx: int64(p.InnerChildIdx),
LeftJoinKeys: left,
Expand Down
4 changes: 3 additions & 1 deletion planner/core/testdata/integration_serial_suite_in.json
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,9 @@
"explain select /*+ broadcast_join(fact_t,d1_t) */ count(*) from fact_t, d1_t where fact_t.d1_k = d1_t.d1_k",
"explain select /*+ broadcast_join(fact_t,d1_t,d2_t,d3_t) */ count(*) from fact_t, d1_t, d2_t, d3_t where fact_t.d1_k = d1_t.d1_k and fact_t.d2_k = d2_t.d2_k and fact_t.d3_k = d3_t.d3_k",
"explain select /*+ broadcast_join(fact_t,d1_t), broadcast_join_local(d1_t) */ count(*) from fact_t, d1_t where fact_t.d1_k = d1_t.d1_k",
"explain select /*+ broadcast_join(fact_t,d1_t,d2_t,d3_t), broadcast_join_local(d2_t) */ count(*) from fact_t, d1_t, d2_t, d3_t where fact_t.d1_k = d1_t.d1_k and fact_t.d2_k = d2_t.d2_k and fact_t.d3_k = d3_t.d3_k"
"explain select /*+ broadcast_join(fact_t,d1_t,d2_t,d3_t), broadcast_join_local(d2_t) */ count(*) from fact_t, d1_t, d2_t, d3_t where fact_t.d1_k = d1_t.d1_k and fact_t.d2_k = d2_t.d2_k and fact_t.d3_k = d3_t.d3_k",
"explain select /*+ broadcast_join(fact_t,d1_t) */ count(*) from fact_t left join d1_t on fact_t.d1_k = d1_t.d1_k",
"explain select /*+ broadcast_join(fact_t,d1_t) */ count(*) from fact_t right join d1_t on fact_t.d1_k = d1_t.d1_k"
]
},
{
Expand Down
40 changes: 32 additions & 8 deletions planner/core/testdata/integration_serial_suite_out.json
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
"StreamAgg_32 1.00 root funcs:count(Column#14)->Column#11",
"└─TableReader_33 1.00 root data:StreamAgg_13",
" └─StreamAgg_13 1.00 cop[tiflash] funcs:count(1)->Column#14",
" └─BroadcastJoin_31 8.00 cop[tiflash] ",
" └─BroadcastJoin_31 8.00 cop[tiflash] inner join, left key:test.fact_t.d1_k, right key:test.d1_t.d1_k",
" ├─Selection_23(Build) 2.00 cop[tiflash] not(isnull(test.d1_t.d1_k))",
" │ └─TableFullScan_22 2.00 cop[tiflash] table:d1_t keep order:false, global read",
" └─Selection_21(Probe) 8.00 cop[tiflash] not(isnull(test.fact_t.d1_k))",
Expand All @@ -42,13 +42,13 @@
"StreamAgg_52 1.00 root funcs:count(Column#20)->Column#17",
"└─TableReader_53 1.00 root data:StreamAgg_17",
" └─StreamAgg_17 1.00 cop[tiflash] funcs:count(1)->Column#20",
" └─BroadcastJoin_51 8.00 cop[tiflash] ",
" └─BroadcastJoin_51 8.00 cop[tiflash] inner join, left key:test.fact_t.d3_k, right key:test.d3_t.d3_k",
" ├─Selection_43(Build) 2.00 cop[tiflash] not(isnull(test.d3_t.d3_k))",
" │ └─TableFullScan_42 2.00 cop[tiflash] table:d3_t keep order:false, global read",
" └─BroadcastJoin_33(Probe) 8.00 cop[tiflash] ",
" └─BroadcastJoin_33(Probe) 8.00 cop[tiflash] inner join, left key:test.fact_t.d2_k, right key:test.d2_t.d2_k",
" ├─Selection_29(Build) 2.00 cop[tiflash] not(isnull(test.d2_t.d2_k))",
" │ └─TableFullScan_28 2.00 cop[tiflash] table:d2_t keep order:false, global read",
" └─BroadcastJoin_37(Probe) 8.00 cop[tiflash] ",
" └─BroadcastJoin_37(Probe) 8.00 cop[tiflash] inner join, left key:test.fact_t.d1_k, right key:test.d1_t.d1_k",
" ├─Selection_27(Build) 2.00 cop[tiflash] not(isnull(test.d1_t.d1_k))",
" │ └─TableFullScan_26 2.00 cop[tiflash] table:d1_t keep order:false, global read",
" └─Selection_41(Probe) 8.00 cop[tiflash] not(isnull(test.fact_t.d1_k)), not(isnull(test.fact_t.d2_k)), not(isnull(test.fact_t.d3_k))",
Expand All @@ -61,7 +61,7 @@
"StreamAgg_25 1.00 root funcs:count(Column#14)->Column#11",
"└─TableReader_26 1.00 root data:StreamAgg_13",
" └─StreamAgg_13 1.00 cop[tiflash] funcs:count(1)->Column#14",
" └─BroadcastJoin_24 8.00 cop[tiflash] ",
" └─BroadcastJoin_24 8.00 cop[tiflash] inner join, left key:test.fact_t.d1_k, right key:test.d1_t.d1_k",
" ├─Selection_18(Build) 2.00 cop[tiflash] not(isnull(test.d1_t.d1_k))",
" │ └─TableFullScan_17 2.00 cop[tiflash] table:d1_t keep order:false",
" └─Selection_16(Probe) 8.00 cop[tiflash] not(isnull(test.fact_t.d1_k))",
Expand All @@ -74,18 +74,42 @@
"StreamAgg_36 1.00 root funcs:count(Column#20)->Column#17",
"└─TableReader_37 1.00 root data:StreamAgg_17",
" └─StreamAgg_17 1.00 cop[tiflash] funcs:count(1)->Column#20",
" └─BroadcastJoin_35 8.00 cop[tiflash] ",
" └─BroadcastJoin_35 8.00 cop[tiflash] inner join, left key:test.fact_t.d3_k, right key:test.d3_t.d3_k",
" ├─Selection_29(Build) 2.00 cop[tiflash] not(isnull(test.d3_t.d3_k))",
" │ └─TableFullScan_28 2.00 cop[tiflash] table:d3_t keep order:false, global read",
" └─BroadcastJoin_19(Probe) 8.00 cop[tiflash] ",
" └─BroadcastJoin_19(Probe) 8.00 cop[tiflash] inner join, left key:test.fact_t.d2_k, right key:test.d2_t.d2_k",
" ├─Selection_27(Build) 2.00 cop[tiflash] not(isnull(test.d2_t.d2_k))",
" │ └─TableFullScan_26 2.00 cop[tiflash] table:d2_t keep order:false",
" └─BroadcastJoin_20(Probe) 8.00 cop[tiflash] ",
" └─BroadcastJoin_20(Probe) 8.00 cop[tiflash] inner join, left key:test.fact_t.d1_k, right key:test.d1_t.d1_k",
" ├─Selection_25(Build) 2.00 cop[tiflash] not(isnull(test.d1_t.d1_k))",
" │ └─TableFullScan_24 2.00 cop[tiflash] table:d1_t keep order:false, global read",
" └─Selection_23(Probe) 8.00 cop[tiflash] not(isnull(test.fact_t.d1_k)), not(isnull(test.fact_t.d2_k)), not(isnull(test.fact_t.d3_k))",
" └─TableFullScan_22 8.00 cop[tiflash] table:fact_t keep order:false, global read"
]
},
{
"SQL": "explain select /*+ broadcast_join(fact_t,d1_t) */ count(*) from fact_t left join d1_t on fact_t.d1_k = d1_t.d1_k",
"Plan": [
"StreamAgg_23 1.00 root funcs:count(Column#14)->Column#11",
"└─TableReader_24 1.00 root data:StreamAgg_12",
" └─StreamAgg_12 1.00 cop[tiflash] funcs:count(1)->Column#14",
" └─BroadcastJoin_22 8.00 cop[tiflash] left outer join, left key:test.fact_t.d1_k, right key:test.d1_t.d1_k",
" ├─Selection_16(Build) 2.00 cop[tiflash] not(isnull(test.d1_t.d1_k))",
" │ └─TableFullScan_15 2.00 cop[tiflash] table:d1_t keep order:false, global read",
" └─TableFullScan_14(Probe) 8.00 cop[tiflash] table:fact_t keep order:false"
]
},
{
"SQL": "explain select /*+ broadcast_join(fact_t,d1_t) */ count(*) from fact_t right join d1_t on fact_t.d1_k = d1_t.d1_k",
"Plan": [
"StreamAgg_23 1.00 root funcs:count(Column#14)->Column#11",
"└─TableReader_24 1.00 root data:StreamAgg_12",
" └─StreamAgg_12 1.00 cop[tiflash] funcs:count(1)->Column#14",
" └─BroadcastJoin_22 8.00 cop[tiflash] right outer join, left key:test.fact_t.d1_k, right key:test.d1_t.d1_k",
" ├─TableFullScan_16(Build) 2.00 cop[tiflash] table:d1_t keep order:false",
" └─Selection_15(Probe) 8.00 cop[tiflash] not(isnull(test.fact_t.d1_k))",
" └─TableFullScan_14 8.00 cop[tiflash] table:fact_t keep order:false, global read"
]
}
]
},
Expand Down