diff --git a/pkg/statistics/handle/globalstats/global_stats.go b/pkg/statistics/handle/globalstats/global_stats.go new file mode 100644 index 0000000000000..1e9fbdc201d85 --- /dev/null +++ b/pkg/statistics/handle/globalstats/global_stats.go @@ -0,0 +1,400 @@ +// Copyright 2023 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package globalstats + +import ( + "fmt" + + "github.com/pingcap/errors" + "github.com/pingcap/tidb/pkg/infoschema" + "github.com/pingcap/tidb/pkg/parser/ast" + "github.com/pingcap/tidb/pkg/parser/model" + "github.com/pingcap/tidb/pkg/sessionctx" + "github.com/pingcap/tidb/pkg/statistics" + statslogutil "github.com/pingcap/tidb/pkg/statistics/handle/logutil" + statstypes "github.com/pingcap/tidb/pkg/statistics/handle/types" + "github.com/pingcap/tidb/pkg/statistics/handle/util" + "github.com/pingcap/tidb/pkg/types" + "github.com/pingcap/tidb/pkg/util/logutil" + "github.com/tiancaiamao/gp" + "go.uber.org/zap" +) + +const ( + // MaxPartitionMergeBatchSize indicates the max batch size for a worker to merge partition stats + MaxPartitionMergeBatchSize = 256 +) + +// statsGlobalImpl implements util.StatsGlobal +type statsGlobalImpl struct { + statsHandler statstypes.StatsHandle +} + +// NewStatsGlobal creates a new StatsGlobal. +func NewStatsGlobal(statsHandler statstypes.StatsHandle) statstypes.StatsGlobal { + return &statsGlobalImpl{statsHandler: statsHandler} +} + +// MergePartitionStats2GlobalStatsByTableID merge the partition-level stats to global-level stats based on the tableID. +func (sg *statsGlobalImpl) MergePartitionStats2GlobalStatsByTableID(sc sessionctx.Context, + opts map[ast.AnalyzeOptionType]uint64, is infoschema.InfoSchema, + info *statstypes.GlobalStatsInfo, + physicalID int64, +) (err error) { + globalStats, err := MergePartitionStats2GlobalStatsByTableID(sc, sg.statsHandler, opts, is, physicalID, info.IsIndex == 1, info.HistIDs) + if err != nil { + if types.ErrPartitionStatsMissing.Equal(err) || types.ErrPartitionColumnStatsMissing.Equal(err) { + // When we find some partition-level stats are missing, we need to report warning. + sc.GetSessionVars().StmtCtx.AppendWarning(err) + } + return err + } + return WriteGlobalStatsToStorage(sg.statsHandler, globalStats, info, physicalID) +} + +// GlobalStats is used to store the statistics contained in the global-level stats +// which is generated by the merge of partition-level stats. +// It will both store the column stats and index stats. +// In the column statistics, the variable `num` is equal to the number of columns in the partition table. +// In the index statistics, the variable `num` is always equal to one. +type GlobalStats struct { + Hg []*statistics.Histogram + Cms []*statistics.CMSketch + TopN []*statistics.TopN + Fms []*statistics.FMSketch + MissingPartitionStats []string + Num int + Count int64 + ModifyCount int64 +} + +func newGlobalStats(histCount int) *GlobalStats { + globalStats := new(GlobalStats) + globalStats.Num = histCount + globalStats.Count = 0 + globalStats.Hg = make([]*statistics.Histogram, globalStats.Num) + globalStats.Cms = make([]*statistics.CMSketch, globalStats.Num) + globalStats.TopN = make([]*statistics.TopN, globalStats.Num) + globalStats.Fms = make([]*statistics.FMSketch, globalStats.Num) + + return globalStats +} + +// MergePartitionStats2GlobalStats merge the partition-level stats to global-level stats based on the tableInfo. +func MergePartitionStats2GlobalStats( + sc sessionctx.Context, + statsHandle statstypes.StatsHandle, + opts map[ast.AnalyzeOptionType]uint64, + is infoschema.InfoSchema, + globalTableInfo *model.TableInfo, + isIndex bool, + histIDs []int64, +) (globalStats *GlobalStats, err error) { + if sc.GetSessionVars().EnableAsyncMergeGlobalStats { + statslogutil.SingletonStatsSamplerLogger().Info("use async merge global stats", + zap.Int64("tableID", globalTableInfo.ID), + zap.String("table", globalTableInfo.Name.L), + ) + worker, err := NewAsyncMergePartitionStats2GlobalStats(statsHandle, globalTableInfo, histIDs, is) + if err != nil { + return nil, errors.Trace(err) + } + err = worker.MergePartitionStats2GlobalStats(sc, opts, isIndex) + if err != nil { + return nil, errors.Trace(err) + } + return worker.Result(), nil + } + statslogutil.SingletonStatsSamplerLogger().Info("use blocking merge global stats", + zap.Int64("tableID", globalTableInfo.ID), + zap.String("table", globalTableInfo.Name.L), + ) + return blockingMergePartitionStats2GlobalStats(sc, statsHandle.GPool(), opts, is, globalTableInfo, isIndex, histIDs, nil, statsHandle) +} + +// MergePartitionStats2GlobalStatsByTableID merge the partition-level stats to global-level stats based on the tableID. +func MergePartitionStats2GlobalStatsByTableID( + sc sessionctx.Context, + statsHandle statstypes.StatsHandle, + opts map[ast.AnalyzeOptionType]uint64, + is infoschema.InfoSchema, + tableID int64, + isIndex bool, + histIDs []int64, +) (globalStats *GlobalStats, err error) { + // Get the partition table IDs. + globalTable, ok := statsHandle.TableInfoByID(is, tableID) + if !ok { + err = errors.Errorf("unknown physical ID %d in stats meta table, maybe it has been dropped", tableID) + return + } + + globalTableInfo := globalTable.Meta() + globalStats, err = MergePartitionStats2GlobalStats(sc, statsHandle, opts, is, globalTableInfo, isIndex, histIDs) + if err != nil { + return nil, errors.Trace(err) + } + if len(globalStats.MissingPartitionStats) > 0 { + var item string + if !isIndex { + item = "columns" + } else { + item = "index" + if len(histIDs) > 0 { + item += " " + globalTableInfo.FindIndexNameByID(histIDs[0]) + } + } + + logutil.BgLogger().Warn("missing partition stats when merging global stats", zap.String("table", globalTableInfo.Name.L), + zap.String("item", item), zap.Strings("missing", globalStats.MissingPartitionStats)) + } + return +} + +// analyzeOptionDefault saves the default values of NumBuckets and NumTopN. +// These values will be used in dynamic mode when we drop table partition and then need to merge global-stats. +// These values originally came from the analyzeOptionDefault structure in the planner/core/planbuilder.go file. +var analyzeOptionDefault = map[ast.AnalyzeOptionType]uint64{ + ast.AnalyzeOptNumBuckets: 256, + ast.AnalyzeOptNumTopN: 20, +} + +// blockingMergePartitionStats2GlobalStats merge the partition-level stats to global-level stats based on the tableInfo. +// It is the old algorithm to merge partition-level stats to global-level stats. It will happen the OOM. because it will load all the partition-level stats into memory. +func blockingMergePartitionStats2GlobalStats( + sc sessionctx.Context, + gpool *gp.Pool, + opts map[ast.AnalyzeOptionType]uint64, + is infoschema.InfoSchema, + globalTableInfo *model.TableInfo, + isIndex bool, + histIDs []int64, + allPartitionStats map[int64]*statistics.Table, + statsHandle statstypes.StatsHandle, +) (globalStats *GlobalStats, err error) { + externalCache := false + if allPartitionStats != nil { + externalCache = true + } + + partitionNum := len(globalTableInfo.Partition.Definitions) + if len(histIDs) == 0 { + for _, col := range globalTableInfo.Columns { + // The virtual generated column stats can not be merged to the global stats. + if col.IsVirtualGenerated() { + continue + } + histIDs = append(histIDs, col.ID) + } + } + + // Initialized the globalStats. + globalStats = newGlobalStats(len(histIDs)) + + // Slice Dimensions Explanation + // First dimension: Column or Index Stats + // Second dimension: Partition Tables + // Because all topN and histograms need to be collected before they can be merged. + // So we should store all the partition-level stats first, and merge them together. + allHg := make([][]*statistics.Histogram, globalStats.Num) + allCms := make([][]*statistics.CMSketch, globalStats.Num) + allTopN := make([][]*statistics.TopN, globalStats.Num) + allFms := make([][]*statistics.FMSketch, globalStats.Num) + for i := 0; i < globalStats.Num; i++ { + allHg[i] = make([]*statistics.Histogram, 0, partitionNum) + allCms[i] = make([]*statistics.CMSketch, 0, partitionNum) + allTopN[i] = make([]*statistics.TopN, 0, partitionNum) + allFms[i] = make([]*statistics.FMSketch, 0, partitionNum) + } + + skipMissingPartitionStats := sc.GetSessionVars().SkipMissingPartitionStats + for _, def := range globalTableInfo.Partition.Definitions { + partitionID := def.ID + partitionTable, ok := statsHandle.TableInfoByID(is, partitionID) + if !ok { + err = errors.Errorf("unknown physical ID %d in stats meta table, maybe it has been dropped", partitionID) + return + } + tableInfo := partitionTable.Meta() + var partitionStats *statistics.Table + var okLoad bool + if allPartitionStats != nil { + partitionStats, okLoad = allPartitionStats[partitionID] + } else { + okLoad = false + } + // If pre-load partition stats isn't provided, then we load partition stats directly and set it into allPartitionStats + if !okLoad { + var err1 error + partitionStats, err1 = statsHandle.LoadTablePartitionStats(tableInfo, &def) + if err1 != nil { + if skipMissingPartitionStats && types.ErrPartitionStatsMissing.Equal(err1) { + globalStats.MissingPartitionStats = append(globalStats.MissingPartitionStats, fmt.Sprintf("partition `%s`", def.Name.L)) + continue + } + err = err1 + return + } + if externalCache { + allPartitionStats[partitionID] = partitionStats + } + } + + for i := 0; i < globalStats.Num; i++ { + // GetStatsInfo will return the copy of the statsInfo, so we don't need to worry about the data race. + // partitionStats will be released after the for loop. + hg, cms, topN, fms, analyzed := partitionStats.GetStatsInfo(histIDs[i], isIndex, externalCache) + skipPartition := false + if !analyzed { + var missingPart string + if !isIndex { + missingPart = fmt.Sprintf("partition `%s` column `%s`", def.Name.L, tableInfo.FindColumnNameByID(histIDs[i])) + } else { + missingPart = fmt.Sprintf("partition `%s` index `%s`", def.Name.L, tableInfo.FindIndexNameByID(histIDs[i])) + } + if !skipMissingPartitionStats { + err = types.ErrPartitionStatsMissing.GenWithStackByArgs(fmt.Sprintf("table `%s` %s", tableInfo.Name.L, missingPart)) + return + } + globalStats.MissingPartitionStats = append(globalStats.MissingPartitionStats, missingPart) + skipPartition = true + } + + // Partition stats is not empty but column stats(hist, topN) is missing. + if partitionStats.RealtimeCount > 0 && (hg == nil || hg.TotalRowCount() <= 0) && (topN == nil || topN.TotalCount() <= 0) { + var missingPart string + if !isIndex { + missingPart = fmt.Sprintf("partition `%s` column `%s`", def.Name.L, tableInfo.FindColumnNameByID(histIDs[i])) + } else { + missingPart = fmt.Sprintf("partition `%s` index `%s`", def.Name.L, tableInfo.FindIndexNameByID(histIDs[i])) + } + if !skipMissingPartitionStats { + err = types.ErrPartitionColumnStatsMissing.GenWithStackByArgs(fmt.Sprintf("table `%s` %s", tableInfo.Name.L, missingPart)) + return + } + globalStats.MissingPartitionStats = append(globalStats.MissingPartitionStats, missingPart+" hist and topN") + skipPartition = true + } + + if i == 0 { + // In a partition, we will only update globalStats.Count once. + globalStats.Count += partitionStats.RealtimeCount + globalStats.ModifyCount += partitionStats.ModifyCount + } + + if !skipPartition { + allHg[i] = append(allHg[i], hg) + allCms[i] = append(allCms[i], cms) + allTopN[i] = append(allTopN[i], topN) + allFms[i] = append(allFms[i], fms) + } + } + } + + // After collect all the statistics from the partition-level stats, + // we should merge them together. + for i := 0; i < globalStats.Num; i++ { + if len(allHg[i]) == 0 { + // If all partitions have no stats, we skip merging global stats because it may not handle the case `len(allHg[i]) == 0` + // correctly. It can avoid unexpected behaviors such as nil pointer panic. + continue + } + // FMSketch use many memory, so we first deal with it and then destroy it. + // Merge FMSketch. + // NOTE: allFms maybe contain empty. + globalStats.Fms[i] = allFms[i][0] + for j := 1; j < len(allFms[i]); j++ { + if globalStats.Fms[i] == nil { + globalStats.Fms[i] = allFms[i][j] + } else { + globalStats.Fms[i].MergeFMSketch(allFms[i][j]) + allFms[i][j].DestroyAndPutToPool() + } + } + + // Update the global NDV. + globalStatsNDV := globalStats.Fms[i].NDV() + if globalStatsNDV > globalStats.Count { + globalStatsNDV = globalStats.Count + } + globalStats.Fms[i].DestroyAndPutToPool() + + // Merge CMSketch. + globalStats.Cms[i] = allCms[i][0] + for j := 1; j < len(allCms[i]); j++ { + err = globalStats.Cms[i].MergeCMSketch(allCms[i][j]) + if err != nil { + return + } + } + + // Merge topN. + // Note: We need to merge TopN before merging the histogram. + // Because after merging TopN, some numbers will be left. + // These remaining topN numbers will be used as a separate bucket for later histogram merging. + var poppedTopN []statistics.TopNMeta + wrapper := NewStatsWrapper(allHg[i], allTopN[i]) + globalStats.TopN[i], poppedTopN, allHg[i], err = mergeGlobalStatsTopN(gpool, sc, wrapper, + sc.GetSessionVars().StmtCtx.TimeZone(), sc.GetSessionVars().AnalyzeVersion, uint32(opts[ast.AnalyzeOptNumTopN]), isIndex) + if err != nil { + return + } + + // Merge histogram. + globalStats.Hg[i], err = statistics.MergePartitionHist2GlobalHist(sc.GetSessionVars().StmtCtx, allHg[i], poppedTopN, + int64(opts[ast.AnalyzeOptNumBuckets]), isIndex) + if err != nil { + return + } + + // NOTICE: after merging bucket NDVs have the trend to be underestimated, so for safe we don't use them. + for j := range globalStats.Hg[i].Buckets { + globalStats.Hg[i].Buckets[j].NDV = 0 + } + + globalStats.Hg[i].NDV = globalStatsNDV + } + return +} + +// WriteGlobalStatsToStorage is to write global stats to storage +func WriteGlobalStatsToStorage(statsHandle statstypes.StatsHandle, globalStats *GlobalStats, info *statstypes.GlobalStatsInfo, gid int64) (err error) { + // Dump global-level stats to kv. + for i := 0; i < globalStats.Num; i++ { + hg, cms, topN := globalStats.Hg[i], globalStats.Cms[i], globalStats.TopN[i] + if hg == nil { + // All partitions have no stats so global stats are not created. + continue + } + // fms for global stats doesn't need to dump to kv. + err = statsHandle.SaveStatsToStorage(gid, + globalStats.Count, + globalStats.ModifyCount, + info.IsIndex, + hg, + cms, + topN, + info.StatsVersion, + 1, + true, + util.StatsMetaHistorySourceAnalyze, + ) + if err != nil { + statslogutil.StatsLogger().Error("save global-level stats to storage failed", + zap.Int64("histID", hg.ID), zap.Error(err), zap.Int64("tableID", gid)) + } + } + return err +} diff --git a/pkg/statistics/handle/util/util.go b/pkg/statistics/handle/util/util.go new file mode 100644 index 0000000000000..1db8addc5fae3 --- /dev/null +++ b/pkg/statistics/handle/util/util.go @@ -0,0 +1,293 @@ +// Copyright 2023 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package util + +import ( + "context" + "strconv" + "time" + + "github.com/pingcap/errors" + "github.com/pingcap/tidb/pkg/kv" + "github.com/pingcap/tidb/pkg/parser/ast" + "github.com/pingcap/tidb/pkg/parser/terror" + "github.com/pingcap/tidb/pkg/sessionctx" + "github.com/pingcap/tidb/pkg/sessionctx/variable" + "github.com/pingcap/tidb/pkg/util/chunk" + "github.com/pingcap/tidb/pkg/util/intest" + "github.com/pingcap/tidb/pkg/util/sqlexec" + "github.com/pingcap/tidb/pkg/util/sqlexec/mock" + "github.com/pingcap/tipb/go-tipb" + "github.com/tikv/client-go/v2/oracle" +) + +const ( + // StatsMetaHistorySourceAnalyze indicates stats history meta source from analyze + StatsMetaHistorySourceAnalyze = "analyze" + // StatsMetaHistorySourceLoadStats indicates stats history meta source from load stats + StatsMetaHistorySourceLoadStats = "load stats" + // StatsMetaHistorySourceFlushStats indicates stats history meta source from flush stats + StatsMetaHistorySourceFlushStats = "flush stats" + // StatsMetaHistorySourceSchemaChange indicates stats history meta source from schema change + StatsMetaHistorySourceSchemaChange = "schema change" + // StatsMetaHistorySourceExtendedStats indicates stats history meta source from extended stats + StatsMetaHistorySourceExtendedStats = "extended stats" + + // TiDBGlobalStats represents the global-stats for a partitioned table. + TiDBGlobalStats = "global" +) + +var ( + // UseCurrentSessionOpt to make sure the sql is executed in current session. + UseCurrentSessionOpt = []sqlexec.OptionFuncAlias{sqlexec.ExecOptionUseCurSession} + + // StatsCtx is used to mark the request is from stats module. + StatsCtx = kv.WithInternalSourceType(context.Background(), kv.InternalTxnStats) +) + +// finishTransaction will execute `commit` when error is nil, otherwise `rollback`. +func finishTransaction(sctx sessionctx.Context, err error) error { + if err == nil { + _, _, err = ExecRows(sctx, "COMMIT") + } else { + _, _, err1 := ExecRows(sctx, "rollback") + terror.Log(errors.Trace(err1)) + } + return errors.Trace(err) +} + +var ( + // FlagWrapTxn indicates whether to wrap a transaction. + FlagWrapTxn = 0 +) + +// CallWithSCtx allocates a sctx from the pool and call the f(). +func CallWithSCtx(pool SessionPool, f func(sctx sessionctx.Context) error, flags ...int) (err error) { + se, err := pool.Get() + if err != nil { + return err + } + defer func() { + if err == nil { // only recycle when no error + pool.Put(se) + } + }() + sctx := se.(sessionctx.Context) + if err := UpdateSCtxVarsForStats(sctx); err != nil { // update stats variables automatically + return err + } + + wrapTxn := false + for _, flag := range flags { + if flag == FlagWrapTxn { + wrapTxn = true + } + } + if wrapTxn { + err = WrapTxn(sctx, f) + } else { + err = f(sctx) + } + return err +} + +// UpdateSCtxVarsForStats updates all necessary variables that may affect the behavior of statistics. +func UpdateSCtxVarsForStats(sctx sessionctx.Context) error { + // async merge global stats + enableAsyncMergeGlobalStats, err := sctx.GetSessionVars().GlobalVarsAccessor.GetGlobalSysVar(variable.TiDBEnableAsyncMergeGlobalStats) + if err != nil { + return err + } + sctx.GetSessionVars().EnableAsyncMergeGlobalStats = variable.TiDBOptOn(enableAsyncMergeGlobalStats) + + // concurrency of save stats to storage + analyzePartitionConcurrency, err := sctx.GetSessionVars().GlobalVarsAccessor.GetGlobalSysVar(variable.TiDBAnalyzePartitionConcurrency) + if err != nil { + return err + } + c, err := strconv.ParseInt(analyzePartitionConcurrency, 10, 64) + if err != nil { + return err + } + sctx.GetSessionVars().AnalyzePartitionConcurrency = int(c) + + // analyzer version + verInString, err := sctx.GetSessionVars().GlobalVarsAccessor.GetGlobalSysVar(variable.TiDBAnalyzeVersion) + if err != nil { + return err + } + ver, err := strconv.ParseInt(verInString, 10, 64) + if err != nil { + return err + } + sctx.GetSessionVars().AnalyzeVersion = int(ver) + + // enable historical stats + val, err := sctx.GetSessionVars().GlobalVarsAccessor.GetGlobalSysVar(variable.TiDBEnableHistoricalStats) + if err != nil { + return err + } + sctx.GetSessionVars().EnableHistoricalStats = variable.TiDBOptOn(val) + + // partition mode + pruneMode, err := sctx.GetSessionVars().GlobalVarsAccessor.GetGlobalSysVar(variable.TiDBPartitionPruneMode) + if err != nil { + return err + } + sctx.GetSessionVars().PartitionPruneMode.Store(pruneMode) + + // enable analyze snapshot + analyzeSnapshot, err := sctx.GetSessionVars().GlobalVarsAccessor.GetGlobalSysVar(variable.TiDBEnableAnalyzeSnapshot) + if err != nil { + return err + } + sctx.GetSessionVars().EnableAnalyzeSnapshot = variable.TiDBOptOn(analyzeSnapshot) + + // enable skip column types + val, err = sctx.GetSessionVars().GlobalVarsAccessor.GetGlobalSysVar(variable.TiDBAnalyzeSkipColumnTypes) + if err != nil { + return err + } + sctx.GetSessionVars().AnalyzeSkipColumnTypes = variable.ParseAnalyzeSkipColumnTypes(val) + + // skip missing partition stats + val, err = sctx.GetSessionVars().GlobalVarsAccessor.GetGlobalSysVar(variable.TiDBSkipMissingPartitionStats) + if err != nil { + return err + } + sctx.GetSessionVars().SkipMissingPartitionStats = variable.TiDBOptOn(val) + verInString, err = sctx.GetSessionVars().GlobalVarsAccessor.GetGlobalSysVar(variable.TiDBMergePartitionStatsConcurrency) + if err != nil { + return err + } + ver, err = strconv.ParseInt(verInString, 10, 64) + if err != nil { + return err + } + sctx.GetSessionVars().AnalyzePartitionMergeConcurrency = int(ver) + return nil +} + +// GetCurrentPruneMode returns the current latest partitioning table prune mode. +func GetCurrentPruneMode(pool SessionPool) (mode string, err error) { + err = CallWithSCtx(pool, func(sctx sessionctx.Context) error { + mode = sctx.GetSessionVars().PartitionPruneMode.Load() + return nil + }) + return +} + +// WrapTxn uses a transaction here can let different SQLs in this operation have the same data visibility. +func WrapTxn(sctx sessionctx.Context, f func(sctx sessionctx.Context) error) (err error) { + // TODO: check whether this sctx is already in a txn + if _, _, err := ExecRows(sctx, "BEGIN PESSIMISTIC"); err != nil { + return err + } + defer func() { + err = finishTransaction(sctx, err) + }() + err = f(sctx) + return +} + +// GetStartTS gets the start ts from current transaction. +func GetStartTS(sctx sessionctx.Context) (uint64, error) { + txn, err := sctx.Txn(true) + if err != nil { + return 0, err + } + return txn.StartTS(), nil +} + +// Exec is a helper function to execute sql and return RecordSet. +func Exec(sctx sessionctx.Context, sql string, args ...any) (sqlexec.RecordSet, error) { + sqlExec := sctx.GetSQLExecutor() + // TODO: use RestrictedSQLExecutor + ExecOptionUseCurSession instead of SQLExecutor + return sqlExec.ExecuteInternal(StatsCtx, sql, args...) +} + +// ExecRows is a helper function to execute sql and return rows and fields. +func ExecRows(sctx sessionctx.Context, sql string, args ...any) (rows []chunk.Row, fields []*ast.ResultField, err error) { + if intest.InTest { + if v := sctx.Value(mock.RestrictedSQLExecutorKey{}); v != nil { + return v.(*mock.MockRestrictedSQLExecutor).ExecRestrictedSQL(StatsCtx, + UseCurrentSessionOpt, sql, args...) + } + } + + sqlExec := sctx.GetRestrictedSQLExecutor() + return sqlExec.ExecRestrictedSQL(StatsCtx, UseCurrentSessionOpt, sql, args...) +} + +// ExecWithOpts is a helper function to execute sql and return rows and fields. +func ExecWithOpts(sctx sessionctx.Context, opts []sqlexec.OptionFuncAlias, sql string, args ...any) (rows []chunk.Row, fields []*ast.ResultField, err error) { + sqlExec := sctx.GetRestrictedSQLExecutor() + return sqlExec.ExecRestrictedSQL(StatsCtx, opts, sql, args...) +} + +// DurationToTS converts duration to timestamp. +func DurationToTS(d time.Duration) uint64 { + return oracle.ComposeTS(d.Nanoseconds()/int64(time.Millisecond), 0) +} + +// JSONTable is used for dumping statistics. +type JSONTable struct { + Columns map[string]*JSONColumn `json:"columns"` + Indices map[string]*JSONColumn `json:"indices"` + Partitions map[string]*JSONTable `json:"partitions"` + DatabaseName string `json:"database_name"` + TableName string `json:"table_name"` + ExtStats []*JSONExtendedStats `json:"ext_stats"` + Count int64 `json:"count"` + ModifyCount int64 `json:"modify_count"` + Version uint64 `json:"version"` + IsHistoricalStats bool `json:"is_historical_stats"` +} + +// JSONExtendedStats is used for dumping extended statistics. +type JSONExtendedStats struct { + StatsName string `json:"stats_name"` + StringVals string `json:"string_vals"` + ColIDs []int64 `json:"cols"` + ScalarVals float64 `json:"scalar_vals"` + Tp uint8 `json:"type"` +} + +// JSONColumn is used for dumping statistics. +type JSONColumn struct { + Histogram *tipb.Histogram `json:"histogram"` + CMSketch *tipb.CMSketch `json:"cm_sketch"` + FMSketch *tipb.FMSketch `json:"fm_sketch"` + // StatsVer is a pointer here since the old version json file would not contain version information. + StatsVer *int64 `json:"stats_ver"` + NullCount int64 `json:"null_count"` + TotColSize int64 `json:"tot_col_size"` + LastUpdateVersion uint64 `json:"last_update_version"` + Correlation float64 `json:"correlation"` +} + +// TotalMemoryUsage returns the total memory usage of this column. +func (col *JSONColumn) TotalMemoryUsage() (size int64) { + if col.Histogram != nil { + size += int64(col.Histogram.Size()) + } + if col.CMSketch != nil { + size += int64(col.CMSketch.Size()) + } + if col.FMSketch != nil { + size += int64(col.FMSketch.Size()) + } + return size +}