Skip to content

Commit

Permalink
*: fix static pruning partition table in disaggregated tiflash mode (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
guo-shaoge authored Jan 6, 2023
1 parent f600fc6 commit f9f7268
Show file tree
Hide file tree
Showing 6 changed files with 65 additions and 20 deletions.
6 changes: 3 additions & 3 deletions domain/domain.go
Original file line number Diff line number Diff line change
Expand Up @@ -1427,7 +1427,7 @@ func (do *Domain) LoadSysVarCacheLoop(ctx sessionctx.Context) error {
// WatchTiFlashComputeNodeChange create a routine to watch if the topology of tiflash_compute node is changed.
// TODO: tiflashComputeNodeKey is not put to etcd yet(finish this when AutoScaler is done)
//
// store cache will only be invalidated every 30 seconds.
// store cache will only be invalidated every n seconds.
func (do *Domain) WatchTiFlashComputeNodeChange() error {
var watchCh clientv3.WatchChan
if do.etcdClient != nil {
Expand Down Expand Up @@ -1468,8 +1468,8 @@ func (do *Domain) WatchTiFlashComputeNodeChange() error {
case tikv.Storage:
logCount++
s.GetRegionCache().InvalidateTiFlashComputeStores()
if logCount == 60 {
// Print log every 60*duration seconds.
if logCount == 6 {
// Print log every 6*duration seconds.
logutil.BgLogger().Debug("tiflash_compute store cache invalied, will update next query", zap.Bool("watched", watched))
logCount = 0
}
Expand Down
4 changes: 3 additions & 1 deletion executor/mpp_gather.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,9 @@ func (e *MPPGather) appendMPPDispatchReq(pf *plannercore.Fragment) error {
for _, mppTask := range pf.ExchangeSender.Tasks {
if mppTask.PartitionTableIDs != nil {
err = updateExecutorTableID(context.Background(), dagReq.RootExecutor, true, mppTask.PartitionTableIDs)
} else {
} else if !mppTask.IsDisaggregatedTiFlashStaticPrune {
// If isDisaggregatedTiFlashStaticPrune is true, it means this TableScan is under PartitionUnoin,
// tableID in TableScan is already the physical table id of this partition, no need to update again.
err = updateExecutorTableID(context.Background(), dagReq.RootExecutor, true, []int64{mppTask.TableID})
}
if err != nil {
Expand Down
24 changes: 24 additions & 0 deletions executor/tiflashtest/tiflash_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1308,4 +1308,28 @@ func TestDisaggregatedTiFlashQuery(t *testing.T) {
failpoint.Enable("github.com/pingcap/tidb/planner/core/testDisaggregatedTiFlashQuery", fmt.Sprintf("return(%s)", needCheckTiFlashComputeNode))
defer failpoint.Disable("github.com/pingcap/tidb/planner/core/testDisaggregatedTiFlashQuery")
tk.MustExec("explain select max( tbl_1.col_1 ) as r0 , sum( tbl_1.col_1 ) as r1 , sum( tbl_1.col_8 ) as r2 from tbl_1 where tbl_1.col_8 != 68 or tbl_1.col_3 between null and 939 order by r0,r1,r2;")

tk.MustExec("set @@tidb_partition_prune_mode = 'static';")
tk.MustExec("set @@session.tidb_isolation_read_engines=\"tiflash\"")
tk.MustExec("create table t1(c1 int, c2 int) partition by hash(c1) partitions 3")
tk.MustExec("insert into t1 values(1, 1), (2, 2), (3, 3)")
tk.MustExec("alter table t1 set tiflash replica 1")
tb = external.GetTableByName(t, tk, "test", "t1")
err = domain.GetDomain(tk.Session()).DDL().UpdateTableReplicaInfo(tk.Session(), tb.Meta().ID, true)
require.NoError(t, err)
tk.MustQuery("explain select * from t1 where c1 < 2").Check(testkit.Rows(
"PartitionUnion_10 9970.00 root ",
"├─TableReader_15 3323.33 root data:ExchangeSender_14",
"│ └─ExchangeSender_14 3323.33 mpp[tiflash] ExchangeType: PassThrough",
"│ └─Selection_13 3323.33 mpp[tiflash] lt(test.t1.c1, 2)",
"│ └─TableFullScan_12 10000.00 mpp[tiflash] table:t1, partition:p0 keep order:false, stats:pseudo",
"├─TableReader_19 3323.33 root data:ExchangeSender_18",
"│ └─ExchangeSender_18 3323.33 mpp[tiflash] ExchangeType: PassThrough",
"│ └─Selection_17 3323.33 mpp[tiflash] lt(test.t1.c1, 2)",
"│ └─TableFullScan_16 10000.00 mpp[tiflash] table:t1, partition:p1 keep order:false, stats:pseudo",
"└─TableReader_23 3323.33 root data:ExchangeSender_22",
" └─ExchangeSender_22 3323.33 mpp[tiflash] ExchangeType: PassThrough",
" └─Selection_21 3323.33 mpp[tiflash] lt(test.t1.c1, 2)",
" └─TableFullScan_20 10000.00 mpp[tiflash] table:t1, partition:p2 keep order:false, stats:pseudo"))
// tk.MustQuery("select * from t1 where c1 < 2").Check(testkit.Rows("1 1"))
}
3 changes: 2 additions & 1 deletion kv/mpp.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,8 @@ type MPPTask struct {
MppQueryID MPPQueryID
TableID int64 // physical table id

PartitionTableIDs []int64
PartitionTableIDs []int64
IsDisaggregatedTiFlashStaticPrune bool
}

// ToPB generates the pb structure.
Expand Down
3 changes: 2 additions & 1 deletion planner/core/find_best_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -2001,8 +2001,9 @@ func (ds *DataSource) convertToTableScan(prop *property.PhysicalProperty, candid
if ts.KeepOrder {
return invalidTask, nil
}
if prop.MPPPartitionTp != property.AnyType || ts.isPartition {
if prop.MPPPartitionTp != property.AnyType || (ts.isPartition && !canMppConvertToRootForDisaggregatedTiFlash) {
// If ts is a single partition, then this partition table is in static-only prune, then we should not choose mpp execution.
// But in disaggregated tiflash mode, we can only use mpp, so we add ExchangeSender and ExchangeReceiver above TableScan for static pruning partition table.
ds.SCtx().GetSessionVars().RaiseWarningWhenMPPEnforced("MPP mode may be blocked because table `" + ds.tableInfo.Name.O + "`is a partition table which is not supported when `@@tidb_partition_prune_mode=static`.")
return invalidTask, nil
}
Expand Down
45 changes: 31 additions & 14 deletions planner/core/fragment.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"unsafe"

"github.com/pingcap/errors"
"github.com/pingcap/tidb/config"
"github.com/pingcap/tidb/distsql"
"github.com/pingcap/tidb/expression"
"github.com/pingcap/tidb/infoschema"
Expand Down Expand Up @@ -375,17 +376,30 @@ func (e *mppTaskGenerator) constructMPPTasksImpl(ctx context.Context, ts *Physic
var allPartitionsIDs []int64
var err error
splitedRanges, _ := distsql.SplitRangesAcrossInt64Boundary(ts.Ranges, false, false, ts.Table.IsCommonHandle)
// True when:
// 0. Is disaggregated tiflash. because in non-disaggregated tiflash, we dont use mpp for static pruning.
// 1. Is partition table.
// 2. Dynamic prune is not used.
var isDisaggregatedTiFlashStaticPrune bool
if ts.Table.GetPartitionInfo() != nil {
isDisaggregatedTiFlashStaticPrune = config.GetGlobalConfig().DisaggregatedTiFlash &&
!e.ctx.GetSessionVars().StmtCtx.UseDynamicPartitionPrune()

tmp, _ := e.is.TableByID(ts.Table.ID)
tbl := tmp.(table.PartitionedTable)
var partitions []table.PhysicalTable
partitions, err = partitionPruning(e.ctx, tbl, ts.PartitionInfo.PruningConds, ts.PartitionInfo.PartitionNames, ts.PartitionInfo.Columns, ts.PartitionInfo.ColumnNames)
if err != nil {
return nil, errors.Trace(err)
if !isDisaggregatedTiFlashStaticPrune {
var partitions []table.PhysicalTable
partitions, err = partitionPruning(e.ctx, tbl, ts.PartitionInfo.PruningConds, ts.PartitionInfo.PartitionNames, ts.PartitionInfo.Columns, ts.PartitionInfo.ColumnNames)
if err != nil {
return nil, errors.Trace(err)
}
req, allPartitionsIDs, err = e.constructMPPBuildTaskReqForPartitionedTable(ts, splitedRanges, partitions)
} else {
singlePartTbl := tbl.GetPartition(ts.physicalTableID)
req, err = e.constructMPPBuildTaskForNonPartitionTable(singlePartTbl.GetPhysicalID(), ts.Table.IsCommonHandle, splitedRanges)
}
req, allPartitionsIDs, err = e.constructMPPBuildTaskReqForPartitionedTable(ts, splitedRanges, partitions)
} else {
req, err = e.constructMPPBuildTaskForNonPartitionTable(ts, splitedRanges)
req, err = e.constructMPPBuildTaskForNonPartitionTable(ts.Table.ID, ts.Table.IsCommonHandle, splitedRanges)
}
if err != nil {
return nil, errors.Trace(err)
Expand All @@ -403,12 +417,15 @@ func (e *mppTaskGenerator) constructMPPTasksImpl(ctx context.Context, ts *Physic

tasks := make([]*kv.MPPTask, 0, len(metas))
for _, meta := range metas {
task := &kv.MPPTask{Meta: meta,
ID: AllocMPPTaskID(e.ctx),
StartTs: e.startTS,
MppQueryID: e.mppQueryID,
TableID: ts.Table.ID,
PartitionTableIDs: allPartitionsIDs}
task := &kv.MPPTask{
Meta: meta,
ID: AllocMPPTaskID(e.ctx),
StartTs: e.startTS,
MppQueryID: e.mppQueryID,
TableID: ts.Table.ID,
PartitionTableIDs: allPartitionsIDs,
IsDisaggregatedTiFlashStaticPrune: isDisaggregatedTiFlashStaticPrune,
}
tasks = append(tasks, task)
}
return tasks, nil
Expand All @@ -435,8 +452,8 @@ func (e *mppTaskGenerator) constructMPPBuildTaskReqForPartitionedTable(ts *Physi
return &kv.MPPBuildTasksRequest{PartitionIDAndRanges: partitionIDAndRanges}, allPartitionsIDs, nil
}

func (e *mppTaskGenerator) constructMPPBuildTaskForNonPartitionTable(ts *PhysicalTableScan, splitedRanges []*ranger.Range) (*kv.MPPBuildTasksRequest, error) {
kvRanges, err := distsql.TableHandleRangesToKVRanges(e.ctx.GetSessionVars().StmtCtx, []int64{ts.Table.ID}, ts.Table.IsCommonHandle, splitedRanges, nil)
func (e *mppTaskGenerator) constructMPPBuildTaskForNonPartitionTable(tid int64, isCommonHandle bool, splitedRanges []*ranger.Range) (*kv.MPPBuildTasksRequest, error) {
kvRanges, err := distsql.TableHandleRangesToKVRanges(e.ctx.GetSessionVars().StmtCtx, []int64{tid}, isCommonHandle, splitedRanges, nil)
if err != nil {
return nil, errors.Trace(err)
}
Expand Down

0 comments on commit f9f7268

Please sign in to comment.