Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

planner: move some common code on stats to stats_util #47312

Merged
merged 8 commits into from
Sep 27, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions statistics/handle/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ go_library(
"//statistics/handle/metrics",
"//statistics/handle/storage",
"//statistics/handle/usage",
"//statistics/handle/util",
"//table",
"//types",
"//util",
Expand Down
1 change: 1 addition & 0 deletions statistics/handle/autoanalyze/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ go_library(
"//sessionctx",
"//sessionctx/variable",
"//statistics",
"//statistics/handle/util",
"//util",
"//util/chunk",
"//util/logutil",
Expand Down
3 changes: 2 additions & 1 deletion statistics/handle/autoanalyze/autoanalyze.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/statistics"
statsutil "github.com/pingcap/tidb/statistics/handle/util"
"github.com/pingcap/tidb/util"
"github.com/pingcap/tidb/util/chunk"
"github.com/pingcap/tidb/util/logutil"
Expand Down Expand Up @@ -405,7 +406,7 @@ func execRestrictedSQLWithStatsVer(sctx sessionctx.Context,
opt *Opt,
statsVer int,
sql string, params ...interface{}) ([]chunk.Row, []*ast.ResultField, error) {
ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnStats)
ctx := statsutil.StatsCtx(context.Background())
pruneMode := sctx.GetSessionVars().PartitionPruneMode.Load()
analyzeSnapshot := sctx.GetSessionVars().EnableAnalyzeSnapshot
optFuncs := []sqlexec.OptionFuncAlias{
Expand Down
2 changes: 1 addition & 1 deletion statistics/handle/cache/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ go_library(
visibility = ["//visibility:public"],
deps = [
"//config",
"//kv",
"//parser/model",
"//sessionctx",
"//sessionctx/variable",
Expand All @@ -19,6 +18,7 @@ go_library(
"//statistics/handle/cache/internal/lfu",
"//statistics/handle/cache/internal/mapcache",
"//statistics/handle/cache/internal/metrics",
"//statistics/handle/util",
"//types",
"//util/chunk",
"//util/logutil",
Expand Down
4 changes: 2 additions & 2 deletions statistics/handle/cache/statscacheinner.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (
"time"

"github.com/pingcap/tidb/config"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/sessionctx/variable"
Expand All @@ -31,6 +30,7 @@ import (
"github.com/pingcap/tidb/statistics/handle/cache/internal/lfu"
"github.com/pingcap/tidb/statistics/handle/cache/internal/mapcache"
"github.com/pingcap/tidb/statistics/handle/cache/internal/metrics"
"github.com/pingcap/tidb/statistics/handle/util"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/chunk"
"github.com/pingcap/tidb/util/logutil"
Expand Down Expand Up @@ -226,7 +226,7 @@ func (c *StatsTableRowCache) GetColLength(id tableHistID) uint64 {
func (c *StatsTableRowCache) Update(ctx context.Context, sctx sessionctx.Context) error {
c.mu.Lock()
defer c.mu.Unlock()
ctx = kv.WithInternalSourceType(ctx, kv.InternalTxnStats)
ctx = util.StatsCtx(ctx)
if time.Since(c.modifyTime) < tableStatsCacheExpiry {
if len(c.dirtyIDs) > 0 {
tableRows, err := getRowCountTables(ctx, sctx, c.dirtyIDs...)
Expand Down
33 changes: 11 additions & 22 deletions statistics/handle/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,14 @@ import (

"github.com/pingcap/errors"
"github.com/pingcap/tidb/ddl/util"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/parser/ast"
"github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tidb/parser/mysql"
"github.com/pingcap/tidb/parser/terror"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/sessiontxn"
statsutil "github.com/pingcap/tidb/statistics/handle/util"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/logutil"
"github.com/pingcap/tidb/util/sqlexec"
Expand Down Expand Up @@ -134,13 +134,13 @@ func (h *Handle) updateStatsVersion() error {
defer h.pool.Put(se)
exec := se.(sqlexec.SQLExecutor)

ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnStats)
ctx := statsutil.StatsCtx(context.Background())
_, err = exec.ExecuteInternal(ctx, "begin")
if err != nil {
return errors.Trace(err)
}
defer func() {
err = finishTransaction(ctx, exec, err)
err = statsutil.FinishTransaction(ctx, exec, err)
}()
startTS, err := getSessionTxnStartTS(se)
if err != nil {
Expand Down Expand Up @@ -271,13 +271,13 @@ func (h *Handle) changeGlobalStatsID(from, to int64) (err error) {
}
defer h.pool.Put(se)
exec := se.(sqlexec.SQLExecutor)
ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnStats)
ctx := statsutil.StatsCtx(context.Background())
_, err = exec.ExecuteInternal(ctx, "begin pessimistic")
if err != nil {
return errors.Trace(err)
}
defer func() {
err = finishTransaction(ctx, exec, err)
err = statsutil.FinishTransaction(ctx, exec, err)
}()
for _, table := range []string{"stats_meta", "stats_top_n", "stats_fm_sketch", "stats_buckets", "stats_histograms", "column_stats_usage"} {
_, err = exec.ExecuteInternal(ctx, "update mysql."+table+" set table_id = %? where table_id = %?", to, from)
Expand Down Expand Up @@ -328,14 +328,14 @@ func (h *Handle) insertTableStats2KV(info *model.TableInfo, physicalID int64) (e
}
defer h.pool.Put(se)
exec := se.(sqlexec.SQLExecutor)
ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnStats)
ctx := statsutil.StatsCtx(context.Background())

_, err = exec.ExecuteInternal(ctx, "begin")
if err != nil {
return errors.Trace(err)
}
defer func() {
err = finishTransaction(ctx, exec, err)
err = statsutil.FinishTransaction(ctx, exec, err)
}()
startTS, err := getSessionTxnStartTS(se)
if err != nil {
Expand Down Expand Up @@ -373,14 +373,14 @@ func (h *Handle) resetTableStats2KVForDrop(physicalID int64) (err error) {
}
defer h.pool.Put(se)
exec := se.(sqlexec.SQLExecutor)
ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnStats)
ctx := statsutil.StatsCtx(context.Background())

_, err = exec.ExecuteInternal(ctx, "begin")
if err != nil {
return errors.Trace(err)
}
defer func() {
err = finishTransaction(ctx, exec, err)
err = statsutil.FinishTransaction(ctx, exec, err)
}()
startTS, err := getSessionTxnStartTS(se)
if err != nil {
Expand Down Expand Up @@ -408,14 +408,14 @@ func (h *Handle) insertColStats2KV(physicalID int64, colInfos []*model.ColumnInf
}
defer h.pool.Put(se)
exec := se.(sqlexec.SQLExecutor)
ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnStats)
ctx := statsutil.StatsCtx(context.Background())

_, err = exec.ExecuteInternal(ctx, "begin")
if err != nil {
return errors.Trace(err)
}
defer func() {
err = finishTransaction(ctx, exec, err)
err = statsutil.FinishTransaction(ctx, exec, err)
}()

startTS, err := getSessionTxnStartTS(se)
Expand Down Expand Up @@ -474,14 +474,3 @@ func (h *Handle) insertColStats2KV(physicalID int64, colInfos []*model.ColumnInf
}
return
}

// finishTransaction will execute `commit` when error is nil, otherwise `rollback`.
func finishTransaction(ctx context.Context, exec sqlexec.SQLExecutor, err error) error {
if err == nil {
_, err = exec.ExecuteInternal(ctx, "commit")
} else {
_, err1 := exec.ExecuteInternal(ctx, "rollback")
terror.Log(errors.Trace(err1))
}
return errors.Trace(err)
}
3 changes: 1 addition & 2 deletions statistics/handle/extstats/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,12 @@ go_library(
importpath = "github.com/pingcap/tidb/statistics/handle/extstats",
visibility = ["//visibility:public"],
deps = [
"//kv",
"//parser/ast",
"//parser/model",
"//parser/terror",
"//sessionctx",
"//statistics",
"//statistics/handle/cache",
"//statistics/handle/util",
"//util/logutil",
"//util/mathutil",
"//util/sqlexec",
Expand Down
43 changes: 11 additions & 32 deletions statistics/handle/extstats/extended_stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,12 @@ import (
"slices"

"github.com/pingcap/errors"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/parser/ast"
"github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tidb/parser/terror"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/statistics"
"github.com/pingcap/tidb/statistics/handle/cache"
"github.com/pingcap/tidb/statistics/handle/util"
"github.com/pingcap/tidb/util/logutil"
"github.com/pingcap/tidb/util/mathutil"
"github.com/pingcap/tidb/util/sqlexec"
Expand Down Expand Up @@ -59,15 +58,14 @@ func InsertExtendedStats(sctx sessionctx.Context,
}
strColIDs := string(bytes)

ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnStats)

ctx := util.StatsCtx(context.Background())
sqlExecutor := exec.(sqlexec.SQLExecutor)
_, err = sqlExecutor.ExecuteInternal(ctx, "begin pessimistic")
if err != nil {
return errors.Trace(err)
}
defer func() {
err = finishTransaction(ctx, sqlExecutor, err)
err = util.FinishTransaction(ctx, sqlExecutor, err)
}()
// No need to use `exec.ExecuteInternal` since we have acquired the lock.
rows, _, err := exec.ExecRestrictedSQL(ctx, []sqlexec.OptionFuncAlias{sqlexec.ExecOptionUseCurSession}, "SELECT name, type, column_ids FROM mysql.stats_extended WHERE table_id = %? and status in (%?, %?)", tableID, statistics.ExtendedStatsInited, statistics.ExtendedStatsAnalyzed)
Expand All @@ -88,7 +86,7 @@ func InsertExtendedStats(sctx sessionctx.Context,
return errors.Errorf("extended statistics '%s' with same type on same columns already exists", statsName)
}
}
version, err := getStartTS(sctx)
version, err := util.GetStartTS(sctx)
if err != nil {
return errors.Trace(err)
}
Expand Down Expand Up @@ -127,7 +125,7 @@ func MarkExtendedStatsDeleted(sctx sessionctx.Context,
}
}()
exec := sctx.(sqlexec.RestrictedSQLExecutor)
ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnStats)
ctx := util.StatsCtx(context.Background())
rows, _, err := exec.ExecRestrictedSQL(ctx, []sqlexec.OptionFuncAlias{sqlexec.ExecOptionUseCurSession}, "SELECT name FROM mysql.stats_extended WHERE name = %? and table_id = %? and status in (%?, %?)", statsName, tableID, statistics.ExtendedStatsInited, statistics.ExtendedStatsAnalyzed)
if err != nil {
return errors.Trace(err)
Expand All @@ -149,13 +147,13 @@ func MarkExtendedStatsDeleted(sctx sessionctx.Context,
return errors.Trace(err)
}
defer func() {
err1 := finishTransaction(ctx, sqlExec, err)
err1 := util.FinishTransaction(ctx, sqlExec, err)
if err == nil && err1 == nil {
removeExtendedStatsItem(currentCache, updateStatsCache, tableID, statsName)
}
err = err1
}()
version, err := getStartTS(sctx)
version, err := util.GetStartTS(sctx)
if err != nil {
return errors.Trace(err)
}
Expand All @@ -172,7 +170,7 @@ func MarkExtendedStatsDeleted(sctx sessionctx.Context,
// BuildExtendedStats build extended stats for column groups if needed based on the column samples.
func BuildExtendedStats(sctx sessionctx.Context,
tableID int64, cols []*model.ColumnInfo, collectors []*statistics.SampleCollector) (*statistics.ExtendedStatsColl, error) {
ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnStats)
ctx := util.StatsCtx(context.Background())
exec := sctx.(sqlexec.RestrictedSQLExecutor)
const sql = "SELECT name, type, column_ids FROM mysql.stats_extended WHERE table_id = %? and status in (%?, %?)"
rows, _, err := exec.ExecRestrictedSQL(ctx, []sqlexec.OptionFuncAlias{sqlexec.ExecOptionUseCurSession}, sql, tableID, statistics.ExtendedStatsAnalyzed, statistics.ExtendedStatsInited)
Expand Down Expand Up @@ -296,16 +294,16 @@ func SaveExtendedStatsToStorage(sctx sessionctx.Context,
}

sqlExec := sctx.(sqlexec.SQLExecutor)
ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnStats)
ctx := util.StatsCtx(context.Background())

_, err = sqlExec.ExecuteInternal(ctx, "begin pessimistic")
if err != nil {
return errors.Trace(err)
}
defer func() {
err = finishTransaction(ctx, sqlExec, err)
err = util.FinishTransaction(ctx, sqlExec, err)
}()
version, err := getStartTS(sctx)
version, err := util.GetStartTS(sctx)
if err != nil {
return errors.Trace(err)
}
Expand Down Expand Up @@ -359,22 +357,3 @@ func removeExtendedStatsItem(currentCache *cache.StatsCache,
}
}
}

// finishTransaction will execute `commit` when error is nil, otherwise `rollback`.
func finishTransaction(ctx context.Context, exec sqlexec.SQLExecutor, err error) error {
if err == nil {
_, err = exec.ExecuteInternal(ctx, "commit")
} else {
_, err1 := exec.ExecuteInternal(ctx, "rollback")
terror.Log(errors.Trace(err1))
}
return errors.Trace(err)
}

func getStartTS(sctx sessionctx.Context) (uint64, error) {
txn, err := sctx.Txn(true)
if err != nil {
return 0, err
}
return txn.StartTS(), nil
}
17 changes: 9 additions & 8 deletions statistics/handle/gc.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/pingcap/tidb/statistics"
"github.com/pingcap/tidb/statistics/handle/cache"
"github.com/pingcap/tidb/statistics/handle/lockstats"
"github.com/pingcap/tidb/statistics/handle/util"
"github.com/pingcap/tidb/util/chunk"
"github.com/pingcap/tidb/util/logutil"
"github.com/pingcap/tidb/util/mathutil"
Expand Down Expand Up @@ -244,14 +245,14 @@ func (h *Handle) gcHistoryStatsFromKV(physicalID int64) error {
}
defer h.pool.Put(se)
exec := se.(sqlexec.SQLExecutor)
ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnStats)
ctx := util.StatsCtx(context.Background())

_, err = exec.ExecuteInternal(ctx, "begin pessimistic")
if err != nil {
return errors.Trace(err)
}
defer func() {
err = finishTransaction(ctx, exec, err)
err = util.FinishTransaction(ctx, exec, err)
}()
sql := "delete from mysql.stats_history where table_id = %?"
_, err = exec.ExecuteInternal(ctx, sql, physicalID)
Expand All @@ -271,14 +272,14 @@ func (h *Handle) deleteHistStatsFromKV(physicalID int64, histID int64, isIndex i
}
defer h.pool.Put(se)
exec := se.(sqlexec.SQLExecutor)
ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnStats)
ctx := util.StatsCtx(context.Background())

_, err = exec.ExecuteInternal(ctx, "begin")
if err != nil {
return errors.Trace(err)
}
defer func() {
err = finishTransaction(ctx, exec, err)
err = util.FinishTransaction(ctx, exec, err)
}()
startTS, err := getSessionTxnStartTS(se)
if err != nil {
Expand Down Expand Up @@ -322,13 +323,13 @@ func (h *Handle) DeleteTableStatsFromKV(statsIDs []int64) (err error) {
}
defer h.pool.Put(se)
exec := se.(sqlexec.SQLExecutor)
ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnStats)
ctx := util.StatsCtx(context.Background())
_, err = exec.ExecuteInternal(ctx, "begin")
if err != nil {
return errors.Trace(err)
}
defer func() {
err = finishTransaction(ctx, exec, err)
err = util.FinishTransaction(ctx, exec, err)
}()
startTS, err := getSessionTxnStartTS(se)
if err != nil {
Expand Down Expand Up @@ -374,13 +375,13 @@ func (h *Handle) removeDeletedExtendedStats(version uint64) (err error) {
}
defer h.pool.Put(se)
exec := se.(sqlexec.SQLExecutor)
ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnStats)
ctx := util.StatsCtx(context.Background())
_, err = exec.ExecuteInternal(ctx, "begin pessimistic")
if err != nil {
return errors.Trace(err)
}
defer func() {
err = finishTransaction(ctx, exec, err)
err = util.FinishTransaction(ctx, exec, err)
}()
const sql = "delete from mysql.stats_extended where status = %? and version < %?"
_, err = exec.ExecuteInternal(ctx, sql, statistics.ExtendedStatsDeleted, version)
Expand Down
Loading