planner: refine planner code for disaggregated tiflash mode (#39813)
close #39814
guo-shaoge authored Dec 29, 2022
1 parent 875c002 commit b94042c
Showing 6 changed files with 115 additions and 13 deletions.
4 changes: 4 additions & 0 deletions ddl/placement/common.go
@@ -54,4 +54,8 @@ const (
// EngineLabelTiKV is the label value used in some tests. And possibly TiKV will
// set the engine label with a value of EngineLabelTiKV.
EngineLabelTiKV = "tikv"

// EngineLabelTiFlashCompute is for disaggregated tiflash mode,
// it's the label of tiflash_compute nodes.
EngineLabelTiFlashCompute = "tiflash_compute"
)
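For context, below is a minimal standalone sketch (not part of this commit) of how an engine label such as EngineLabelTiFlashCompute can be used to classify stores. The Store/StoreLabel types and the "engine" label key are simplified assumptions for illustration, not the real metapb definitions.

package main

import "fmt"

// Simplified stand-ins for the real store metadata types (assumption only).
type StoreLabel struct {
	Key   string
	Value string
}

type Store struct {
	ID     uint64
	Labels []StoreLabel
}

const (
	EngineLabelTiKV           = "tikv"
	EngineLabelTiFlash        = "tiflash"
	EngineLabelTiFlashCompute = "tiflash_compute"
)

// engineLabel returns the value of the "engine" label, or "" if it is unset.
func engineLabel(s Store) string {
	for _, l := range s.Labels {
		if l.Key == "engine" {
			return l.Value
		}
	}
	return ""
}

func main() {
	stores := []Store{
		{ID: 1}, // plain TiKV node, no engine label
		{ID: 2, Labels: []StoreLabel{{Key: "engine", Value: EngineLabelTiFlashCompute}}},
	}
	for _, s := range stores {
		fmt.Printf("store %d engine=%q\n", s.ID, engineLabel(s))
	}
}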
32 changes: 32 additions & 0 deletions executor/tiflashtest/tiflash_test.go
@@ -1277,3 +1277,35 @@ func TestDisaggregatedTiFlash(t *testing.T) {
})
tk.MustQuery("select * from t;").Check(testkit.Rows())
}

func TestDisaggregatedTiFlashQuery(t *testing.T) {
config.UpdateGlobal(func(conf *config.Config) {
conf.DisaggregatedTiFlash = true
})
defer config.UpdateGlobal(func(conf *config.Config) {
conf.DisaggregatedTiFlash = false
})

store := testkit.CreateMockStore(t, withMockTiFlash(2))
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
tk.MustExec("drop table if exists tbl_1")
tk.MustExec(`create table tbl_1 ( col_1 bigint not null default -1443635317331776148,
col_2 text ( 176 ) collate utf8mb4_bin not null,
col_3 decimal ( 8, 3 ),
col_4 varchar ( 128 ) collate utf8mb4_bin not null,
col_5 varchar ( 377 ) collate utf8mb4_bin,
col_6 double,
col_7 varchar ( 459 ) collate utf8mb4_bin,
col_8 tinyint default -88 ) charset utf8mb4 collate utf8mb4_bin ;`)
tk.MustExec("alter table tbl_1 set tiflash replica 1")
tb := external.GetTableByName(t, tk, "test", "tbl_1")
err := domain.GetDomain(tk.Session()).DDL().UpdateTableReplicaInfo(tk.Session(), tb.Meta().ID, true)
require.NoError(t, err)
tk.MustExec("set @@session.tidb_isolation_read_engines=\"tiflash\"")

needCheckTiFlashComputeNode := "false"
failpoint.Enable("github.com/pingcap/tidb/planner/core/testDisaggregatedTiFlashQuery", fmt.Sprintf("return(%s)", needCheckTiFlashComputeNode))
defer failpoint.Disable("github.com/pingcap/tidb/planner/core/testDisaggregatedTiFlashQuery")
tk.MustExec("explain select max( tbl_1.col_1 ) as r0 , sum( tbl_1.col_1 ) as r1 , sum( tbl_1.col_8 ) as r2 from tbl_1 where tbl_1.col_8 != 68 or tbl_1.col_3 between null and 939 order by r0,r1,r2;")
}
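For readers unfamiliar with the failpoint usage above: failpoint.Enable registers a value ("return(false)" here) for the named failpoint, the failpoint.Inject call in planbuilder.go (shown further down) picks it up, and failpoint.Disable removes it. Below is a hedged sketch of the test-side half with error checks added (the test above discards them); the helper name is hypothetical.

package tiflashtest

import (
	"testing"

	"github.com/pingcap/failpoint"
	"github.com/stretchr/testify/require"
)

// enableDisaggregatedQueryFailpoint is a hypothetical helper showing the
// Enable/Disable pattern with error checks tied to test cleanup.
func enableDisaggregatedQueryFailpoint(t *testing.T) {
	fpName := "github.com/pingcap/tidb/planner/core/testDisaggregatedTiFlashQuery"
	// "return(false)" makes the injected value false, so the planner keeps
	// TiFlash paths even though the mock cluster has no tiflash_compute node.
	require.NoError(t, failpoint.Enable(fpName, `return(false)`))
	t.Cleanup(func() {
		require.NoError(t, failpoint.Disable(fpName))
	})
}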
39 changes: 30 additions & 9 deletions planner/core/find_best_task.go
@@ -1993,8 +1993,11 @@ func (ds *DataSource) convertToTableScan(prop *property.PhysicalProperty, candid
}
}
}
// In disaggregated tiflash mode, only MPP is allowed, Cop and BatchCop is deprecated.
if prop.TaskTp == property.MppTaskType || config.GetGlobalConfig().DisaggregatedTiFlash && ts.StoreType == kv.TiFlash {
// In disaggregated tiflash mode, only MPP is allowed; cop and batchCop are deprecated.
// So if prop.TaskTp is RootTaskType, we have to use an mppTask and then convert it to a rootTask.
isDisaggregatedTiFlashPath := config.GetGlobalConfig().DisaggregatedTiFlash && ts.StoreType == kv.TiFlash
canMppConvertToRootForDisaggregatedTiFlash := isDisaggregatedTiFlashPath && prop.TaskTp == property.RootTaskType && ds.SCtx().GetSessionVars().IsMPPAllowed()
if prop.TaskTp == property.MppTaskType || canMppConvertToRootForDisaggregatedTiFlash {
if ts.KeepOrder {
return invalidTask, nil
}
@@ -2010,8 +2013,9 @@
}
}
mppTask := &mppTask{
p: ts,
partTp: property.AnyType,
p: ts,
partTp: property.AnyType,
tblColHists: ds.TblColHists,
}
ts.PartitionInfo = PartitionInfo{
PruningConds: pushDownNot(ds.ctx, ds.allConds),
@@ -2020,7 +2024,26 @@
ColumnNames: ds.names,
}
mppTask = ts.addPushedDownSelectionToMppTask(mppTask, ds.stats.ScaleByExpectCnt(prop.ExpectedCnt))
return mppTask, nil
task = mppTask
if !mppTask.invalid() {
if prop.TaskTp == property.MppTaskType && len(mppTask.rootTaskConds) > 0 {
// If some filters cannot be pushed down to tiflash, we have to make sure they are executed in TiDB,
// so a rootTask is required; but prop requires an mppTask, so this requirement cannot be met.
task = invalidTask
} else if prop.TaskTp == property.RootTaskType {
// When we get here, canMppConvertToRootForDisaggregatedTiFlash is true.
task = mppTask
task = task.convertToRootTask(ds.ctx)
if !task.invalid() {
ds.addSelection4PlanCache(task.(*rootTask), ds.stats.ScaleByExpectCnt(prop.ExpectedCnt), prop)
}
}
}
return task, nil
}
if isDisaggregatedTiFlashPath {
// prop.TaskTp is cop-related, so just return invalidTask.
return invalidTask, nil
}
copTask := &copTask{
tablePlan: ts,
@@ -2230,10 +2253,8 @@ func (ts *PhysicalTableScan) addPushedDownSelectionToMppTask(mpp *mppTask, stats
filterCondition, rootTaskConds := SplitSelCondsWithVirtualColumn(ts.filterCondition)
var newRootConds []expression.Expression
filterCondition, newRootConds = expression.PushDownExprs(ts.ctx.GetSessionVars().StmtCtx, filterCondition, ts.ctx.GetClient(), ts.StoreType)
rootTaskConds = append(rootTaskConds, newRootConds...)
if len(rootTaskConds) > 0 {
return &mppTask{}
}
mpp.rootTaskConds = append(rootTaskConds, newRootConds...)

ts.filterCondition = filterCondition
// Add filter condition to table plan now.
if len(ts.filterCondition) > 0 {
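As a reading aid for the new branch in convertToTableScan above, here is a standalone sketch of the path-selection rule it implements: with disaggregated TiFlash, only MPP is allowed, so a RootTaskType request over a TiFlash path is served by building an mppTask and converting it to a rootTask, while cop-style requests are rejected. The types and function below are simplified illustrations, not the planner's real API.

package main

import "fmt"

type taskType int

const (
	rootTaskType taskType = iota
	mppTaskType
	copTaskType
)

// pickTiFlashTask mirrors, in simplified form, the branch added in
// convertToTableScan: it decides how a TiFlash table scan may be executed.
func pickTiFlashTask(tp taskType, disaggregated, mppAllowed bool) string {
	canMppConvertToRoot := disaggregated && tp == rootTaskType && mppAllowed
	switch {
	case tp == mppTaskType || canMppConvertToRoot:
		return "build mppTask (convert to rootTask if prop asked for a root task)"
	case disaggregated:
		// cop and batchCop are deprecated in disaggregated mode.
		return "invalidTask"
	default:
		return "build copTask"
	}
}

func main() {
	fmt.Println(pickTiFlashTask(rootTaskType, true, true))  // mppTask converted to rootTask
	fmt.Println(pickTiFlashTask(copTaskType, true, true))   // invalidTask
	fmt.Println(pickTiFlashTask(rootTaskType, false, true)) // copTask, classic mode
}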
6 changes: 6 additions & 0 deletions planner/core/planbuilder.go
@@ -26,6 +26,7 @@ import (
"unsafe"

"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/bindinfo"
"github.com/pingcap/tidb/config"
"github.com/pingcap/tidb/domain"
@@ -1462,6 +1463,11 @@ func filterPathByIsolationRead(ctx sessionctx.Context, paths []*util.AccessPath,
// 1. path.StoreType doesn't exist in isolationReadEngines or
// 2. TiFlash is disaggregated and the number of tiflash_compute nodes is zero.
shouldPruneTiFlashCompute := noTiFlashComputeNode && exists && paths[i].StoreType == kv.TiFlash
failpoint.Inject("testDisaggregatedTiFlashQuery", func(val failpoint.Value) {
// Ignore the check of the tiflash_compute node number.
// After disaggregated tiflash is supported in the test framework, this failpoint can be deleted.
shouldPruneTiFlashCompute = val.(bool)
})
if shouldPruneTiFlashCompute {
outputComputeNodeErrMsg = true
}
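A simplified illustration of the pruning rule this hunk participates in: in filterPathByIsolationRead a TiFlash path is dropped when its store type is not in the isolation-read engines, or when TiFlash is disaggregated and no tiflash_compute node is available; the failpoint lets the test force that second check on or off. Names and the exact condition shape below are assumptions for illustration, not the function's real code.

package main

import "fmt"

// shouldPruneTiFlashPath reports whether a TiFlash access path is removed.
// noComputeNode stands for: disaggregated TiFlash with zero tiflash_compute nodes.
// override mimics the testDisaggregatedTiFlashQuery failpoint: when non-nil,
// it replaces the tiflash_compute-node check entirely.
func shouldPruneTiFlashPath(inIsolationEngines, noComputeNode bool, override *bool) bool {
	if !inIsolationEngines {
		return true
	}
	pruneForCompute := noComputeNode
	if override != nil {
		pruneForCompute = *override
	}
	return pruneForCompute
}

func main() {
	forceKeep := false
	fmt.Println(shouldPruneTiFlashPath(true, true, nil))        // true: no compute node, prune the path
	fmt.Println(shouldPruneTiFlashPath(true, true, &forceKeep)) // false: failpoint keeps the path
}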
43 changes: 39 additions & 4 deletions planner/core/task.go
@@ -1982,10 +1982,6 @@ func (p *PhysicalHashAgg) attach2TaskForMpp(tasks ...task) task {
}
attachPlan2Task(proj, newMpp)
return newMpp
case NoMpp:
t = mpp.convertToRootTask(p.ctx)
attachPlan2Task(p, t)
return t
default:
return invalidTask
}
@@ -2072,6 +2068,19 @@ type mppTask struct {

partTp property.MPPPartitionType
hashCols []*property.MPPPartitionColumn

// rootTaskConds records the filters of the TableScan that cannot be pushed down to TiFlash.

// For a logical plan like HashAgg -> Selection -> TableScan, if the filters in Selection cannot be pushed to TiFlash,
// the planner will generate a physical plan like: PhysicalHashAgg -> PhysicalSelection -> TableReader -> PhysicalTableScan(cop tiflash),
// because the planner invalidates the mppTask directly and uses a copTask instead.

// But in DisaggregatedTiFlash mode, the cop and batchCop protocols are disabled, so we have to handle this situation for mppTask.
// When generating the PhysicalTableScan, if prop.TaskTp is RootTaskType, the mppTask will be converted to a rootTask,
// and the filters in rootTaskConds will be added to a Selection that is executed in TiDB.
// So the physical plan will be: PhysicalHashAgg -> PhysicalSelection -> TableReader -> ExchangeSender -> PhysicalTableScan(mpp tiflash)
rootTaskConds []expression.Expression
tblColHists *statistics.HistColl
}

func (t *mppTask) count() float64 {
@@ -2151,6 +2160,32 @@ func (t *mppTask) convertToRootTaskImpl(ctx sessionctx.Context) *rootTask {
rt := &rootTask{
p: p,
}

if len(t.rootTaskConds) > 0 {
// Some filters cannot be pushed down to TiFlash, so we need to add a Selection to the rootTask;
// this Selection will be executed in TiDB.
_, isTableScan := t.p.(*PhysicalTableScan)
_, isSelection := t.p.(*PhysicalSelection)
if isSelection {
_, isTableScan = t.p.Children()[0].(*PhysicalTableScan)
}
if !isTableScan {
// Need to make sure oriTaskPlan is TableScan, because rootTaskConds is part of TableScan.FilterCondition.
// It's only for TableScan. This is ensured by converting mppTask to rootTask just after TableScan is built,
// so no other operators are added into this mppTask.
logutil.BgLogger().Error("expect Selection or TableScan for mppTask.p", zap.String("mppTask.p", t.p.TP()))
return invalidTask
}
selectivity, _, err := t.tblColHists.Selectivity(ctx, t.rootTaskConds, nil)
if err != nil {
logutil.BgLogger().Debug("calculate selectivity failed, use selection factor", zap.Error(err))
selectivity = SelectionFactor
}
sel := PhysicalSelection{Conditions: t.rootTaskConds}.Init(ctx, rt.p.statsInfo().Scale(selectivity), rt.p.SelectBlockOffset())
sel.fromDataSource = true
sel.SetChildren(rt.p)
rt.p = sel
}
return rt
}

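The new branch in convertToRootTaskImpl wraps the converted plan in a TiDB-side PhysicalSelection for the non-pushable filters and scales the row estimate by an estimated selectivity, falling back to a fixed selection factor when estimation fails. Below is a standalone sketch of that fallback pattern; the 0.8 factor and the stand-in estimator are assumptions of this sketch (the real values come from the planner and statistics packages).

package main

import (
	"errors"
	"fmt"
)

// assumedSelectionFactor is the fallback used when selectivity estimation
// fails; treat the concrete value as an assumption of this sketch.
const assumedSelectionFactor = 0.8

// estimateSelectivity stands in for a histogram-based selectivity estimate.
func estimateSelectivity(haveStats bool) (float64, error) {
	if !haveStats {
		return 0, errors.New("no histogram available")
	}
	return 0.25, nil // pretend the remaining filters keep 25% of rows
}

// selectionRowCount mirrors the shape of the diff: estimate, fall back
// on error, then scale the child's row count.
func selectionRowCount(childRows float64, haveStats bool) float64 {
	sel, err := estimateSelectivity(haveStats)
	if err != nil {
		sel = assumedSelectionFactor
	}
	return childRows * sel
}

func main() {
	fmt.Println(selectionRowCount(10000, true))  // 2500
	fmt.Println(selectionRowCount(10000, false)) // 8000
}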
4 changes: 4 additions & 0 deletions store/gcworker/gc_worker.go
@@ -912,6 +912,10 @@ func needsGCOperationForStore(store *metapb.Store) (bool, error) {
// skip physical resolve locks for it.
return false, nil

case placement.EngineLabelTiFlashCompute:
logutil.BgLogger().Debug("[gc worker] will ignore gc tiflash_compute node")
return false, nil

case placement.EngineLabelTiKV, "":
// If no engine label is set, it should be a TiKV node.
return true, nil
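For reference, a minimal standalone sketch of the engine-label dispatch that needsGCOperationForStore extends: tiflash_compute nodes, like tiflash nodes, are skipped for physical lock resolution, while TiKV nodes (or stores with no engine label) take part in GC. The types here are simplified assumptions rather than the real metapb.Store.

package main

import "fmt"

const (
	engineLabelTiKV           = "tikv"
	engineLabelTiFlash        = "tiflash"
	engineLabelTiFlashCompute = "tiflash_compute"
)

// needsGC mirrors the shape of needsGCOperationForStore for the engine
// labels shown in the hunk above.
func needsGC(engine string) (bool, error) {
	switch engine {
	case engineLabelTiFlash, engineLabelTiFlashCompute:
		// TiFlash and tiflash_compute nodes do not need physical lock resolution.
		return false, nil
	case engineLabelTiKV, "":
		// No engine label means a plain TiKV node.
		return true, nil
	default:
		return false, fmt.Errorf("unsupported store engine %q", engine)
	}
}

func main() {
	for _, e := range []string{"", engineLabelTiKV, engineLabelTiFlashCompute} {
		ok, _ := needsGC(e)
		fmt.Printf("engine=%q needsGC=%v\n", e, ok)
	}
}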
