Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

statistics: support historical stats dump partition table #40310

Merged
merged 7 commits into from
Jan 5, 2023
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 14 additions & 4 deletions domain/historical_stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package domain
import (
"github.com/pingcap/errors"
"github.com/pingcap/tidb/metrics"
"github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/statistics/handle"
)
Expand Down Expand Up @@ -48,18 +49,27 @@ func (w *HistoricalStatsWorker) DumpHistoricalStats(tableID int64, statsHandle *
}
sctx := w.sctx
is := GetDomain(sctx).InfoSchema()
isPartition := false
var tblInfo *model.TableInfo
tbl, existed := is.TableByID(tableID)
if !existed {
return errors.Errorf("cannot get table by id %d", tableID)
tbl, db, p := is.FindTableByPartitionID(tableID)
if tbl != nil && db != nil && p != nil {
isPartition = true
tblInfo = tbl.Meta()
} else {
return errors.Errorf("cannot get table by id %d", tableID)
}
} else {
tblInfo = tbl.Meta()
}
tblInfo := tbl.Meta()
dbInfo, existed := is.SchemaByTable(tblInfo)
if !existed {
return errors.Errorf("cannot get DBInfo by TableID %d", tableID)
}
if _, err := statsHandle.RecordHistoricalStatsToStorage(dbInfo.Name.O, tblInfo); err != nil {
if _, err := statsHandle.RecordHistoricalStatsToStorage(dbInfo.Name.O, tblInfo, tableID, isPartition); err != nil {
generateHistoricalStatsFailedCounter.Inc()
return errors.Errorf("record table %s.%s's historical stats failed", dbInfo.Name.O, tblInfo.Name.O)
return errors.Errorf("record table %s.%s's historical stats failed, err:%v", dbInfo.Name.O, tblInfo.Name.O, err)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don't need generateHistoricalStatsFailedCounter here anymore?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

updated.

}
generateHistoricalStatsSuccessCounter.Inc()
return nil
Expand Down
22 changes: 18 additions & 4 deletions executor/analyze.go
Original file line number Diff line number Diff line change
Expand Up @@ -291,6 +291,8 @@ func (e *AnalyzeExec) handleResultsError(ctx context.Context, concurrency int, n
}
}

tableIDs := map[int64]struct{}{}

// save analyze results in single-thread.
statsHandle := domain.GetDomain(e.ctx).StatsHandle()
panicCnt := 0
Expand All @@ -311,6 +313,7 @@ func (e *AnalyzeExec) handleResultsError(ctx context.Context, concurrency int, n
continue
}
handleGlobalStats(needGlobalStats, globalStatsMap, results)
tableIDs[results.TableID.GetStatisticsID()] = struct{}{}

if err1 := statsHandle.SaveTableStatsToStorage(results, e.ctx.GetSessionVars().EnableAnalyzeSnapshot, handle.StatsMetaHistorySourceAnalyze); err1 != nil {
tableID := results.TableID.TableID
Expand All @@ -319,17 +322,20 @@ func (e *AnalyzeExec) handleResultsError(ctx context.Context, concurrency int, n
finishJobWithLog(e.ctx, results.Job, err)
} else {
finishJobWithLog(e.ctx, results.Job, nil)
// Dump stats to historical storage.
if err := recordHistoricalStats(e.ctx, results.TableID.TableID); err != nil {
logutil.BgLogger().Error("record historical stats failed", zap.Error(err))
}
}
invalidInfoSchemaStatCache(results.TableID.GetStatisticsID())
if atomic.LoadUint32(&e.ctx.GetSessionVars().Killed) == 1 {
finishJobWithLog(e.ctx, results.Job, ErrQueryInterrupted)
return errors.Trace(ErrQueryInterrupted)
}
}
// Dump stats to historical storage.
for tableID := range tableIDs {
if err := recordHistoricalStats(e.ctx, tableID); err != nil {
logutil.BgLogger().Error("record historical stats failed", zap.Error(err))
}
}

return err
}

Expand All @@ -348,6 +354,7 @@ func (e *AnalyzeExec) handleResultsErrorWithConcurrency(ctx context.Context, sta
worker.run(ctx1, e.ctx.GetSessionVars().EnableAnalyzeSnapshot)
})
}
tableIDs := map[int64]struct{}{}
panicCnt := 0
var err error
for panicCnt < statsConcurrency {
Expand All @@ -370,6 +377,7 @@ func (e *AnalyzeExec) handleResultsErrorWithConcurrency(ctx context.Context, sta
continue
}
handleGlobalStats(needGlobalStats, globalStatsMap, results)
tableIDs[results.TableID.GetStatisticsID()] = struct{}{}
saveResultsCh <- results
}
close(saveResultsCh)
Expand All @@ -382,6 +390,12 @@ func (e *AnalyzeExec) handleResultsErrorWithConcurrency(ctx context.Context, sta
}
err = errors.New(strings.Join(errMsg, ","))
}
for tableID := range tableIDs {
// Dump stats to historical storage.
if err := recordHistoricalStats(e.ctx, tableID); err != nil {
logutil.BgLogger().Error("record historical stats failed", zap.Error(err))
}
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Though we save stats concurrently by several workers, the logic of recording historical stats is sequential. Will we improve it in the future?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

dumping historical stats is worked by multiple worker in background, thus it's not sequential

return err
}

Expand Down
12 changes: 8 additions & 4 deletions executor/analyze_global_stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,9 @@ func (e *AnalyzeExec) handleGlobalStats(ctx context.Context, needGlobalStats boo
globalStatsTableIDs[globalStatsID.tableID] = struct{}{}
}
statsHandle := domain.GetDomain(e.ctx).StatsHandle()
tableIDs := map[int64]struct{}{}
for tableID := range globalStatsTableIDs {
tableIDs[tableID] = struct{}{}
tableAllPartitionStats := make(map[int64]*statistics.Table)
for globalStatsID, info := range globalStatsMap {
if globalStatsID.tableID != tableID {
Expand Down Expand Up @@ -101,16 +103,18 @@ func (e *AnalyzeExec) handleGlobalStats(ctx context.Context, needGlobalStats boo
logutil.Logger(ctx).Error("save global-level stats to storage failed", zap.String("info", job.JobInfo),
zap.Int64("histID", hg.ID), zap.Error(err), zap.Int64("tableID", tableID))
}
// Dump stats to historical storage.
if err1 := recordHistoricalStats(e.ctx, globalStatsID.tableID); err1 != nil {
logutil.BgLogger().Error("record historical stats failed", zap.String("info", job.JobInfo), zap.Int64("histID", hg.ID), zap.Error(err1))
}
}
return err
}()
FinishAnalyzeMergeJob(e.ctx, job, mergeStatsErr)
}
}
for tableID := range tableIDs {
// Dump stats to historical storage.
if err := recordHistoricalStats(e.ctx, tableID); err != nil {
logutil.BgLogger().Error("record historical stats failed", zap.Error(err))
}
}
return nil
}

Expand Down
4 changes: 0 additions & 4 deletions executor/analyze_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,10 +66,6 @@ func (worker *analyzeSaveStatsWorker) run(ctx context.Context, analyzeSnapshot b
worker.errCh <- err
} else {
finishJobWithLog(worker.sctx, results.Job, nil)
// Dump stats to historical storage.
if err := recordHistoricalStats(worker.sctx, results.TableID.TableID); err != nil {
logutil.BgLogger().Error("record historical stats failed", zap.Error(err))
}
}
invalidInfoSchemaStatCache(results.TableID.GetStatisticsID())
if err != nil {
Expand Down
27 changes: 27 additions & 0 deletions executor/historical_stats_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -216,3 +216,30 @@ func TestGCOutdatedHistoryStats(t *testing.T) {
tk.MustQuery(fmt.Sprintf("select count(*) from mysql.stats_history where table_id = '%d'",
tableInfo.Meta().ID)).Check(testkit.Rows("0"))
}

func TestPartitionTableHistoricalStats(t *testing.T) {
store, dom := testkit.CreateMockStoreAndDomain(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec("set global tidb_enable_historical_stats = 1")
tk.MustExec("use test")
tk.MustExec("drop table if exists t")
tk.MustExec(`CREATE TABLE t (a int, b int, index idx(b))
PARTITION BY RANGE ( a ) (
PARTITION p0 VALUES LESS THAN (6)
)`)
tk.MustExec("delete from mysql.stats_history")

tk.MustExec("analyze table test.t")
// dump historical stats
h := dom.StatsHandle()
hsWorker := dom.GetHistoricalStatsWorker()

// assert global table and partition table be dumped
tblID := hsWorker.GetOneHistoricalStatsTable()
err := hsWorker.DumpHistoricalStats(tblID, h)
require.NoError(t, err)
tblID = hsWorker.GetOneHistoricalStatsTable()
err = hsWorker.DumpHistoricalStats(tblID, h)
require.NoError(t, err)
tk.MustQuery("select count(*) from mysql.stats_history").Check(testkit.Rows("2"))
}
2 changes: 2 additions & 0 deletions statistics/handle/dump.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ type JSONTable struct {
Count int64 `json:"count"`
ModifyCount int64 `json:"modify_count"`
Partitions map[string]*JSONTable `json:"partitions"`
Version uint64 `json:"version"`
}

type jsonExtendedStats struct {
Expand Down Expand Up @@ -228,6 +229,7 @@ func GenJSONTableFromStats(dbName string, tableInfo *model.TableInfo, tbl *stati
Indices: make(map[string]*jsonColumn, len(tbl.Indices)),
Count: tbl.Count,
ModifyCount: tbl.ModifyCount,
Version: tbl.Version,
}
for _, col := range tbl.Columns {
sc := &stmtctx.StatementContext{TimeZone: time.UTC}
Expand Down
24 changes: 17 additions & 7 deletions statistics/handle/handle.go
Original file line number Diff line number Diff line change
Expand Up @@ -2565,17 +2565,27 @@ func (h *Handle) GetPredicateColumns(tableID int64) ([]int64, error) {
const maxColumnSize = 6 << 20

// RecordHistoricalStatsToStorage records the given table's stats data to mysql.stats_history
func (h *Handle) RecordHistoricalStatsToStorage(dbName string, tableInfo *model.TableInfo) (uint64, error) {
func (h *Handle) RecordHistoricalStatsToStorage(dbName string, tableInfo *model.TableInfo, physicalID int64, isPartition bool) (uint64, error) {
ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnStats)
js, err := h.DumpStatsToJSON(dbName, tableInfo, nil, true)
var js *JSONTable
var err error
if isPartition {
js, err = h.tableStatsToJSON(dbName, tableInfo, physicalID, 0)
} else {
js, err = h.DumpStatsToJSON(dbName, tableInfo, nil, true)
Comment on lines +2573 to +2575
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

tableStatsToJSON and DumpStatsToJSON seem very similar... Can we refactor them and merge them into one function after this PR?

}
if err != nil {
return 0, errors.Trace(err)
}
version := uint64(0)
for _, value := range js.Columns {
version = uint64(*value.StatsVer)
if version != 0 {
break
if len(js.Partitions) == 0 {
version = js.Version
} else {
for _, p := range js.Partitions {
version = p.Version
if version != 0 {
break
}
}
}
blocks, err := JSONTableToBlocks(js, maxColumnSize)
Expand All @@ -2596,7 +2606,7 @@ func (h *Handle) RecordHistoricalStatsToStorage(dbName string, tableInfo *model.

const sql = "INSERT INTO mysql.stats_history(table_id, stats_data, seq_no, version, create_time) VALUES (%?, %?, %?, %?, %?)"
for i := 0; i < len(blocks); i++ {
if _, err := exec.ExecuteInternal(ctx, sql, tableInfo.ID, blocks[i], i, version, ts); err != nil {
if _, err := exec.ExecuteInternal(ctx, sql, physicalID, blocks[i], i, version, ts); err != nil {
return version, errors.Trace(err)
}
}
Expand Down
2 changes: 1 addition & 1 deletion statistics/handle/handle_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3299,7 +3299,7 @@ func TestRecordHistoricalStatsToStorage(t *testing.T) {

tableInfo, err := dom.InfoSchema().TableByName(model.NewCIStr("test"), model.NewCIStr("t"))
require.NoError(t, err)
version, err := dom.StatsHandle().RecordHistoricalStatsToStorage("t", tableInfo.Meta())
version, err := dom.StatsHandle().RecordHistoricalStatsToStorage("t", tableInfo.Meta(), tableInfo.Meta().ID, false)
require.NoError(t, err)

rows := tk.MustQuery(fmt.Sprintf("select count(*) from mysql.stats_history where version = '%d'", version)).Rows()
Expand Down