Skip to content

Commit

Permalink
planner: move some common code on stats to stats_util (#47312)
Browse files Browse the repository at this point in the history
ref #46905
  • Loading branch information
qw4990 authored Sep 27, 2023
1 parent c98f398 commit 0cc2f5c
Show file tree
Hide file tree
Showing 24 changed files with 169 additions and 158 deletions.
1 change: 1 addition & 0 deletions statistics/handle/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ go_library(
"//statistics/handle/metrics",
"//statistics/handle/storage",
"//statistics/handle/usage",
"//statistics/handle/util",
"//table",
"//types",
"//util",
Expand Down
1 change: 1 addition & 0 deletions statistics/handle/autoanalyze/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ go_library(
"//sessionctx",
"//sessionctx/variable",
"//statistics",
"//statistics/handle/util",
"//util",
"//util/chunk",
"//util/logutil",
Expand Down
3 changes: 2 additions & 1 deletion statistics/handle/autoanalyze/autoanalyze.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/statistics"
statsutil "github.com/pingcap/tidb/statistics/handle/util"
"github.com/pingcap/tidb/util"
"github.com/pingcap/tidb/util/chunk"
"github.com/pingcap/tidb/util/logutil"
Expand Down Expand Up @@ -405,7 +406,7 @@ func execRestrictedSQLWithStatsVer(sctx sessionctx.Context,
opt *Opt,
statsVer int,
sql string, params ...interface{}) ([]chunk.Row, []*ast.ResultField, error) {
ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnStats)
ctx := statsutil.StatsCtx(context.Background())
pruneMode := sctx.GetSessionVars().PartitionPruneMode.Load()
analyzeSnapshot := sctx.GetSessionVars().EnableAnalyzeSnapshot
optFuncs := []sqlexec.OptionFuncAlias{
Expand Down
2 changes: 1 addition & 1 deletion statistics/handle/cache/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ go_library(
visibility = ["//visibility:public"],
deps = [
"//config",
"//kv",
"//parser/model",
"//sessionctx",
"//sessionctx/variable",
Expand All @@ -19,6 +18,7 @@ go_library(
"//statistics/handle/cache/internal/lfu",
"//statistics/handle/cache/internal/mapcache",
"//statistics/handle/cache/internal/metrics",
"//statistics/handle/util",
"//types",
"//util/chunk",
"//util/logutil",
Expand Down
4 changes: 2 additions & 2 deletions statistics/handle/cache/statscacheinner.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (
"time"

"github.com/pingcap/tidb/config"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/sessionctx/variable"
Expand All @@ -31,6 +30,7 @@ import (
"github.com/pingcap/tidb/statistics/handle/cache/internal/lfu"
"github.com/pingcap/tidb/statistics/handle/cache/internal/mapcache"
"github.com/pingcap/tidb/statistics/handle/cache/internal/metrics"
"github.com/pingcap/tidb/statistics/handle/util"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/chunk"
"github.com/pingcap/tidb/util/logutil"
Expand Down Expand Up @@ -226,7 +226,7 @@ func (c *StatsTableRowCache) GetColLength(id tableHistID) uint64 {
func (c *StatsTableRowCache) Update(ctx context.Context, sctx sessionctx.Context) error {
c.mu.Lock()
defer c.mu.Unlock()
ctx = kv.WithInternalSourceType(ctx, kv.InternalTxnStats)
ctx = util.StatsCtx(ctx)
if time.Since(c.modifyTime) < tableStatsCacheExpiry {
if len(c.dirtyIDs) > 0 {
tableRows, err := getRowCountTables(ctx, sctx, c.dirtyIDs...)
Expand Down
33 changes: 11 additions & 22 deletions statistics/handle/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,14 @@ import (

"github.com/pingcap/errors"
"github.com/pingcap/tidb/ddl/util"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/parser/ast"
"github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tidb/parser/mysql"
"github.com/pingcap/tidb/parser/terror"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/sessiontxn"
statsutil "github.com/pingcap/tidb/statistics/handle/util"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/logutil"
"github.com/pingcap/tidb/util/sqlexec"
Expand Down Expand Up @@ -134,13 +134,13 @@ func (h *Handle) updateStatsVersion() error {
defer h.pool.Put(se)
exec := se.(sqlexec.SQLExecutor)

ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnStats)
ctx := statsutil.StatsCtx(context.Background())
_, err = exec.ExecuteInternal(ctx, "begin")
if err != nil {
return errors.Trace(err)
}
defer func() {
err = finishTransaction(ctx, exec, err)
err = statsutil.FinishTransaction(ctx, exec, err)
}()
startTS, err := getSessionTxnStartTS(se)
if err != nil {
Expand Down Expand Up @@ -271,13 +271,13 @@ func (h *Handle) changeGlobalStatsID(from, to int64) (err error) {
}
defer h.pool.Put(se)
exec := se.(sqlexec.SQLExecutor)
ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnStats)
ctx := statsutil.StatsCtx(context.Background())
_, err = exec.ExecuteInternal(ctx, "begin pessimistic")
if err != nil {
return errors.Trace(err)
}
defer func() {
err = finishTransaction(ctx, exec, err)
err = statsutil.FinishTransaction(ctx, exec, err)
}()
for _, table := range []string{"stats_meta", "stats_top_n", "stats_fm_sketch", "stats_buckets", "stats_histograms", "column_stats_usage"} {
_, err = exec.ExecuteInternal(ctx, "update mysql."+table+" set table_id = %? where table_id = %?", to, from)
Expand Down Expand Up @@ -328,14 +328,14 @@ func (h *Handle) insertTableStats2KV(info *model.TableInfo, physicalID int64) (e
}
defer h.pool.Put(se)
exec := se.(sqlexec.SQLExecutor)
ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnStats)
ctx := statsutil.StatsCtx(context.Background())

_, err = exec.ExecuteInternal(ctx, "begin")
if err != nil {
return errors.Trace(err)
}
defer func() {
err = finishTransaction(ctx, exec, err)
err = statsutil.FinishTransaction(ctx, exec, err)
}()
startTS, err := getSessionTxnStartTS(se)
if err != nil {
Expand Down Expand Up @@ -373,14 +373,14 @@ func (h *Handle) resetTableStats2KVForDrop(physicalID int64) (err error) {
}
defer h.pool.Put(se)
exec := se.(sqlexec.SQLExecutor)
ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnStats)
ctx := statsutil.StatsCtx(context.Background())

_, err = exec.ExecuteInternal(ctx, "begin")
if err != nil {
return errors.Trace(err)
}
defer func() {
err = finishTransaction(ctx, exec, err)
err = statsutil.FinishTransaction(ctx, exec, err)
}()
startTS, err := getSessionTxnStartTS(se)
if err != nil {
Expand Down Expand Up @@ -408,14 +408,14 @@ func (h *Handle) insertColStats2KV(physicalID int64, colInfos []*model.ColumnInf
}
defer h.pool.Put(se)
exec := se.(sqlexec.SQLExecutor)
ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnStats)
ctx := statsutil.StatsCtx(context.Background())

_, err = exec.ExecuteInternal(ctx, "begin")
if err != nil {
return errors.Trace(err)
}
defer func() {
err = finishTransaction(ctx, exec, err)
err = statsutil.FinishTransaction(ctx, exec, err)
}()

startTS, err := getSessionTxnStartTS(se)
Expand Down Expand Up @@ -474,14 +474,3 @@ func (h *Handle) insertColStats2KV(physicalID int64, colInfos []*model.ColumnInf
}
return
}

// finishTransaction will execute `commit` when error is nil, otherwise `rollback`.
func finishTransaction(ctx context.Context, exec sqlexec.SQLExecutor, err error) error {
if err == nil {
_, err = exec.ExecuteInternal(ctx, "commit")
} else {
_, err1 := exec.ExecuteInternal(ctx, "rollback")
terror.Log(errors.Trace(err1))
}
return errors.Trace(err)
}
3 changes: 1 addition & 2 deletions statistics/handle/extstats/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,12 @@ go_library(
importpath = "github.com/pingcap/tidb/statistics/handle/extstats",
visibility = ["//visibility:public"],
deps = [
"//kv",
"//parser/ast",
"//parser/model",
"//parser/terror",
"//sessionctx",
"//statistics",
"//statistics/handle/cache",
"//statistics/handle/util",
"//util/logutil",
"//util/mathutil",
"//util/sqlexec",
Expand Down
43 changes: 11 additions & 32 deletions statistics/handle/extstats/extended_stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,12 @@ import (
"slices"

"github.com/pingcap/errors"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/parser/ast"
"github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tidb/parser/terror"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/statistics"
"github.com/pingcap/tidb/statistics/handle/cache"
"github.com/pingcap/tidb/statistics/handle/util"
"github.com/pingcap/tidb/util/logutil"
"github.com/pingcap/tidb/util/mathutil"
"github.com/pingcap/tidb/util/sqlexec"
Expand Down Expand Up @@ -59,15 +58,14 @@ func InsertExtendedStats(sctx sessionctx.Context,
}
strColIDs := string(bytes)

ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnStats)

ctx := util.StatsCtx(context.Background())
sqlExecutor := exec.(sqlexec.SQLExecutor)
_, err = sqlExecutor.ExecuteInternal(ctx, "begin pessimistic")
if err != nil {
return errors.Trace(err)
}
defer func() {
err = finishTransaction(ctx, sqlExecutor, err)
err = util.FinishTransaction(ctx, sqlExecutor, err)
}()
// No need to use `exec.ExecuteInternal` since we have acquired the lock.
rows, _, err := exec.ExecRestrictedSQL(ctx, []sqlexec.OptionFuncAlias{sqlexec.ExecOptionUseCurSession}, "SELECT name, type, column_ids FROM mysql.stats_extended WHERE table_id = %? and status in (%?, %?)", tableID, statistics.ExtendedStatsInited, statistics.ExtendedStatsAnalyzed)
Expand All @@ -88,7 +86,7 @@ func InsertExtendedStats(sctx sessionctx.Context,
return errors.Errorf("extended statistics '%s' with same type on same columns already exists", statsName)
}
}
version, err := getStartTS(sctx)
version, err := util.GetStartTS(sctx)
if err != nil {
return errors.Trace(err)
}
Expand Down Expand Up @@ -127,7 +125,7 @@ func MarkExtendedStatsDeleted(sctx sessionctx.Context,
}
}()
exec := sctx.(sqlexec.RestrictedSQLExecutor)
ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnStats)
ctx := util.StatsCtx(context.Background())
rows, _, err := exec.ExecRestrictedSQL(ctx, []sqlexec.OptionFuncAlias{sqlexec.ExecOptionUseCurSession}, "SELECT name FROM mysql.stats_extended WHERE name = %? and table_id = %? and status in (%?, %?)", statsName, tableID, statistics.ExtendedStatsInited, statistics.ExtendedStatsAnalyzed)
if err != nil {
return errors.Trace(err)
Expand All @@ -149,13 +147,13 @@ func MarkExtendedStatsDeleted(sctx sessionctx.Context,
return errors.Trace(err)
}
defer func() {
err1 := finishTransaction(ctx, sqlExec, err)
err1 := util.FinishTransaction(ctx, sqlExec, err)
if err == nil && err1 == nil {
removeExtendedStatsItem(currentCache, updateStatsCache, tableID, statsName)
}
err = err1
}()
version, err := getStartTS(sctx)
version, err := util.GetStartTS(sctx)
if err != nil {
return errors.Trace(err)
}
Expand All @@ -172,7 +170,7 @@ func MarkExtendedStatsDeleted(sctx sessionctx.Context,
// BuildExtendedStats build extended stats for column groups if needed based on the column samples.
func BuildExtendedStats(sctx sessionctx.Context,
tableID int64, cols []*model.ColumnInfo, collectors []*statistics.SampleCollector) (*statistics.ExtendedStatsColl, error) {
ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnStats)
ctx := util.StatsCtx(context.Background())
exec := sctx.(sqlexec.RestrictedSQLExecutor)
const sql = "SELECT name, type, column_ids FROM mysql.stats_extended WHERE table_id = %? and status in (%?, %?)"
rows, _, err := exec.ExecRestrictedSQL(ctx, []sqlexec.OptionFuncAlias{sqlexec.ExecOptionUseCurSession}, sql, tableID, statistics.ExtendedStatsAnalyzed, statistics.ExtendedStatsInited)
Expand Down Expand Up @@ -296,16 +294,16 @@ func SaveExtendedStatsToStorage(sctx sessionctx.Context,
}

sqlExec := sctx.(sqlexec.SQLExecutor)
ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnStats)
ctx := util.StatsCtx(context.Background())

_, err = sqlExec.ExecuteInternal(ctx, "begin pessimistic")
if err != nil {
return errors.Trace(err)
}
defer func() {
err = finishTransaction(ctx, sqlExec, err)
err = util.FinishTransaction(ctx, sqlExec, err)
}()
version, err := getStartTS(sctx)
version, err := util.GetStartTS(sctx)
if err != nil {
return errors.Trace(err)
}
Expand Down Expand Up @@ -359,22 +357,3 @@ func removeExtendedStatsItem(currentCache *cache.StatsCache,
}
}
}

// finishTransaction will execute `commit` when error is nil, otherwise `rollback`.
func finishTransaction(ctx context.Context, exec sqlexec.SQLExecutor, err error) error {
if err == nil {
_, err = exec.ExecuteInternal(ctx, "commit")
} else {
_, err1 := exec.ExecuteInternal(ctx, "rollback")
terror.Log(errors.Trace(err1))
}
return errors.Trace(err)
}

func getStartTS(sctx sessionctx.Context) (uint64, error) {
txn, err := sctx.Txn(true)
if err != nil {
return 0, err
}
return txn.StartTS(), nil
}
17 changes: 9 additions & 8 deletions statistics/handle/gc.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/pingcap/tidb/statistics"
"github.com/pingcap/tidb/statistics/handle/cache"
"github.com/pingcap/tidb/statistics/handle/lockstats"
"github.com/pingcap/tidb/statistics/handle/util"
"github.com/pingcap/tidb/util/chunk"
"github.com/pingcap/tidb/util/logutil"
"github.com/pingcap/tidb/util/mathutil"
Expand Down Expand Up @@ -244,14 +245,14 @@ func (h *Handle) gcHistoryStatsFromKV(physicalID int64) error {
}
defer h.pool.Put(se)
exec := se.(sqlexec.SQLExecutor)
ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnStats)
ctx := util.StatsCtx(context.Background())

_, err = exec.ExecuteInternal(ctx, "begin pessimistic")
if err != nil {
return errors.Trace(err)
}
defer func() {
err = finishTransaction(ctx, exec, err)
err = util.FinishTransaction(ctx, exec, err)
}()
sql := "delete from mysql.stats_history where table_id = %?"
_, err = exec.ExecuteInternal(ctx, sql, physicalID)
Expand All @@ -271,14 +272,14 @@ func (h *Handle) deleteHistStatsFromKV(physicalID int64, histID int64, isIndex i
}
defer h.pool.Put(se)
exec := se.(sqlexec.SQLExecutor)
ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnStats)
ctx := util.StatsCtx(context.Background())

_, err = exec.ExecuteInternal(ctx, "begin")
if err != nil {
return errors.Trace(err)
}
defer func() {
err = finishTransaction(ctx, exec, err)
err = util.FinishTransaction(ctx, exec, err)
}()
startTS, err := getSessionTxnStartTS(se)
if err != nil {
Expand Down Expand Up @@ -322,13 +323,13 @@ func (h *Handle) DeleteTableStatsFromKV(statsIDs []int64) (err error) {
}
defer h.pool.Put(se)
exec := se.(sqlexec.SQLExecutor)
ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnStats)
ctx := util.StatsCtx(context.Background())
_, err = exec.ExecuteInternal(ctx, "begin")
if err != nil {
return errors.Trace(err)
}
defer func() {
err = finishTransaction(ctx, exec, err)
err = util.FinishTransaction(ctx, exec, err)
}()
startTS, err := getSessionTxnStartTS(se)
if err != nil {
Expand Down Expand Up @@ -374,13 +375,13 @@ func (h *Handle) removeDeletedExtendedStats(version uint64) (err error) {
}
defer h.pool.Put(se)
exec := se.(sqlexec.SQLExecutor)
ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnStats)
ctx := util.StatsCtx(context.Background())
_, err = exec.ExecuteInternal(ctx, "begin pessimistic")
if err != nil {
return errors.Trace(err)
}
defer func() {
err = finishTransaction(ctx, exec, err)
err = util.FinishTransaction(ctx, exec, err)
}()
const sql = "delete from mysql.stats_extended where status = %? and version < %?"
_, err = exec.ExecuteInternal(ctx, sql, statistics.ExtendedStatsDeleted, version)
Expand Down
Loading

0 comments on commit 0cc2f5c

Please sign in to comment.