From f9f72688fb1e378b752108ffcd2e691ac1bf6ebd Mon Sep 17 00:00:00 2001
From: guo-shaoge
Date: Fri, 6 Jan 2023 19:28:21 +0800
Subject: [PATCH] *: fix static pruning partition table in disaggregated
 tiflash mode (#40238)

close pingcap/tidb#40239
---
 domain/domain.go                     |  6 ++--
 executor/mpp_gather.go               |  4 ++-
 executor/tiflashtest/tiflash_test.go | 24 +++++++++++++++
 kv/mpp.go                            |  3 +-
 planner/core/find_best_task.go       |  3 +-
 planner/core/fragment.go             | 45 +++++++++++++++++++---------
 6 files changed, 65 insertions(+), 20 deletions(-)

diff --git a/domain/domain.go b/domain/domain.go
index 34eb4a80742e6..5be4c1f6f028a 100644
--- a/domain/domain.go
+++ b/domain/domain.go
@@ -1427,7 +1427,7 @@ func (do *Domain) LoadSysVarCacheLoop(ctx sessionctx.Context) error {
 // WatchTiFlashComputeNodeChange create a routine to watch if the topology of tiflash_compute node is changed.
 // TODO: tiflashComputeNodeKey is not put to etcd yet(finish this when AutoScaler is done)
 //
-// store cache will only be invalidated every 30 seconds.
+// store cache will only be invalidated every n seconds.
 func (do *Domain) WatchTiFlashComputeNodeChange() error {
 	var watchCh clientv3.WatchChan
 	if do.etcdClient != nil {
@@ -1468,8 +1468,8 @@ func (do *Domain) WatchTiFlashComputeNodeChange() error {
 			case tikv.Storage:
 				logCount++
 				s.GetRegionCache().InvalidateTiFlashComputeStores()
-				if logCount == 60 {
-					// Print log every 60*duration seconds.
+				if logCount == 6 {
+					// Print log every 6*duration seconds.
 					logutil.BgLogger().Debug("tiflash_compute store cache invalied, will update next query", zap.Bool("watched", watched))
 					logCount = 0
 				}
diff --git a/executor/mpp_gather.go b/executor/mpp_gather.go
index eee019bd0de47..eba5498f8869d 100644
--- a/executor/mpp_gather.go
+++ b/executor/mpp_gather.go
@@ -81,7 +81,9 @@ func (e *MPPGather) appendMPPDispatchReq(pf *plannercore.Fragment) error {
 	for _, mppTask := range pf.ExchangeSender.Tasks {
 		if mppTask.PartitionTableIDs != nil {
 			err = updateExecutorTableID(context.Background(), dagReq.RootExecutor, true, mppTask.PartitionTableIDs)
-		} else {
+		} else if !mppTask.IsDisaggregatedTiFlashStaticPrune {
+			// If IsDisaggregatedTiFlashStaticPrune is true, it means this TableScan is under a PartitionUnion,
+			// and the tableID in the TableScan is already the physical table id of this partition, so there is no need to update it again.
err = updateExecutorTableID(context.Background(), dagReq.RootExecutor, true, []int64{mppTask.TableID}) } if err != nil { diff --git a/executor/tiflashtest/tiflash_test.go b/executor/tiflashtest/tiflash_test.go index ca52fb48fd788..e8cd94d889188 100644 --- a/executor/tiflashtest/tiflash_test.go +++ b/executor/tiflashtest/tiflash_test.go @@ -1308,4 +1308,28 @@ func TestDisaggregatedTiFlashQuery(t *testing.T) { failpoint.Enable("github.com/pingcap/tidb/planner/core/testDisaggregatedTiFlashQuery", fmt.Sprintf("return(%s)", needCheckTiFlashComputeNode)) defer failpoint.Disable("github.com/pingcap/tidb/planner/core/testDisaggregatedTiFlashQuery") tk.MustExec("explain select max( tbl_1.col_1 ) as r0 , sum( tbl_1.col_1 ) as r1 , sum( tbl_1.col_8 ) as r2 from tbl_1 where tbl_1.col_8 != 68 or tbl_1.col_3 between null and 939 order by r0,r1,r2;") + + tk.MustExec("set @@tidb_partition_prune_mode = 'static';") + tk.MustExec("set @@session.tidb_isolation_read_engines=\"tiflash\"") + tk.MustExec("create table t1(c1 int, c2 int) partition by hash(c1) partitions 3") + tk.MustExec("insert into t1 values(1, 1), (2, 2), (3, 3)") + tk.MustExec("alter table t1 set tiflash replica 1") + tb = external.GetTableByName(t, tk, "test", "t1") + err = domain.GetDomain(tk.Session()).DDL().UpdateTableReplicaInfo(tk.Session(), tb.Meta().ID, true) + require.NoError(t, err) + tk.MustQuery("explain select * from t1 where c1 < 2").Check(testkit.Rows( + "PartitionUnion_10 9970.00 root ", + "├─TableReader_15 3323.33 root data:ExchangeSender_14", + "│ └─ExchangeSender_14 3323.33 mpp[tiflash] ExchangeType: PassThrough", + "│ └─Selection_13 3323.33 mpp[tiflash] lt(test.t1.c1, 2)", + "│ └─TableFullScan_12 10000.00 mpp[tiflash] table:t1, partition:p0 keep order:false, stats:pseudo", + "├─TableReader_19 3323.33 root data:ExchangeSender_18", + "│ └─ExchangeSender_18 3323.33 mpp[tiflash] ExchangeType: PassThrough", + "│ └─Selection_17 3323.33 mpp[tiflash] lt(test.t1.c1, 2)", + "│ └─TableFullScan_16 10000.00 mpp[tiflash] table:t1, partition:p1 keep order:false, stats:pseudo", + "└─TableReader_23 3323.33 root data:ExchangeSender_22", + " └─ExchangeSender_22 3323.33 mpp[tiflash] ExchangeType: PassThrough", + " └─Selection_21 3323.33 mpp[tiflash] lt(test.t1.c1, 2)", + " └─TableFullScan_20 10000.00 mpp[tiflash] table:t1, partition:p2 keep order:false, stats:pseudo")) + // tk.MustQuery("select * from t1 where c1 < 2").Check(testkit.Rows("1 1")) } diff --git a/kv/mpp.go b/kv/mpp.go index 32e9186506067..de0a8e8654528 100644 --- a/kv/mpp.go +++ b/kv/mpp.go @@ -42,7 +42,8 @@ type MPPTask struct { MppQueryID MPPQueryID TableID int64 // physical table id - PartitionTableIDs []int64 + PartitionTableIDs []int64 + IsDisaggregatedTiFlashStaticPrune bool } // ToPB generates the pb structure. diff --git a/planner/core/find_best_task.go b/planner/core/find_best_task.go index aff1c29997fbd..37e58a6e09327 100644 --- a/planner/core/find_best_task.go +++ b/planner/core/find_best_task.go @@ -2001,8 +2001,9 @@ func (ds *DataSource) convertToTableScan(prop *property.PhysicalProperty, candid if ts.KeepOrder { return invalidTask, nil } - if prop.MPPPartitionTp != property.AnyType || ts.isPartition { + if prop.MPPPartitionTp != property.AnyType || (ts.isPartition && !canMppConvertToRootForDisaggregatedTiFlash) { // If ts is a single partition, then this partition table is in static-only prune, then we should not choose mpp execution. 
+		// But in disaggregated tiflash mode, we can only use mpp, so we add an ExchangeSender and ExchangeReceiver above the TableScan for a statically pruned partition table.
 		ds.SCtx().GetSessionVars().RaiseWarningWhenMPPEnforced("MPP mode may be blocked because table `" + ds.tableInfo.Name.O + "`is a partition table which is not supported when `@@tidb_partition_prune_mode=static`.")
 		return invalidTask, nil
 	}
diff --git a/planner/core/fragment.go b/planner/core/fragment.go
index 7e86696ccc4d6..2496d463f2e24 100644
--- a/planner/core/fragment.go
+++ b/planner/core/fragment.go
@@ -21,6 +21,7 @@ import (
 	"unsafe"
 
 	"github.com/pingcap/errors"
+	"github.com/pingcap/tidb/config"
 	"github.com/pingcap/tidb/distsql"
 	"github.com/pingcap/tidb/expression"
 	"github.com/pingcap/tidb/infoschema"
@@ -375,17 +376,30 @@ func (e *mppTaskGenerator) constructMPPTasksImpl(ctx context.Context, ts *Physic
 	var allPartitionsIDs []int64
 	var err error
 	splitedRanges, _ := distsql.SplitRangesAcrossInt64Boundary(ts.Ranges, false, false, ts.Table.IsCommonHandle)
+	// True when all of the following hold:
+	// 0. Disaggregated tiflash is enabled; in non-disaggregated tiflash, mpp is not used for static pruning.
+	// 1. The table is a partition table.
+	// 2. Dynamic pruning is not used.
+	var isDisaggregatedTiFlashStaticPrune bool
 	if ts.Table.GetPartitionInfo() != nil {
+		isDisaggregatedTiFlashStaticPrune = config.GetGlobalConfig().DisaggregatedTiFlash &&
+			!e.ctx.GetSessionVars().StmtCtx.UseDynamicPartitionPrune()
+
 		tmp, _ := e.is.TableByID(ts.Table.ID)
 		tbl := tmp.(table.PartitionedTable)
-		var partitions []table.PhysicalTable
-		partitions, err = partitionPruning(e.ctx, tbl, ts.PartitionInfo.PruningConds, ts.PartitionInfo.PartitionNames, ts.PartitionInfo.Columns, ts.PartitionInfo.ColumnNames)
-		if err != nil {
-			return nil, errors.Trace(err)
+		if !isDisaggregatedTiFlashStaticPrune {
+			var partitions []table.PhysicalTable
+			partitions, err = partitionPruning(e.ctx, tbl, ts.PartitionInfo.PruningConds, ts.PartitionInfo.PartitionNames, ts.PartitionInfo.Columns, ts.PartitionInfo.ColumnNames)
+			if err != nil {
+				return nil, errors.Trace(err)
+			}
+			req, allPartitionsIDs, err = e.constructMPPBuildTaskReqForPartitionedTable(ts, splitedRanges, partitions)
+		} else {
+			singlePartTbl := tbl.GetPartition(ts.physicalTableID)
+			req, err = e.constructMPPBuildTaskForNonPartitionTable(singlePartTbl.GetPhysicalID(), ts.Table.IsCommonHandle, splitedRanges)
 		}
-		req, allPartitionsIDs, err = e.constructMPPBuildTaskReqForPartitionedTable(ts, splitedRanges, partitions)
 	} else {
-		req, err = e.constructMPPBuildTaskForNonPartitionTable(ts, splitedRanges)
+		req, err = e.constructMPPBuildTaskForNonPartitionTable(ts.Table.ID, ts.Table.IsCommonHandle, splitedRanges)
 	}
 	if err != nil {
 		return nil, errors.Trace(err)
@@ -403,12 +417,15 @@ func (e *mppTaskGenerator) constructMPPTasksImpl(ctx context.Context, ts *Physic
 
 	tasks := make([]*kv.MPPTask, 0, len(metas))
 	for _, meta := range metas {
-		task := &kv.MPPTask{Meta: meta,
-			ID:                AllocMPPTaskID(e.ctx),
-			StartTs:           e.startTS,
-			MppQueryID:        e.mppQueryID,
-			TableID:           ts.Table.ID,
-			PartitionTableIDs: allPartitionsIDs}
+		task := &kv.MPPTask{
+			Meta:              meta,
+			ID:                AllocMPPTaskID(e.ctx),
+			StartTs:           e.startTS,
+			MppQueryID:        e.mppQueryID,
+			TableID:           ts.Table.ID,
+			PartitionTableIDs: allPartitionsIDs,
+			IsDisaggregatedTiFlashStaticPrune: isDisaggregatedTiFlashStaticPrune,
+		}
 		tasks = append(tasks, task)
 	}
 	return tasks, nil
@@ -435,8 +452,8 @@ func (e *mppTaskGenerator) constructMPPBuildTaskReqForPartitionedTable(ts *Physi
 	return &kv.MPPBuildTasksRequest{PartitionIDAndRanges: partitionIDAndRanges}, allPartitionsIDs, nil
 }
 
-func (e *mppTaskGenerator) constructMPPBuildTaskForNonPartitionTable(ts *PhysicalTableScan, splitedRanges []*ranger.Range) (*kv.MPPBuildTasksRequest, error) {
-	kvRanges, err := distsql.TableHandleRangesToKVRanges(e.ctx.GetSessionVars().StmtCtx, []int64{ts.Table.ID}, ts.Table.IsCommonHandle, splitedRanges, nil)
+func (e *mppTaskGenerator) constructMPPBuildTaskForNonPartitionTable(tid int64, isCommonHandle bool, splitedRanges []*ranger.Range) (*kv.MPPBuildTasksRequest, error) {
+	kvRanges, err := distsql.TableHandleRangesToKVRanges(e.ctx.GetSessionVars().StmtCtx, []int64{tid}, isCommonHandle, splitedRanges, nil)
 	if err != nil {
 		return nil, errors.Trace(err)
 	}
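
Note (not part of the patch): the standalone Go sketch below condenses the decision the diff encodes, for readers who want the logic without the surrounding planner code. The types and helpers here (mppTask, buildTask, needTableIDRewrite) and the parameters disaggregatedTiFlash, dynamicPrune, isPartitionTable are hypothetical stand-ins for kv.MPPTask and for the code in planner/core/fragment.go and executor/mpp_gather.go; only the boolean logic mirrors the hunks above.

// Standalone sketch, not TiDB code: stand-in types only; the decision logic
// mirrors the patch above.
package main

import "fmt"

// mppTask is a trimmed-down stand-in for kv.MPPTask.
type mppTask struct {
	TableID                           int64   // physical table id the TableScan was planned against
	PartitionTableIDs                 []int64 // set when dynamic pruning selected several partitions
	IsDisaggregatedTiFlashStaticPrune bool    // flag introduced by this patch
}

// buildTask mirrors planner/core/fragment.go: under disaggregated tiflash with
// static pruning, the TableScan already targets a single physical partition,
// so the task keeps that partition id and is flagged; under dynamic pruning
// the pruned partition ids are carried instead.
func buildTask(disaggregatedTiFlash, dynamicPrune, isPartitionTable bool, physicalTableID int64, prunedPartitionIDs []int64) mppTask {
	staticPrune := disaggregatedTiFlash && isPartitionTable && !dynamicPrune
	t := mppTask{TableID: physicalTableID, IsDisaggregatedTiFlashStaticPrune: staticPrune}
	if isPartitionTable && !staticPrune {
		t.PartitionTableIDs = prunedPartitionIDs
	}
	return t
}

// needTableIDRewrite mirrors executor/mpp_gather.go: the dispatcher rewrites
// the executor's table id unless the scan was planned directly against one
// statically pruned partition, whose id is already the physical one.
func needTableIDRewrite(t mppTask) bool {
	if t.PartitionTableIDs != nil {
		return true // rewrite with the pruned partition ids
	}
	return !t.IsDisaggregatedTiFlashStaticPrune
}

func main() {
	// Static pruning under disaggregated tiflash: keep the partition id as-is.
	static := buildTask(true, false, true, 101, nil)
	fmt.Println(needTableIDRewrite(static)) // false

	// Dynamic pruning: the dispatcher rewrites with the pruned partition ids.
	dynamic := buildTask(true, true, true, 100, []int64{101, 102})
	fmt.Println(needTableIDRewrite(dynamic)) // true
}

The design point, as the patch comments state, is that under static pruning each PartitionUnion child already scans one physical partition, so the dispatch path must not overwrite that id with the logical table id again.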