diff --git a/domain/historical_stats.go b/domain/historical_stats.go index ca68319c31ba8..5d6d90feedef8 100644 --- a/domain/historical_stats.go +++ b/domain/historical_stats.go @@ -17,6 +17,7 @@ package domain import ( "github.com/pingcap/errors" "github.com/pingcap/tidb/metrics" + "github.com/pingcap/tidb/parser/model" "github.com/pingcap/tidb/sessionctx" "github.com/pingcap/tidb/statistics/handle" ) @@ -48,18 +49,27 @@ func (w *HistoricalStatsWorker) DumpHistoricalStats(tableID int64, statsHandle * } sctx := w.sctx is := GetDomain(sctx).InfoSchema() + isPartition := false + var tblInfo *model.TableInfo tbl, existed := is.TableByID(tableID) if !existed { - return errors.Errorf("cannot get table by id %d", tableID) + tbl, db, p := is.FindTableByPartitionID(tableID) + if tbl != nil && db != nil && p != nil { + isPartition = true + tblInfo = tbl.Meta() + } else { + return errors.Errorf("cannot get table by id %d", tableID) + } + } else { + tblInfo = tbl.Meta() } - tblInfo := tbl.Meta() dbInfo, existed := is.SchemaByTable(tblInfo) if !existed { return errors.Errorf("cannot get DBInfo by TableID %d", tableID) } - if _, err := statsHandle.RecordHistoricalStatsToStorage(dbInfo.Name.O, tblInfo); err != nil { + if _, err := statsHandle.RecordHistoricalStatsToStorage(dbInfo.Name.O, tblInfo, tableID, isPartition); err != nil { generateHistoricalStatsFailedCounter.Inc() - return errors.Errorf("record table %s.%s's historical stats failed", dbInfo.Name.O, tblInfo.Name.O) + return errors.Errorf("record table %s.%s's historical stats failed, err:%v", dbInfo.Name.O, tblInfo.Name.O, err) } generateHistoricalStatsSuccessCounter.Inc() return nil diff --git a/executor/analyze.go b/executor/analyze.go index af223b24dd4a8..705e6eed6c590 100644 --- a/executor/analyze.go +++ b/executor/analyze.go @@ -291,6 +291,8 @@ func (e *AnalyzeExec) handleResultsError(ctx context.Context, concurrency int, n } } + tableIDs := map[int64]struct{}{} + // save analyze results in single-thread. statsHandle := domain.GetDomain(e.ctx).StatsHandle() panicCnt := 0 @@ -311,6 +313,7 @@ func (e *AnalyzeExec) handleResultsError(ctx context.Context, concurrency int, n continue } handleGlobalStats(needGlobalStats, globalStatsMap, results) + tableIDs[results.TableID.GetStatisticsID()] = struct{}{} if err1 := statsHandle.SaveTableStatsToStorage(results, e.ctx.GetSessionVars().EnableAnalyzeSnapshot, handle.StatsMetaHistorySourceAnalyze); err1 != nil { tableID := results.TableID.TableID @@ -319,10 +322,6 @@ func (e *AnalyzeExec) handleResultsError(ctx context.Context, concurrency int, n finishJobWithLog(e.ctx, results.Job, err) } else { finishJobWithLog(e.ctx, results.Job, nil) - // Dump stats to historical storage. - if err := recordHistoricalStats(e.ctx, results.TableID.TableID); err != nil { - logutil.BgLogger().Error("record historical stats failed", zap.Error(err)) - } } invalidInfoSchemaStatCache(results.TableID.GetStatisticsID()) if atomic.LoadUint32(&e.ctx.GetSessionVars().Killed) == 1 { @@ -330,6 +329,13 @@ func (e *AnalyzeExec) handleResultsError(ctx context.Context, concurrency int, n return errors.Trace(ErrQueryInterrupted) } } + // Dump stats to historical storage. + for tableID := range tableIDs { + if err := recordHistoricalStats(e.ctx, tableID); err != nil { + logutil.BgLogger().Error("record historical stats failed", zap.Error(err)) + } + } + return err } @@ -348,6 +354,7 @@ func (e *AnalyzeExec) handleResultsErrorWithConcurrency(ctx context.Context, sta worker.run(ctx1, e.ctx.GetSessionVars().EnableAnalyzeSnapshot) }) } + tableIDs := map[int64]struct{}{} panicCnt := 0 var err error for panicCnt < statsConcurrency { @@ -370,6 +377,7 @@ func (e *AnalyzeExec) handleResultsErrorWithConcurrency(ctx context.Context, sta continue } handleGlobalStats(needGlobalStats, globalStatsMap, results) + tableIDs[results.TableID.GetStatisticsID()] = struct{}{} saveResultsCh <- results } close(saveResultsCh) @@ -382,6 +390,12 @@ func (e *AnalyzeExec) handleResultsErrorWithConcurrency(ctx context.Context, sta } err = errors.New(strings.Join(errMsg, ",")) } + for tableID := range tableIDs { + // Dump stats to historical storage. + if err := recordHistoricalStats(e.ctx, tableID); err != nil { + logutil.BgLogger().Error("record historical stats failed", zap.Error(err)) + } + } return err } diff --git a/executor/analyze_global_stats.go b/executor/analyze_global_stats.go index 6b11e68a3e614..e8f8d53b8adbf 100644 --- a/executor/analyze_global_stats.go +++ b/executor/analyze_global_stats.go @@ -54,7 +54,9 @@ func (e *AnalyzeExec) handleGlobalStats(ctx context.Context, needGlobalStats boo globalStatsTableIDs[globalStatsID.tableID] = struct{}{} } statsHandle := domain.GetDomain(e.ctx).StatsHandle() + tableIDs := map[int64]struct{}{} for tableID := range globalStatsTableIDs { + tableIDs[tableID] = struct{}{} tableAllPartitionStats := make(map[int64]*statistics.Table) for globalStatsID, info := range globalStatsMap { if globalStatsID.tableID != tableID { @@ -101,16 +103,18 @@ func (e *AnalyzeExec) handleGlobalStats(ctx context.Context, needGlobalStats boo logutil.Logger(ctx).Error("save global-level stats to storage failed", zap.String("info", job.JobInfo), zap.Int64("histID", hg.ID), zap.Error(err), zap.Int64("tableID", tableID)) } - // Dump stats to historical storage. - if err1 := recordHistoricalStats(e.ctx, globalStatsID.tableID); err1 != nil { - logutil.BgLogger().Error("record historical stats failed", zap.String("info", job.JobInfo), zap.Int64("histID", hg.ID), zap.Error(err1)) - } } return err }() FinishAnalyzeMergeJob(e.ctx, job, mergeStatsErr) } } + for tableID := range tableIDs { + // Dump stats to historical storage. + if err := recordHistoricalStats(e.ctx, tableID); err != nil { + logutil.BgLogger().Error("record historical stats failed", zap.Error(err)) + } + } return nil } diff --git a/executor/analyze_worker.go b/executor/analyze_worker.go index 18edc514c5d4d..688f89f5a120d 100644 --- a/executor/analyze_worker.go +++ b/executor/analyze_worker.go @@ -66,10 +66,6 @@ func (worker *analyzeSaveStatsWorker) run(ctx context.Context, analyzeSnapshot b worker.errCh <- err } else { finishJobWithLog(worker.sctx, results.Job, nil) - // Dump stats to historical storage. - if err := recordHistoricalStats(worker.sctx, results.TableID.TableID); err != nil { - logutil.BgLogger().Error("record historical stats failed", zap.Error(err)) - } } invalidInfoSchemaStatCache(results.TableID.GetStatisticsID()) if err != nil { diff --git a/executor/historical_stats_test.go b/executor/historical_stats_test.go index 809c2c862bf43..6ae23dcebb365 100644 --- a/executor/historical_stats_test.go +++ b/executor/historical_stats_test.go @@ -216,3 +216,30 @@ func TestGCOutdatedHistoryStats(t *testing.T) { tk.MustQuery(fmt.Sprintf("select count(*) from mysql.stats_history where table_id = '%d'", tableInfo.Meta().ID)).Check(testkit.Rows("0")) } + +func TestPartitionTableHistoricalStats(t *testing.T) { + store, dom := testkit.CreateMockStoreAndDomain(t) + tk := testkit.NewTestKit(t, store) + tk.MustExec("set global tidb_enable_historical_stats = 1") + tk.MustExec("use test") + tk.MustExec("drop table if exists t") + tk.MustExec(`CREATE TABLE t (a int, b int, index idx(b)) +PARTITION BY RANGE ( a ) ( +PARTITION p0 VALUES LESS THAN (6) +)`) + tk.MustExec("delete from mysql.stats_history") + + tk.MustExec("analyze table test.t") + // dump historical stats + h := dom.StatsHandle() + hsWorker := dom.GetHistoricalStatsWorker() + + // assert global table and partition table be dumped + tblID := hsWorker.GetOneHistoricalStatsTable() + err := hsWorker.DumpHistoricalStats(tblID, h) + require.NoError(t, err) + tblID = hsWorker.GetOneHistoricalStatsTable() + err = hsWorker.DumpHistoricalStats(tblID, h) + require.NoError(t, err) + tk.MustQuery("select count(*) from mysql.stats_history").Check(testkit.Rows("2")) +} diff --git a/statistics/handle/dump.go b/statistics/handle/dump.go index 75f4ee9ea958a..a83c6e57ee3c7 100644 --- a/statistics/handle/dump.go +++ b/statistics/handle/dump.go @@ -46,6 +46,7 @@ type JSONTable struct { Count int64 `json:"count"` ModifyCount int64 `json:"modify_count"` Partitions map[string]*JSONTable `json:"partitions"` + Version uint64 `json:"version"` } type jsonExtendedStats struct { @@ -228,6 +229,7 @@ func GenJSONTableFromStats(dbName string, tableInfo *model.TableInfo, tbl *stati Indices: make(map[string]*jsonColumn, len(tbl.Indices)), Count: tbl.Count, ModifyCount: tbl.ModifyCount, + Version: tbl.Version, } for _, col := range tbl.Columns { sc := &stmtctx.StatementContext{TimeZone: time.UTC} diff --git a/statistics/handle/handle.go b/statistics/handle/handle.go index f53f075301acb..b3a9c99298a92 100644 --- a/statistics/handle/handle.go +++ b/statistics/handle/handle.go @@ -2565,17 +2565,27 @@ func (h *Handle) GetPredicateColumns(tableID int64) ([]int64, error) { const maxColumnSize = 6 << 20 // RecordHistoricalStatsToStorage records the given table's stats data to mysql.stats_history -func (h *Handle) RecordHistoricalStatsToStorage(dbName string, tableInfo *model.TableInfo) (uint64, error) { +func (h *Handle) RecordHistoricalStatsToStorage(dbName string, tableInfo *model.TableInfo, physicalID int64, isPartition bool) (uint64, error) { ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnStats) - js, err := h.DumpStatsToJSON(dbName, tableInfo, nil, true) + var js *JSONTable + var err error + if isPartition { + js, err = h.tableStatsToJSON(dbName, tableInfo, physicalID, 0) + } else { + js, err = h.DumpStatsToJSON(dbName, tableInfo, nil, true) + } if err != nil { return 0, errors.Trace(err) } version := uint64(0) - for _, value := range js.Columns { - version = uint64(*value.StatsVer) - if version != 0 { - break + if len(js.Partitions) == 0 { + version = js.Version + } else { + for _, p := range js.Partitions { + version = p.Version + if version != 0 { + break + } } } blocks, err := JSONTableToBlocks(js, maxColumnSize) @@ -2596,7 +2606,7 @@ func (h *Handle) RecordHistoricalStatsToStorage(dbName string, tableInfo *model. const sql = "INSERT INTO mysql.stats_history(table_id, stats_data, seq_no, version, create_time) VALUES (%?, %?, %?, %?, %?)" for i := 0; i < len(blocks); i++ { - if _, err := exec.ExecuteInternal(ctx, sql, tableInfo.ID, blocks[i], i, version, ts); err != nil { + if _, err := exec.ExecuteInternal(ctx, sql, physicalID, blocks[i], i, version, ts); err != nil { return version, errors.Trace(err) } } diff --git a/statistics/handle/handle_test.go b/statistics/handle/handle_test.go index 9bb80498bc90f..ac9936bed11fe 100644 --- a/statistics/handle/handle_test.go +++ b/statistics/handle/handle_test.go @@ -3299,7 +3299,7 @@ func TestRecordHistoricalStatsToStorage(t *testing.T) { tableInfo, err := dom.InfoSchema().TableByName(model.NewCIStr("test"), model.NewCIStr("t")) require.NoError(t, err) - version, err := dom.StatsHandle().RecordHistoricalStatsToStorage("t", tableInfo.Meta()) + version, err := dom.StatsHandle().RecordHistoricalStatsToStorage("t", tableInfo.Meta(), tableInfo.Meta().ID, false) require.NoError(t, err) rows := tk.MustQuery(fmt.Sprintf("select count(*) from mysql.stats_history where version = '%d'", version)).Rows()