From e23abd520555c2b4d6fed94dc723fb69f3f2da27 Mon Sep 17 00:00:00 2001 From: Kenan Yao Date: Wed, 7 Apr 2021 13:12:53 +0800 Subject: [PATCH] planner: append common handle columns into the schema of index merge table plan --- planner/core/exhaust_physical_plans.go | 9 ++++- planner/core/find_best_task.go | 53 ++++++++++++++++++++------ planner/core/integration_test.go | 19 +++++++++ planner/core/task.go | 27 ++++++++----- 4 files changed, 85 insertions(+), 23 deletions(-) diff --git a/planner/core/exhaust_physical_plans.go b/planner/core/exhaust_physical_plans.go index e4d4f96e9d133..0ffd8937950ae 100644 --- a/planner/core/exhaust_physical_plans.go +++ b/planner/core/exhaust_physical_plans.go @@ -1025,13 +1025,18 @@ func (p *LogicalJoin) constructInnerIndexScanTask( if ts.schema.ColumnIndex(col) == -1 { ts.Schema().Append(col) ts.Columns = append(ts.Columns, col.ToInfo()) - cop.doubleReadNeedProj = true + cop.needExtraProj = true } } } // If inner cop task need keep order, the extraHandleCol should be set. if cop.keepOrder && !ds.tableInfo.IsCommonHandle { - cop.extraHandleCol, cop.doubleReadNeedProj = ts.appendExtraHandleCol(ds) + var needExtraProj bool + cop.extraHandleCol, needExtraProj = ts.appendExtraHandleCol(ds) + cop.needExtraProj = cop.needExtraProj || needExtraProj + } + if cop.needExtraProj { + cop.originSchema = ds.schema } cop.tablePlan = ts } diff --git a/planner/core/find_best_task.go b/planner/core/find_best_task.go index cbc233d7ffaee..3a760b66a28e8 100644 --- a/planner/core/find_best_task.go +++ b/planner/core/find_best_task.go @@ -822,8 +822,10 @@ func (ds *DataSource) convertToIndexMergeScan(prop *property.PhysicalProperty, c for _, partPath := range path.PartialIndexPaths { var scan PhysicalPlan var partialCost float64 + var needExtraProj bool if partPath.IsTablePath() { - scan, partialCost = ds.convertToPartialTableScan(prop, partPath) + scan, partialCost, needExtraProj = ds.convertToPartialTableScan(prop, partPath) + cop.needExtraProj = cop.needExtraProj || needExtraProj } else { scan, partialCost = ds.convertToPartialIndexScan(prop, partPath) } @@ -834,14 +836,18 @@ func (ds *DataSource) convertToIndexMergeScan(prop *property.PhysicalProperty, c if prop.ExpectedCnt < ds.stats.RowCount { totalRowCount *= prop.ExpectedCnt / ds.stats.RowCount } - ts, partialCost, err := ds.buildIndexMergeTableScan(prop, path.TableFilters, totalRowCount) + ts, partialCost, needExtraProj, err := ds.buildIndexMergeTableScan(prop, path.TableFilters, totalRowCount) if err != nil { return nil, err } + cop.needExtraProj = cop.needExtraProj || needExtraProj totalCost += partialCost cop.tablePlan = ts cop.idxMergePartPlans = scans cop.cst = totalCost + if cop.needExtraProj { + cop.originSchema = ds.schema + } task = cop.convertToRootTask(ds.ctx) return task, nil } @@ -878,9 +884,18 @@ func (ds *DataSource) convertToPartialIndexScan(prop *property.PhysicalProperty, } func (ds *DataSource) convertToPartialTableScan(prop *property.PhysicalProperty, path *util.AccessPath) ( - tablePlan PhysicalPlan, - partialCost float64) { + tablePlan PhysicalPlan, partialCost float64, needExtraProj bool) { ts, partialCost, rowCount := ds.getOriginalPhysicalTableScan(prop, path, false) + if ds.tableInfo.IsCommonHandle { + commonHandle := ds.handleCols.(*CommonHandleCols) + for _, col := range commonHandle.columns { + if ts.schema.ColumnIndex(col) == -1 { + ts.Schema().Append(col) + ts.Columns = append(ts.Columns, col.ToInfo()) + needExtraProj = true + } + } + } rowSize := ds.TblColHists.GetAvgRowSize(ds.ctx, ds.TblCols, false, false) sessVars := ds.ctx.GetSessionVars() if len(ts.filterCondition) > 0 { @@ -893,15 +908,16 @@ func (ds *DataSource) convertToPartialTableScan(prop *property.PhysicalProperty, tablePlan.SetChildren(ts) partialCost += rowCount * sessVars.CopCPUFactor partialCost += selectivity * rowCount * rowSize * sessVars.NetworkFactor - return tablePlan, partialCost + return } partialCost += rowCount * rowSize * sessVars.NetworkFactor tablePlan = ts - return tablePlan, partialCost + return } -func (ds *DataSource) buildIndexMergeTableScan(prop *property.PhysicalProperty, tableFilters []expression.Expression, totalRowCount float64) (PhysicalPlan, float64, error) { +func (ds *DataSource) buildIndexMergeTableScan(prop *property.PhysicalProperty, tableFilters []expression.Expression, totalRowCount float64) (PhysicalPlan, float64, bool, error) { var partialCost float64 + var needExtraProj bool sessVars := ds.ctx.GetSessionVars() ts := PhysicalTableScan{ Table: ds.tableInfo, @@ -920,10 +936,20 @@ func (ds *DataSource) buildIndexMergeTableScan(prop *property.PhysicalProperty, } ts.HandleCols = NewIntHandleCols(handleCol) } + if ds.tableInfo.IsCommonHandle { + commonHandle := ds.handleCols.(*CommonHandleCols) + for _, col := range commonHandle.columns { + if ts.schema.ColumnIndex(col) == -1 { + ts.Schema().Append(col) + ts.Columns = append(ts.Columns, col.ToInfo()) + needExtraProj = true + } + } + } var err error ts.HandleCols, err = ts.HandleCols.ResolveIndices(ts.schema) if err != nil { - return nil, 0, err + return nil, 0, false, err } if ts.Table.PKIsHandle { if pkColInfo := ts.Table.GetPkColInfo(); pkColInfo != nil { @@ -947,9 +973,9 @@ func (ds *DataSource) buildIndexMergeTableScan(prop *property.PhysicalProperty, } sel := PhysicalSelection{Conditions: tableFilters}.Init(ts.ctx, ts.stats.ScaleByExpectCnt(selectivity*totalRowCount), ts.blockOffset) sel.SetChildren(ts) - return sel, partialCost, nil + return sel, partialCost, needExtraProj, nil } - return ts, partialCost, nil + return ts, partialCost, needExtraProj, nil } func indexCoveringCol(col *expression.Column, indexCols []*expression.Column, idxColLens []int) bool { @@ -1046,7 +1072,7 @@ func (ds *DataSource) convertToIndexScan(prop *property.PhysicalProperty, candid ts := cop.tablePlan.(*PhysicalTableScan) ts.Schema().Append(col) ts.Columns = append(ts.Columns, col.ToInfo()) - cop.doubleReadNeedProj = true + cop.needExtraProj = true } } } @@ -1054,7 +1080,7 @@ func (ds *DataSource) convertToIndexScan(prop *property.PhysicalProperty, candid if cop.tablePlan != nil && !ds.tableInfo.IsCommonHandle { col, isNew := cop.tablePlan.(*PhysicalTableScan).appendExtraHandleCol(ds) cop.extraHandleCol = col - cop.doubleReadNeedProj = isNew + cop.needExtraProj = cop.needExtraProj || isNew } cop.keepOrder = true // IndexScan on partition table can't keep order. @@ -1062,6 +1088,9 @@ func (ds *DataSource) convertToIndexScan(prop *property.PhysicalProperty, candid return invalidTask, nil } } + if cop.needExtraProj { + cop.originSchema = ds.schema + } // prop.IsEmpty() would always return true when coming to here, // so we can just use prop.ExpectedCnt as parameter of addPushedDownSelection. finalStats := ds.stats.ScaleByExpectCnt(prop.ExpectedCnt) diff --git a/planner/core/integration_test.go b/planner/core/integration_test.go index b470217be1a7a..7881ec05ff2be 100644 --- a/planner/core/integration_test.go +++ b/planner/core/integration_test.go @@ -3170,6 +3170,25 @@ func (s *testIntegrationSuite) TestGetVarExprWithBitLiteral(c *C) { tk.MustQuery("execute stmt using @a;").Check(testkit.Rows("1")) } +func (s *testIntegrationSuite) TestIndexMergeClusterIndex(c *C) { + tk := testkit.NewTestKit(c, s.store) + tk.MustExec("use test;") + tk.MustExec("drop table if exists t") + tk.MustExec("create table t (c1 float, c2 int, c3 int, primary key (c1) /*T![clustered_index] CLUSTERED */, key idx_1 (c2), key idx_2 (c3))") + tk.MustExec("insert into t values(1.0,1,2),(2.0,2,1),(3.0,1,1),(4.0,2,2)") + tk.MustQuery("select /*+ use_index_merge(t) */ c3 from t where c3 = 1 or c2 = 1").Sort().Check(testkit.Rows( + "1", + "1", + "2", + )) + tk.MustExec("drop table t") + tk.MustExec("create table t (a int, b int, c int, primary key (a,b) /*T![clustered_index] CLUSTERED */, key idx_c(c))") + tk.MustExec("insert into t values (0,1,2)") + tk.MustQuery("select /*+ use_index_merge(t) */ c from t where c > 10 or a < 1").Check(testkit.Rows( + "2", + )) +} + func (s *testIntegrationSuite) TestMultiColMaxOneRow(c *C) { tk := testkit.NewTestKit(c, s.store) diff --git a/planner/core/task.go b/planner/core/task.go index cb8ff41e4f815..2c04a83e161cb 100644 --- a/planner/core/task.go +++ b/planner/core/task.go @@ -64,9 +64,11 @@ type copTask struct { indexPlanFinished bool // keepOrder indicates if the plan scans data by order. keepOrder bool - // doubleReadNeedProj means an extra prune is needed because - // in double read case, it may output one more column for handle(row id). - doubleReadNeedProj bool + // needExtraProj means an extra prune is needed because + // in double read / index merge cases, they may output one more column for handle(row id). + needExtraProj bool + // originSchema is the target schema to be projected to when needExtraProj is true. + originSchema *expression.Schema extraHandleCol *expression.Column commonHandleCols []*expression.Column @@ -883,8 +885,8 @@ func buildIndexLookUpTask(ctx sessionctx.Context, t *copTask) *rootTask { sortCPUCost := (tableRows * math.Log2(batchSize) * sessVars.CPUFactor) / numTblWorkers newTask.cst += sortCPUCost } - if t.doubleReadNeedProj { - schema := p.IndexPlans[0].(*PhysicalIndexScan).dataSourceSchema + if t.needExtraProj { + schema := t.originSchema proj := PhysicalProjection{Exprs: expression.Column2Exprs(schema.Columns)}.Init(ctx, p.stats, t.tablePlan.SelectBlockOffset(), nil) proj.SetSchema(schema) proj.SetChildren(p) @@ -948,6 +950,13 @@ func (t *copTask) convertToRootTaskImpl(ctx sessionctx.Context) *rootTask { p.PartitionInfo = t.partitionInfo setTableScanToTableRowIDScan(p.tablePlan) newTask.p = p + if t.needExtraProj { + schema := t.originSchema + proj := PhysicalProjection{Exprs: expression.Column2Exprs(schema.Columns)}.Init(ctx, p.stats, t.idxMergePartPlans[0].SelectBlockOffset(), nil) + proj.SetSchema(schema) + proj.SetChildren(p) + newTask.p = proj + } return newTask } if t.indexPlan != nil && t.tablePlan != nil { @@ -1706,13 +1715,13 @@ func (p *PhysicalStreamAgg) attach2Task(tasks ...task) task { cop.finishIndexPlan() partialAgg.SetChildren(cop.tablePlan) cop.tablePlan = partialAgg - // If doubleReadNeedProj is true, a projection will be created above the PhysicalIndexLookUpReader to make sure + // If needExtraProj is true, a projection will be created above the PhysicalIndexLookUpReader to make sure // the schema is the same as the original DataSource schema. // However, we pushed down the agg here, the partial agg was placed on the top of tablePlan, and the final // agg will be placed above the PhysicalIndexLookUpReader, and the schema will be set correctly for them. // If we add the projection again, the projection will be between the PhysicalIndexLookUpReader and // the partial agg, and the schema will be broken. - cop.doubleReadNeedProj = false + cop.needExtraProj = false } else { partialAgg.SetChildren(cop.indexPlan) cop.indexPlan = partialAgg @@ -1840,13 +1849,13 @@ func (p *PhysicalHashAgg) attach2Task(tasks ...task) task { cop.finishIndexPlan() partialAgg.SetChildren(cop.tablePlan) cop.tablePlan = partialAgg - // If doubleReadNeedProj is true, a projection will be created above the PhysicalIndexLookUpReader to make sure + // If needExtraProj is true, a projection will be created above the PhysicalIndexLookUpReader to make sure // the schema is the same as the original DataSource schema. // However, we pushed down the agg here, the partial agg was placed on the top of tablePlan, and the final // agg will be placed above the PhysicalIndexLookUpReader, and the schema will be set correctly for them. // If we add the projection again, the projection will be between the PhysicalIndexLookUpReader and // the partial agg, and the schema will be broken. - cop.doubleReadNeedProj = false + cop.needExtraProj = false } else { partialAgg.SetChildren(cop.indexPlan) cop.indexPlan = partialAgg