planner: append common handle columns into the schema of index merge table plan (#23933)
eurekaka authored Apr 14, 2021
1 parent a9a5795 commit 50ea46c
Showing 4 changed files with 85 additions and 23 deletions.
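
For context: on a table whose primary key is clustered (a "common handle"), the table side of an index merge plan did not necessarily carry every primary key column in its schema. This commit appends the missing common handle columns to the partial table scan and to the index merge table scan (so that, for example, ts.HandleCols can be resolved against the scan schema in the ResolveIndices call that follows), records this in needExtraProj, and later adds a projection above the reader to prune the schema back to the original DataSource schema. The query shape that exercises the fix, taken from the test added in this commit:

create table t (a int, b int, c int, primary key (a,b) /*T![clustered_index] CLUSTERED */, key idx_c(c));
insert into t values (0,1,2);
select /*+ use_index_merge(t) */ c from t where c > 10 or a < 1; -- returns a single row: 2
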
9 changes: 7 additions & 2 deletions planner/core/exhaust_physical_plans.go
@@ -1025,13 +1025,18 @@ func (p *LogicalJoin) constructInnerIndexScanTask(
 			if ts.schema.ColumnIndex(col) == -1 {
 				ts.Schema().Append(col)
 				ts.Columns = append(ts.Columns, col.ToInfo())
-				cop.doubleReadNeedProj = true
+				cop.needExtraProj = true
 			}
 		}
 	}
 	// If inner cop task need keep order, the extraHandleCol should be set.
 	if cop.keepOrder && !ds.tableInfo.IsCommonHandle {
-		cop.extraHandleCol, cop.doubleReadNeedProj = ts.appendExtraHandleCol(ds)
+		var needExtraProj bool
+		cop.extraHandleCol, needExtraProj = ts.appendExtraHandleCol(ds)
+		cop.needExtraProj = cop.needExtraProj || needExtraProj
+	}
+	if cop.needExtraProj {
+		cop.originSchema = ds.schema
 	}
 	cop.tablePlan = ts
 }
53 changes: 41 additions & 12 deletions planner/core/find_best_task.go
@@ -822,8 +822,10 @@ func (ds *DataSource) convertToIndexMergeScan(prop *property.PhysicalProperty, c
 	for _, partPath := range path.PartialIndexPaths {
 		var scan PhysicalPlan
 		var partialCost float64
+		var needExtraProj bool
 		if partPath.IsTablePath() {
-			scan, partialCost = ds.convertToPartialTableScan(prop, partPath)
+			scan, partialCost, needExtraProj = ds.convertToPartialTableScan(prop, partPath)
+			cop.needExtraProj = cop.needExtraProj || needExtraProj
 		} else {
 			scan, partialCost = ds.convertToPartialIndexScan(prop, partPath)
 		}
@@ -834,14 +836,18 @@
 	if prop.ExpectedCnt < ds.stats.RowCount {
 		totalRowCount *= prop.ExpectedCnt / ds.stats.RowCount
 	}
-	ts, partialCost, err := ds.buildIndexMergeTableScan(prop, path.TableFilters, totalRowCount)
+	ts, partialCost, needExtraProj, err := ds.buildIndexMergeTableScan(prop, path.TableFilters, totalRowCount)
 	if err != nil {
 		return nil, err
 	}
+	cop.needExtraProj = cop.needExtraProj || needExtraProj
 	totalCost += partialCost
 	cop.tablePlan = ts
 	cop.idxMergePartPlans = scans
 	cop.cst = totalCost
+	if cop.needExtraProj {
+		cop.originSchema = ds.schema
+	}
 	task = cop.convertToRootTask(ds.ctx)
 	return task, nil
 }
@@ -878,9 +884,18 @@ func (ds *DataSource) convertToPartialIndexScan(prop *property.PhysicalProperty,
 }
 
 func (ds *DataSource) convertToPartialTableScan(prop *property.PhysicalProperty, path *util.AccessPath) (
-	tablePlan PhysicalPlan,
-	partialCost float64) {
+	tablePlan PhysicalPlan, partialCost float64, needExtraProj bool) {
 	ts, partialCost, rowCount := ds.getOriginalPhysicalTableScan(prop, path, false)
+	if ds.tableInfo.IsCommonHandle {
+		commonHandle := ds.handleCols.(*CommonHandleCols)
+		for _, col := range commonHandle.columns {
+			if ts.schema.ColumnIndex(col) == -1 {
+				ts.Schema().Append(col)
+				ts.Columns = append(ts.Columns, col.ToInfo())
+				needExtraProj = true
+			}
+		}
+	}
 	rowSize := ds.TblColHists.GetAvgRowSize(ds.ctx, ds.TblCols, false, false)
 	sessVars := ds.ctx.GetSessionVars()
 	if len(ts.filterCondition) > 0 {
@@ -893,15 +908,16 @@ func (ds *DataSource) convertToPartialTableScan(prop *property.PhysicalProperty,
 		tablePlan.SetChildren(ts)
 		partialCost += rowCount * sessVars.CopCPUFactor
 		partialCost += selectivity * rowCount * rowSize * sessVars.NetworkFactor
-		return tablePlan, partialCost
+		return
 	}
 	partialCost += rowCount * rowSize * sessVars.NetworkFactor
 	tablePlan = ts
-	return tablePlan, partialCost
+	return
 }
 
-func (ds *DataSource) buildIndexMergeTableScan(prop *property.PhysicalProperty, tableFilters []expression.Expression, totalRowCount float64) (PhysicalPlan, float64, error) {
+func (ds *DataSource) buildIndexMergeTableScan(prop *property.PhysicalProperty, tableFilters []expression.Expression, totalRowCount float64) (PhysicalPlan, float64, bool, error) {
 	var partialCost float64
+	var needExtraProj bool
 	sessVars := ds.ctx.GetSessionVars()
 	ts := PhysicalTableScan{
 		Table: ds.tableInfo,
@@ -920,10 +936,20 @@ func (ds *DataSource) buildIndexMergeTableScan(prop *property.PhysicalProperty,
 		}
 		ts.HandleCols = NewIntHandleCols(handleCol)
 	}
+	if ds.tableInfo.IsCommonHandle {
+		commonHandle := ds.handleCols.(*CommonHandleCols)
+		for _, col := range commonHandle.columns {
+			if ts.schema.ColumnIndex(col) == -1 {
+				ts.Schema().Append(col)
+				ts.Columns = append(ts.Columns, col.ToInfo())
+				needExtraProj = true
+			}
+		}
+	}
 	var err error
 	ts.HandleCols, err = ts.HandleCols.ResolveIndices(ts.schema)
 	if err != nil {
-		return nil, 0, err
+		return nil, 0, false, err
 	}
 	if ts.Table.PKIsHandle {
 		if pkColInfo := ts.Table.GetPkColInfo(); pkColInfo != nil {
@@ -947,9 +973,9 @@
 	}
 	sel := PhysicalSelection{Conditions: tableFilters}.Init(ts.ctx, ts.stats.ScaleByExpectCnt(selectivity*totalRowCount), ts.blockOffset)
 	sel.SetChildren(ts)
-		return sel, partialCost, nil
+		return sel, partialCost, needExtraProj, nil
 	}
-	return ts, partialCost, nil
+	return ts, partialCost, needExtraProj, nil
 }
 
 func indexCoveringCol(col *expression.Column, indexCols []*expression.Column, idxColLens []int) bool {
@@ -1046,22 +1072,25 @@ func (ds *DataSource) convertToIndexScan(prop *property.PhysicalProperty, candid
 				ts := cop.tablePlan.(*PhysicalTableScan)
 				ts.Schema().Append(col)
 				ts.Columns = append(ts.Columns, col.ToInfo())
-				cop.doubleReadNeedProj = true
+				cop.needExtraProj = true
 			}
 		}
 	}
 	if candidate.isMatchProp {
 		if cop.tablePlan != nil && !ds.tableInfo.IsCommonHandle {
 			col, isNew := cop.tablePlan.(*PhysicalTableScan).appendExtraHandleCol(ds)
 			cop.extraHandleCol = col
-			cop.doubleReadNeedProj = isNew
+			cop.needExtraProj = cop.needExtraProj || isNew
 		}
 		cop.keepOrder = true
 		// IndexScan on partition table can't keep order.
 		if ds.tableInfo.GetPartitionInfo() != nil {
 			return invalidTask, nil
 		}
 	}
+	if cop.needExtraProj {
+		cop.originSchema = ds.schema
+	}
 	// prop.IsEmpty() would always return true when coming to here,
 	// so we can just use prop.ExpectedCnt as parameter of addPushedDownSelection.
 	finalStats := ds.stats.ScaleByExpectCnt(prop.ExpectedCnt)
19 changes: 19 additions & 0 deletions planner/core/integration_test.go
@@ -3170,6 +3170,25 @@ func (s *testIntegrationSuite) TestGetVarExprWithBitLiteral(c *C) {
 	tk.MustQuery("execute stmt using @a;").Check(testkit.Rows("1"))
 }
 
+func (s *testIntegrationSuite) TestIndexMergeClusterIndex(c *C) {
+	tk := testkit.NewTestKit(c, s.store)
+	tk.MustExec("use test;")
+	tk.MustExec("drop table if exists t")
+	tk.MustExec("create table t (c1 float, c2 int, c3 int, primary key (c1) /*T![clustered_index] CLUSTERED */, key idx_1 (c2), key idx_2 (c3))")
+	tk.MustExec("insert into t values(1.0,1,2),(2.0,2,1),(3.0,1,1),(4.0,2,2)")
+	tk.MustQuery("select /*+ use_index_merge(t) */ c3 from t where c3 = 1 or c2 = 1").Sort().Check(testkit.Rows(
+		"1",
+		"1",
+		"2",
+	))
+	tk.MustExec("drop table t")
+	tk.MustExec("create table t (a int, b int, c int, primary key (a,b) /*T![clustered_index] CLUSTERED */, key idx_c(c))")
+	tk.MustExec("insert into t values (0,1,2)")
+	tk.MustQuery("select /*+ use_index_merge(t) */ c from t where c > 10 or a < 1").Check(testkit.Rows(
+		"2",
+	))
+}
+
 func (s *testIntegrationSuite) TestMultiColMaxOneRow(c *C) {
 	tk := testkit.NewTestKit(c, s.store)
 
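The recurring pattern in this diff — append any missing common handle columns to the scan schema, remember that an extra projection is needed, and later project back to the original DataSource schema — can be sketched in isolation as follows. This is a simplified, self-contained illustration with hypothetical types, not TiDB's actual expression.Column or schema APIs:

package main

import "fmt"

// col stands in for expression.Column; only a unique ID matters here.
type col struct{ id int }

// appendMissingHandleCols mirrors the logic this commit adds to
// convertToPartialTableScan and buildIndexMergeTableScan: every common
// handle column absent from the scan schema is appended, and the boolean
// records that a projection is needed above the reader to prune the
// schema back to the original one.
func appendMissingHandleCols(schema, handle []col) ([]col, bool) {
	needExtraProj := false
	for _, h := range handle {
		found := false
		for _, c := range schema {
			if c.id == h.id {
				found = true
				break
			}
		}
		if !found {
			schema = append(schema, h)
			needExtraProj = true
		}
	}
	return schema, needExtraProj
}

func main() {
	// select c from t where ...: the scan outputs only c (id 3), while the
	// clustered primary key is (a, b) (ids 1 and 2), so both get appended.
	schema, needExtraProj := appendMissingHandleCols([]col{{3}}, []col{{1}, {2}})
	fmt.Println(schema, needExtraProj) // [{3} {1} {2}] true
}

In the real planner the pruning step is the PhysicalProjection that convertToRootTaskImpl (below) now builds on top of the PhysicalIndexMergeReader when needExtraProj is set.
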
27 changes: 18 additions & 9 deletions planner/core/task.go
@@ -64,9 +64,11 @@ type copTask struct {
 	indexPlanFinished bool
 	// keepOrder indicates if the plan scans data by order.
 	keepOrder bool
-	// doubleReadNeedProj means an extra prune is needed because
-	// in double read case, it may output one more column for handle(row id).
-	doubleReadNeedProj bool
+	// needExtraProj means an extra prune is needed because
+	// in double read / index merge cases, they may output one more column for handle(row id).
+	needExtraProj bool
+	// originSchema is the target schema to be projected to when needExtraProj is true.
+	originSchema *expression.Schema
 
 	extraHandleCol *expression.Column
 	commonHandleCols []*expression.Column
@@ -883,8 +885,8 @@ func buildIndexLookUpTask(ctx sessionctx.Context, t *copTask) *rootTask {
 		sortCPUCost := (tableRows * math.Log2(batchSize) * sessVars.CPUFactor) / numTblWorkers
 		newTask.cst += sortCPUCost
 	}
-	if t.doubleReadNeedProj {
-		schema := p.IndexPlans[0].(*PhysicalIndexScan).dataSourceSchema
+	if t.needExtraProj {
+		schema := t.originSchema
 		proj := PhysicalProjection{Exprs: expression.Column2Exprs(schema.Columns)}.Init(ctx, p.stats, t.tablePlan.SelectBlockOffset(), nil)
 		proj.SetSchema(schema)
 		proj.SetChildren(p)
@@ -948,6 +950,13 @@ func (t *copTask) convertToRootTaskImpl(ctx sessionctx.Context) *rootTask {
 		p.PartitionInfo = t.partitionInfo
 		setTableScanToTableRowIDScan(p.tablePlan)
 		newTask.p = p
+		if t.needExtraProj {
+			schema := t.originSchema
+			proj := PhysicalProjection{Exprs: expression.Column2Exprs(schema.Columns)}.Init(ctx, p.stats, t.idxMergePartPlans[0].SelectBlockOffset(), nil)
+			proj.SetSchema(schema)
+			proj.SetChildren(p)
+			newTask.p = proj
+		}
 		return newTask
 	}
 	if t.indexPlan != nil && t.tablePlan != nil {
@@ -1706,13 +1715,13 @@ func (p *PhysicalStreamAgg) attach2Task(tasks ...task) task {
 			cop.finishIndexPlan()
 			partialAgg.SetChildren(cop.tablePlan)
 			cop.tablePlan = partialAgg
-			// If doubleReadNeedProj is true, a projection will be created above the PhysicalIndexLookUpReader to make sure
+			// If needExtraProj is true, a projection will be created above the PhysicalIndexLookUpReader to make sure
 			// the schema is the same as the original DataSource schema.
 			// However, we pushed down the agg here, the partial agg was placed on the top of tablePlan, and the final
 			// agg will be placed above the PhysicalIndexLookUpReader, and the schema will be set correctly for them.
 			// If we add the projection again, the projection will be between the PhysicalIndexLookUpReader and
 			// the partial agg, and the schema will be broken.
-			cop.doubleReadNeedProj = false
+			cop.needExtraProj = false
 		} else {
 			partialAgg.SetChildren(cop.indexPlan)
 			cop.indexPlan = partialAgg
@@ -1840,13 +1849,13 @@ func (p *PhysicalHashAgg) attach2Task(tasks ...task) task {
 			cop.finishIndexPlan()
 			partialAgg.SetChildren(cop.tablePlan)
 			cop.tablePlan = partialAgg
-			// If doubleReadNeedProj is true, a projection will be created above the PhysicalIndexLookUpReader to make sure
+			// If needExtraProj is true, a projection will be created above the PhysicalIndexLookUpReader to make sure
 			// the schema is the same as the original DataSource schema.
 			// However, we pushed down the agg here, the partial agg was placed on the top of tablePlan, and the final
 			// agg will be placed above the PhysicalIndexLookUpReader, and the schema will be set correctly for them.
 			// If we add the projection again, the projection will be between the PhysicalIndexLookUpReader and
 			// the partial agg, and the schema will be broken.
-			cop.doubleReadNeedProj = false
+			cop.needExtraProj = false
 		} else {
 			partialAgg.SetChildren(cop.indexPlan)
 			cop.indexPlan = partialAgg
