diff --git a/domain/domain.go b/domain/domain.go index fca482e873b28..91b020caeed3b 100644 --- a/domain/domain.go +++ b/domain/domain.go @@ -35,6 +35,7 @@ import ( "github.com/pingcap/tidb/bindinfo" "github.com/pingcap/tidb/br/pkg/streamhelper" "github.com/pingcap/tidb/br/pkg/streamhelper/daemon" +<<<<<<< HEAD:domain/domain.go "github.com/pingcap/tidb/config" "github.com/pingcap/tidb/ddl" "github.com/pingcap/tidb/ddl/placement" @@ -73,6 +74,66 @@ import ( "github.com/pingcap/tidb/util/memoryusagealarm" "github.com/pingcap/tidb/util/servermemorylimit" "github.com/pingcap/tidb/util/sqlexec" +======= + "github.com/pingcap/tidb/pkg/bindinfo" + "github.com/pingcap/tidb/pkg/config" + "github.com/pingcap/tidb/pkg/ddl" + "github.com/pingcap/tidb/pkg/ddl/placement" + "github.com/pingcap/tidb/pkg/ddl/schematracker" + "github.com/pingcap/tidb/pkg/ddl/systable" + ddlutil "github.com/pingcap/tidb/pkg/ddl/util" + "github.com/pingcap/tidb/pkg/disttask/framework/scheduler" + "github.com/pingcap/tidb/pkg/disttask/framework/storage" + "github.com/pingcap/tidb/pkg/disttask/framework/taskexecutor" + "github.com/pingcap/tidb/pkg/domain/globalconfigsync" + "github.com/pingcap/tidb/pkg/domain/infosync" + "github.com/pingcap/tidb/pkg/domain/resourcegroup" + "github.com/pingcap/tidb/pkg/errno" + "github.com/pingcap/tidb/pkg/infoschema" + infoschema_metrics "github.com/pingcap/tidb/pkg/infoschema/metrics" + "github.com/pingcap/tidb/pkg/infoschema/perfschema" + "github.com/pingcap/tidb/pkg/keyspace" + "github.com/pingcap/tidb/pkg/kv" + "github.com/pingcap/tidb/pkg/meta" + "github.com/pingcap/tidb/pkg/meta/autoid" + "github.com/pingcap/tidb/pkg/metrics" + "github.com/pingcap/tidb/pkg/owner" + "github.com/pingcap/tidb/pkg/parser" + "github.com/pingcap/tidb/pkg/parser/ast" + "github.com/pingcap/tidb/pkg/parser/model" + "github.com/pingcap/tidb/pkg/parser/mysql" + "github.com/pingcap/tidb/pkg/parser/terror" + metrics2 "github.com/pingcap/tidb/pkg/planner/core/metrics" + "github.com/pingcap/tidb/pkg/privilege/privileges" + "github.com/pingcap/tidb/pkg/sessionctx" + "github.com/pingcap/tidb/pkg/sessionctx/sessionstates" + "github.com/pingcap/tidb/pkg/sessionctx/sysproctrack" + "github.com/pingcap/tidb/pkg/sessionctx/variable" + "github.com/pingcap/tidb/pkg/statistics/handle" + "github.com/pingcap/tidb/pkg/statistics/handle/autoanalyze" + statslogutil "github.com/pingcap/tidb/pkg/statistics/handle/logutil" + handleutil "github.com/pingcap/tidb/pkg/statistics/handle/util" + "github.com/pingcap/tidb/pkg/store/helper" + "github.com/pingcap/tidb/pkg/ttl/ttlworker" + "github.com/pingcap/tidb/pkg/types" + "github.com/pingcap/tidb/pkg/util" + "github.com/pingcap/tidb/pkg/util/dbterror" + disttaskutil "github.com/pingcap/tidb/pkg/util/disttask" + "github.com/pingcap/tidb/pkg/util/domainutil" + "github.com/pingcap/tidb/pkg/util/engine" + "github.com/pingcap/tidb/pkg/util/etcd" + "github.com/pingcap/tidb/pkg/util/expensivequery" + "github.com/pingcap/tidb/pkg/util/gctuner" + "github.com/pingcap/tidb/pkg/util/globalconn" + "github.com/pingcap/tidb/pkg/util/intest" + "github.com/pingcap/tidb/pkg/util/logutil" + "github.com/pingcap/tidb/pkg/util/memory" + "github.com/pingcap/tidb/pkg/util/memoryusagealarm" + "github.com/pingcap/tidb/pkg/util/replayer" + "github.com/pingcap/tidb/pkg/util/servermemorylimit" + "github.com/pingcap/tidb/pkg/util/sqlkiller" + "github.com/pingcap/tidb/pkg/util/syncutil" +>>>>>>> b066365ef36 (*: kill auto analyze out of the auto analyze time windows (#55062)):pkg/domain/domain.go "github.com/tikv/client-go/v2/tikv" "github.com/tikv/client-go/v2/txnkv/transaction" pd "github.com/tikv/pd/client" @@ -673,6 +734,24 @@ func (do *Domain) topologySyncerKeeper() { } } +// CheckAutoAnalyzeWindows checks the auto analyze windows and kill the auto analyze process if it is not in the window. +func (do *Domain) CheckAutoAnalyzeWindows() { + se, err := do.sysSessionPool.Get() + + if err != nil { + logutil.BgLogger().Warn("get system session failed", zap.Error(err)) + return + } + // Make sure the session is new. + sctx := se.(sessionctx.Context) + defer do.sysSessionPool.Put(se) + if !autoanalyze.CheckAutoAnalyzeWindow(sctx) { + for _, id := range handleutil.GlobalAutoAnalyzeProcessList.All() { + do.SysProcTracker().KillSysProcess(id) + } + } +} + func (do *Domain) refreshMDLCheckTableInfo() { se, err := do.sysSessionPool.Get() @@ -2097,6 +2176,7 @@ func (do *Domain) updateStatsWorker(ctx sessionctx.Context, owner owner.Manager) if err != nil { logutil.BgLogger().Debug("GC stats failed", zap.Error(err)) } + do.CheckAutoAnalyzeWindows() case <-dumpColStatsUsageTicker.C: err := statsHandle.DumpColStatsUsageToKV() if err != nil { diff --git a/pkg/domain/BUILD.bazel b/pkg/domain/BUILD.bazel new file mode 100644 index 0000000000000..adaeb3a4f02d3 --- /dev/null +++ b/pkg/domain/BUILD.bazel @@ -0,0 +1,177 @@ +load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") + +go_library( + name = "domain", + srcs = [ + "domain.go", + "domain_sysvars.go", + "domainctx.go", + "extract.go", + "historical_stats.go", + "optimize_trace.go", + "plan_replayer.go", + "plan_replayer_dump.go", + "ru_stats.go", + "runaway.go", + "schema_checker.go", + "schema_validator.go", + "sysvar_cache.go", + "test_helper.go", + "topn_slow_query.go", + ], + importpath = "github.com/pingcap/tidb/pkg/domain", + visibility = ["//visibility:public"], + deps = [ + "//br/pkg/streamhelper", + "//br/pkg/streamhelper/daemon", + "//pkg/bindinfo", + "//pkg/config", + "//pkg/ddl", + "//pkg/ddl/placement", + "//pkg/ddl/schematracker", + "//pkg/ddl/systable", + "//pkg/ddl/util", + "//pkg/disttask/framework/scheduler", + "//pkg/disttask/framework/storage", + "//pkg/disttask/framework/taskexecutor", + "//pkg/domain/globalconfigsync", + "//pkg/domain/infosync", + "//pkg/domain/metrics", + "//pkg/domain/resourcegroup", + "//pkg/errno", + "//pkg/infoschema", + "//pkg/infoschema/metrics", + "//pkg/infoschema/perfschema", + "//pkg/keyspace", + "//pkg/kv", + "//pkg/meta", + "//pkg/meta/autoid", + "//pkg/metrics", + "//pkg/owner", + "//pkg/parser", + "//pkg/parser/ast", + "//pkg/parser/model", + "//pkg/parser/mysql", + "//pkg/parser/terror", + "//pkg/planner/core/metrics", + "//pkg/privilege/privileges", + "//pkg/sessionctx", + "//pkg/sessionctx/sessionstates", + "//pkg/sessionctx/sysproctrack", + "//pkg/sessionctx/variable", + "//pkg/statistics", + "//pkg/statistics/handle", + "//pkg/statistics/handle/autoanalyze", + "//pkg/statistics/handle/logutil", + "//pkg/statistics/handle/util", + "//pkg/store/helper", + "//pkg/ttl/cache", + "//pkg/ttl/sqlbuilder", + "//pkg/ttl/ttlworker", + "//pkg/types", + "//pkg/util", + "//pkg/util/chunk", + "//pkg/util/context", + "//pkg/util/dbterror", + "//pkg/util/disttask", + "//pkg/util/domainutil", + "//pkg/util/engine", + "//pkg/util/etcd", + "//pkg/util/execdetails", + "//pkg/util/expensivequery", + "//pkg/util/gctuner", + "//pkg/util/globalconn", + "//pkg/util/intest", + "//pkg/util/logutil", + "//pkg/util/memory", + "//pkg/util/memoryusagealarm", + "//pkg/util/printer", + "//pkg/util/replayer", + "//pkg/util/servermemorylimit", + "//pkg/util/sqlexec", + "//pkg/util/sqlkiller", + "//pkg/util/syncutil", + "@com_github_burntsushi_toml//:toml", + "@com_github_ngaut_pools//:pools", + "@com_github_pingcap_errors//:errors", + "@com_github_pingcap_failpoint//:failpoint", + "@com_github_pingcap_kvproto//pkg/metapb", + "@com_github_pingcap_kvproto//pkg/pdpb", + "@com_github_pingcap_kvproto//pkg/resource_manager", + "@com_github_pingcap_log//:log", + "@com_github_stretchr_testify//require", + "@com_github_tikv_client_go_v2//oracle", + "@com_github_tikv_client_go_v2//tikv", + "@com_github_tikv_client_go_v2//txnkv/transaction", + "@com_github_tikv_pd_client//:client", + "@com_github_tikv_pd_client//http", + "@com_github_tikv_pd_client//resource_group/controller", + "@io_etcd_go_etcd_client_v3//:client", + "@io_etcd_go_etcd_client_v3//concurrency", + "@org_golang_google_grpc//:grpc", + "@org_golang_google_grpc//backoff", + "@org_golang_google_grpc//keepalive", + "@org_uber_go_atomic//:atomic", + "@org_uber_go_zap//:zap", + ], +) + +go_test( + name = "domain_test", + timeout = "short", + srcs = [ + "db_test.go", + "domain_test.go", + "domain_utils_test.go", + "domainctx_test.go", + "extract_test.go", + "main_test.go", + "plan_replayer_handle_test.go", + "plan_replayer_test.go", + "ru_stats_test.go", + "schema_checker_test.go", + "schema_validator_test.go", + "topn_slow_query_test.go", + ], + embed = [":domain"], + flaky = True, + shard_count = 28, + deps = [ + "//pkg/config", + "//pkg/ddl", + "//pkg/domain/infosync", + "//pkg/errno", + "//pkg/infoschema", + "//pkg/keyspace", + "//pkg/kv", + "//pkg/metrics", + "//pkg/parser/ast", + "//pkg/parser/auth", + "//pkg/parser/model", + "//pkg/parser/mysql", + "//pkg/parser/terror", + "//pkg/server", + "//pkg/session", + "//pkg/sessionctx/variable", + "//pkg/store/mockstore", + "//pkg/testkit", + "//pkg/testkit/testsetup", + "//pkg/types", + "//pkg/util", + "//pkg/util/mock", + "//pkg/util/replayer", + "//pkg/util/stmtsummary/v2:stmtsummary", + "@com_github_ngaut_pools//:pools", + "@com_github_pingcap_errors//:errors", + "@com_github_pingcap_failpoint//:failpoint", + "@com_github_pingcap_kvproto//pkg/metapb", + "@com_github_pingcap_kvproto//pkg/resource_manager", + "@com_github_prometheus_client_model//go", + "@com_github_stretchr_testify//require", + "@com_github_tikv_client_go_v2//oracle", + "@com_github_tikv_client_go_v2//txnkv/transaction", + "@com_github_tikv_pd_client//:client", + "@io_etcd_go_etcd_tests_v3//integration", + "@org_uber_go_goleak//:goleak", + ], +) diff --git a/pkg/statistics/handle/autoanalyze/autoanalyze.go b/pkg/statistics/handle/autoanalyze/autoanalyze.go new file mode 100644 index 0000000000000..0f36cc3740a5f --- /dev/null +++ b/pkg/statistics/handle/autoanalyze/autoanalyze.go @@ -0,0 +1,859 @@ +// Copyright 2023 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package autoanalyze + +import ( + "context" + "fmt" + "math/rand" + "net" + "strconv" + "strings" + "time" + + "github.com/pingcap/errors" + "github.com/pingcap/failpoint" + "github.com/pingcap/tidb/pkg/domain/infosync" + "github.com/pingcap/tidb/pkg/infoschema" + "github.com/pingcap/tidb/pkg/parser/model" + "github.com/pingcap/tidb/pkg/parser/terror" + "github.com/pingcap/tidb/pkg/sessionctx" + "github.com/pingcap/tidb/pkg/sessionctx/sysproctrack" + "github.com/pingcap/tidb/pkg/sessionctx/variable" + "github.com/pingcap/tidb/pkg/statistics" + "github.com/pingcap/tidb/pkg/statistics/handle/autoanalyze/exec" + "github.com/pingcap/tidb/pkg/statistics/handle/autoanalyze/refresher" + "github.com/pingcap/tidb/pkg/statistics/handle/lockstats" + statslogutil "github.com/pingcap/tidb/pkg/statistics/handle/logutil" + statstypes "github.com/pingcap/tidb/pkg/statistics/handle/types" + statsutil "github.com/pingcap/tidb/pkg/statistics/handle/util" + "github.com/pingcap/tidb/pkg/types" + "github.com/pingcap/tidb/pkg/util" + "github.com/pingcap/tidb/pkg/util/logutil" + "github.com/pingcap/tidb/pkg/util/sqlescape" + "github.com/pingcap/tidb/pkg/util/timeutil" + "go.uber.org/zap" +) + +// statsAnalyze implements util.StatsAnalyze. +// statsAnalyze is used to handle auto-analyze and manage analyze jobs. +type statsAnalyze struct { + statsHandle statstypes.StatsHandle + // sysProcTracker is used to track sys process like analyze + sysProcTracker sysproctrack.Tracker +} + +// NewStatsAnalyze creates a new StatsAnalyze. +func NewStatsAnalyze( + statsHandle statstypes.StatsHandle, + sysProcTracker sysproctrack.Tracker, +) statstypes.StatsAnalyze { + return &statsAnalyze{statsHandle: statsHandle, sysProcTracker: sysProcTracker} +} + +// InsertAnalyzeJob inserts the analyze job to the storage. +func (sa *statsAnalyze) InsertAnalyzeJob(job *statistics.AnalyzeJob, instance string, procID uint64) error { + return statsutil.CallWithSCtx(sa.statsHandle.SPool(), func(sctx sessionctx.Context) error { + return insertAnalyzeJob(sctx, job, instance, procID) + }) +} + +func (sa *statsAnalyze) StartAnalyzeJob(job *statistics.AnalyzeJob) { + err := statsutil.CallWithSCtx(sa.statsHandle.SPool(), func(sctx sessionctx.Context) error { + startAnalyzeJob(sctx, job) + return nil + }) + if err != nil { + statslogutil.StatsLogger().Warn("failed to start analyze job", zap.Error(err)) + } +} + +func (sa *statsAnalyze) UpdateAnalyzeJobProgress(job *statistics.AnalyzeJob, rowCount int64) { + err := statsutil.CallWithSCtx(sa.statsHandle.SPool(), func(sctx sessionctx.Context) error { + updateAnalyzeJobProgress(sctx, job, rowCount) + return nil + }) + if err != nil { + statslogutil.StatsLogger().Warn("failed to update analyze job progress", zap.Error(err)) + } +} + +func (sa *statsAnalyze) FinishAnalyzeJob(job *statistics.AnalyzeJob, failReason error, analyzeType statistics.JobType) { + err := statsutil.CallWithSCtx(sa.statsHandle.SPool(), func(sctx sessionctx.Context) error { + finishAnalyzeJob(sctx, job, failReason, analyzeType) + return nil + }) + if err != nil { + statslogutil.StatsLogger().Warn("failed to finish analyze job", zap.Error(err)) + } +} + +// DeleteAnalyzeJobs deletes the analyze jobs whose update time is earlier than updateTime. +func (sa *statsAnalyze) DeleteAnalyzeJobs(updateTime time.Time) error { + return statsutil.CallWithSCtx(sa.statsHandle.SPool(), func(sctx sessionctx.Context) error { + _, _, err := statsutil.ExecRows(sctx, "DELETE FROM mysql.analyze_jobs WHERE update_time < CONVERT_TZ(%?, '+00:00', @@TIME_ZONE)", updateTime.UTC().Format(types.TimeFormat)) + return err + }) +} + +// CleanupCorruptedAnalyzeJobsOnCurrentInstance cleans up the potentially corrupted analyze job. +// It only cleans up the jobs that are associated with the current instance. +func (sa *statsAnalyze) CleanupCorruptedAnalyzeJobsOnCurrentInstance(currentRunningProcessIDs map[uint64]struct{}) error { + return statsutil.CallWithSCtx(sa.statsHandle.SPool(), func(sctx sessionctx.Context) error { + return CleanupCorruptedAnalyzeJobsOnCurrentInstance(sctx, currentRunningProcessIDs) + }, statsutil.FlagWrapTxn) +} + +// CleanupCorruptedAnalyzeJobsOnDeadInstances removes analyze jobs that may have been corrupted. +// Specifically, it removes jobs associated with instances that no longer exist in the cluster. +func (sa *statsAnalyze) CleanupCorruptedAnalyzeJobsOnDeadInstances() error { + return statsutil.CallWithSCtx(sa.statsHandle.SPool(), func(sctx sessionctx.Context) error { + return CleanupCorruptedAnalyzeJobsOnDeadInstances(sctx) + }, statsutil.FlagWrapTxn) +} + +// SelectAnalyzeJobsOnCurrentInstanceSQL is the SQL to select the analyze jobs whose +// state is `pending` or `running` and the update time is more than 10 minutes ago +// and the instance is current instance. +const SelectAnalyzeJobsOnCurrentInstanceSQL = `SELECT id, process_id + FROM mysql.analyze_jobs + WHERE instance = %? + AND state IN ('pending', 'running') + AND update_time < CONVERT_TZ(%?, '+00:00', @@TIME_ZONE)` + +// SelectAnalyzeJobsSQL is the SQL to select the analyze jobs whose +// state is `pending` or `running` and the update time is more than 10 minutes ago. +const SelectAnalyzeJobsSQL = `SELECT id, instance + FROM mysql.analyze_jobs + WHERE state IN ('pending', 'running') + AND update_time < CONVERT_TZ(%?, '+00:00', @@TIME_ZONE)` + +// BatchUpdateAnalyzeJobSQL is the SQL to update the analyze jobs to `failed` state. +const BatchUpdateAnalyzeJobSQL = `UPDATE mysql.analyze_jobs + SET state = 'failed', + fail_reason = 'The TiDB Server has either shut down or the analyze query was terminated during the analyze job execution', + process_id = NULL + WHERE id IN (%?)` + +func tenMinutesAgo() string { + return time.Now().Add(-10 * time.Minute).UTC().Format(types.TimeFormat) +} + +// CleanupCorruptedAnalyzeJobsOnCurrentInstance cleans up the potentially corrupted analyze job from current instance. +// Exported for testing. +func CleanupCorruptedAnalyzeJobsOnCurrentInstance( + sctx sessionctx.Context, + currentRunningProcessIDs map[uint64]struct{}, +) error { + serverInfo, err := infosync.GetServerInfo() + if err != nil { + return errors.Trace(err) + } + instance := net.JoinHostPort(serverInfo.IP, strconv.Itoa(int(serverInfo.Port))) + // Get all the analyze jobs whose state is `pending` or `running` and the update time is more than 10 minutes ago + // and the instance is current instance. + rows, _, err := statsutil.ExecRows( + sctx, + SelectAnalyzeJobsOnCurrentInstanceSQL, + instance, + tenMinutesAgo(), + ) + if err != nil { + return errors.Trace(err) + } + + jobIDs := make([]string, 0, len(rows)) + for _, row := range rows { + // The process ID is typically non-null for running or pending jobs. + // However, in rare cases(I don't which case), it may be null. Therefore, it's necessary to check its value. + if !row.IsNull(1) { + processID := row.GetUint64(1) + // If the process id is not in currentRunningProcessIDs, we need to clean up the job. + // They don't belong to current instance any more. + if _, ok := currentRunningProcessIDs[processID]; !ok { + jobID := row.GetUint64(0) + jobIDs = append(jobIDs, strconv.FormatUint(jobID, 10)) + } + } + } + + // Do a batch update to clean up the jobs. + if len(jobIDs) > 0 { + _, _, err = statsutil.ExecRows( + sctx, + BatchUpdateAnalyzeJobSQL, + jobIDs, + ) + if err != nil { + return errors.Trace(err) + } + statslogutil.StatsLogger().Info( + "clean up the potentially corrupted analyze jobs from current instance", + zap.Strings("jobIDs", jobIDs), + ) + } + + return nil +} + +// CleanupCorruptedAnalyzeJobsOnDeadInstances cleans up the potentially corrupted analyze job from dead instances. +func CleanupCorruptedAnalyzeJobsOnDeadInstances( + sctx sessionctx.Context, +) error { + rows, _, err := statsutil.ExecRows( + sctx, + SelectAnalyzeJobsSQL, + tenMinutesAgo(), + ) + if err != nil { + return errors.Trace(err) + } + if len(rows) == 0 { + return nil + } + + // Get all the instances from etcd. + serverInfo, err := infosync.GetAllServerInfo(context.Background()) + if err != nil { + return errors.Trace(err) + } + instances := make(map[string]struct{}, len(serverInfo)) + for _, info := range serverInfo { + instance := net.JoinHostPort(info.IP, strconv.Itoa(int(info.Port))) + instances[instance] = struct{}{} + } + + jobIDs := make([]string, 0, len(rows)) + for _, row := range rows { + // If the instance is not in instances, we need to clean up the job. + // It means the instance is down or the instance is not in the cluster any more. + instance := row.GetString(1) + if _, ok := instances[instance]; !ok { + jobID := row.GetUint64(0) + jobIDs = append(jobIDs, strconv.FormatUint(jobID, 10)) + } + } + + // Do a batch update to clean up the jobs. + if len(jobIDs) > 0 { + _, _, err = statsutil.ExecRows( + sctx, + BatchUpdateAnalyzeJobSQL, + jobIDs, + ) + if err != nil { + return errors.Trace(err) + } + statslogutil.StatsLogger().Info( + "clean up the potentially corrupted analyze jobs from dead instances", + zap.Strings("jobIDs", jobIDs), + ) + } + + return nil +} + +// HandleAutoAnalyze analyzes the outdated tables. (The change percent of the table exceeds the threshold) +// It also analyzes newly created tables and newly added indexes. +func (sa *statsAnalyze) HandleAutoAnalyze() (analyzed bool) { + _ = statsutil.CallWithSCtx(sa.statsHandle.SPool(), func(sctx sessionctx.Context) error { + analyzed = HandleAutoAnalyze(sctx, sa.statsHandle, sa.sysProcTracker) + return nil + }) + return +} + +// CheckAnalyzeVersion checks whether all the statistics versions of this table's columns and indexes are the same. +func (sa *statsAnalyze) CheckAnalyzeVersion(tblInfo *model.TableInfo, physicalIDs []int64, version *int) bool { + // We simply choose one physical id to get its stats. + var tbl *statistics.Table + for _, pid := range physicalIDs { + tbl = sa.statsHandle.GetPartitionStats(tblInfo, pid) + if !tbl.Pseudo { + break + } + } + if tbl == nil || tbl.Pseudo { + return true + } + return statistics.CheckAnalyzeVerOnTable(tbl, version) +} + +// HandleAutoAnalyze analyzes the newly created table or index. +func HandleAutoAnalyze( + sctx sessionctx.Context, + statsHandle statstypes.StatsHandle, + sysProcTracker sysproctrack.Tracker, +) (analyzed bool) { + defer func() { + if r := recover(); r != nil { + statslogutil.StatsLogger().Error( + "HandleAutoAnalyze panicked", + zap.Any("recover", r), + zap.Stack("stack"), + ) + } + }() + if variable.EnableAutoAnalyzePriorityQueue.Load() { + r := refresher.NewRefresher(statsHandle, sysProcTracker) + err := r.RebuildTableAnalysisJobQueue() + if err != nil { + statslogutil.StatsLogger().Error("rebuild table analysis job queue failed", zap.Error(err)) + return false + } + return r.PickOneTableAndAnalyzeByPriority() + } + + parameters := exec.GetAutoAnalyzeParameters(sctx) + autoAnalyzeRatio := exec.ParseAutoAnalyzeRatio(parameters[variable.TiDBAutoAnalyzeRatio]) + start, end, ok := checkAutoAnalyzeWindow(parameters) + if !ok { + return false + } + + pruneMode := variable.PartitionPruneMode(sctx.GetSessionVars().PartitionPruneMode.Load()) + + return RandomPickOneTableAndTryAutoAnalyze( + sctx, + statsHandle, + sysProcTracker, + autoAnalyzeRatio, + pruneMode, + start, + end, + ) +} + +// CheckAutoAnalyzeWindow determine the time window for auto-analysis and verify if the current time falls within this range. +// parameters is a map of auto analyze parameters. it is from GetAutoAnalyzeParameters. +func CheckAutoAnalyzeWindow(sctx sessionctx.Context) bool { + parameters := exec.GetAutoAnalyzeParameters(sctx) + _, _, ok := checkAutoAnalyzeWindow(parameters) + return ok +} + +func checkAutoAnalyzeWindow(parameters map[string]string) (time.Time, time.Time, bool) { + start, end, err := exec.ParseAutoAnalysisWindow( + parameters[variable.TiDBAutoAnalyzeStartTime], + parameters[variable.TiDBAutoAnalyzeEndTime], + ) + if err != nil { + statslogutil.StatsLogger().Error( + "parse auto analyze period failed", + zap.Error(err), + ) + return start, end, false + } + if !timeutil.WithinDayTimePeriod(start, end, time.Now()) { + return start, end, false + } + return start, end, true +} + +// RandomPickOneTableAndTryAutoAnalyze randomly picks one table and tries to analyze it. +// 1. If the table is not analyzed, analyze it. +// 2. If the table is analyzed, analyze it when "tbl.ModifyCount/tbl.Count > autoAnalyzeRatio". +// 3. If the table is analyzed, analyze its indices when the index is not analyzed. +// 4. If the table is locked, skip it. +// Exposed solely for testing. +func RandomPickOneTableAndTryAutoAnalyze( + sctx sessionctx.Context, + statsHandle statstypes.StatsHandle, + sysProcTracker sysproctrack.Tracker, + autoAnalyzeRatio float64, + pruneMode variable.PartitionPruneMode, + start, end time.Time, +) bool { + is := sctx.GetDomainInfoSchema().(infoschema.InfoSchema) + dbs := infoschema.AllSchemaNames(is) + // Shuffle the database and table slice to randomize the order of analyzing tables. + rd := rand.New(rand.NewSource(time.Now().UnixNano())) // #nosec G404 + rd.Shuffle(len(dbs), func(i, j int) { + dbs[i], dbs[j] = dbs[j], dbs[i] + }) + // Query locked tables once to minimize overhead. + // Outdated lock info is acceptable as we verify table lock status pre-analysis. + lockedTables, err := lockstats.QueryLockedTables(sctx) + if err != nil { + statslogutil.StatsLogger().Error( + "check table lock failed", + zap.Error(err), + ) + return false + } + + for _, db := range dbs { + // Ignore the memory and system database. + if util.IsMemOrSysDB(strings.ToLower(db)) { + continue + } + + tbls, err := is.SchemaTableInfos(context.Background(), model.NewCIStr(db)) + terror.Log(err) + // We shuffle dbs and tbls so that the order of iterating tables is random. If the order is fixed and the auto + // analyze job of one table fails for some reason, it may always analyze the same table and fail again and again + // when the HandleAutoAnalyze is triggered. Randomizing the order can avoid the problem. + // TODO: Design a priority queue to place the table which needs analyze most in the front. + rd.Shuffle(len(tbls), func(i, j int) { + tbls[i], tbls[j] = tbls[j], tbls[i] + }) + + // We need to check every partition of every table to see if it needs to be analyzed. + for _, tblInfo := range tbls { + // Sometimes the tables are too many. Auto-analyze will take too much time on it. + // so we need to check the available time. + if !timeutil.WithinDayTimePeriod(start, end, time.Now()) { + return false + } + // If table locked, skip analyze all partitions of the table. + // FIXME: This check is not accurate, because other nodes may change the table lock status at any time. + if _, ok := lockedTables[tblInfo.ID]; ok { + continue + } + + if tblInfo.IsView() { + continue + } + + pi := tblInfo.GetPartitionInfo() + // No partitions, analyze the whole table. + if pi == nil { + statsTbl := statsHandle.GetTableStatsForAutoAnalyze(tblInfo) + sql := "analyze table %n.%n" + analyzed := tryAutoAnalyzeTable(sctx, statsHandle, sysProcTracker, tblInfo, statsTbl, autoAnalyzeRatio, sql, db, tblInfo.Name.O) + if analyzed { + // analyze one table at a time to let it get the freshest parameters. + // others will be analyzed next round which is just 3s later. + return true + } + continue + } + // Only analyze the partition that has not been locked. + partitionDefs := make([]model.PartitionDefinition, 0, len(pi.Definitions)) + for _, def := range pi.Definitions { + if _, ok := lockedTables[def.ID]; !ok { + partitionDefs = append(partitionDefs, def) + } + } + partitionStats := getPartitionStats(statsHandle, tblInfo, partitionDefs) + if pruneMode == variable.Dynamic { + analyzed := tryAutoAnalyzePartitionTableInDynamicMode( + sctx, + statsHandle, + sysProcTracker, + tblInfo, + partitionDefs, + partitionStats, + db, + autoAnalyzeRatio, + ) + if analyzed { + return true + } + continue + } + for _, def := range partitionDefs { + sql := "analyze table %n.%n partition %n" + statsTbl := partitionStats[def.ID] + analyzed := tryAutoAnalyzeTable(sctx, statsHandle, sysProcTracker, tblInfo, statsTbl, autoAnalyzeRatio, sql, db, tblInfo.Name.O, def.Name.O) + if analyzed { + return true + } + } + } + } + + return false +} + +func getPartitionStats( + statsHandle statstypes.StatsHandle, + tblInfo *model.TableInfo, + defs []model.PartitionDefinition, +) map[int64]*statistics.Table { + partitionStats := make(map[int64]*statistics.Table, len(defs)) + + for _, def := range defs { + partitionStats[def.ID] = statsHandle.GetPartitionStatsForAutoAnalyze(tblInfo, def.ID) + } + + return partitionStats +} + +// Determine whether the table and index require analysis. +func tryAutoAnalyzeTable( + sctx sessionctx.Context, + statsHandle statstypes.StatsHandle, + sysProcTracker sysproctrack.Tracker, + tblInfo *model.TableInfo, + statsTbl *statistics.Table, + ratio float64, + sql string, + params ...any, +) bool { + // 1. If the statistics are either not loaded or are classified as pseudo, there is no need for analyze + // Pseudo statistics can be created by the optimizer, so we need to double check it. + // 2. If the table is too small, we don't want to waste time to analyze it. + // Leave the opportunity to other bigger tables. + if statsTbl == nil || statsTbl.Pseudo || statsTbl.RealtimeCount < statistics.AutoAnalyzeMinCnt { + return false + } + + // Check if the table needs to analyze. + if needAnalyze, reason := NeedAnalyzeTable( + statsTbl, + ratio, + ); needAnalyze { + escaped, err := sqlescape.EscapeSQL(sql, params...) + if err != nil { + return false + } + statslogutil.StatsLogger().Info( + "auto analyze triggered", + zap.String("sql", escaped), + zap.String("reason", reason), + ) + + tableStatsVer := sctx.GetSessionVars().AnalyzeVersion + statistics.CheckAnalyzeVerOnTable(statsTbl, &tableStatsVer) + exec.AutoAnalyze(sctx, statsHandle, sysProcTracker, tableStatsVer, sql, params...) + + return true + } + + // Whether the table needs to analyze or not, we need to check the indices of the table. + for _, idx := range tblInfo.Indices { + if idxStats := statsTbl.GetIdx(idx.ID); idxStats == nil && !statsTbl.ColAndIdxExistenceMap.HasAnalyzed(idx.ID, true) && idx.State == model.StatePublic { + sqlWithIdx := sql + " index %n" + paramsWithIdx := append(params, idx.Name.O) + escaped, err := sqlescape.EscapeSQL(sqlWithIdx, paramsWithIdx...) + if err != nil { + return false + } + + statslogutil.StatsLogger().Info( + "auto analyze for unanalyzed indexes", + zap.String("sql", escaped), + ) + tableStatsVer := sctx.GetSessionVars().AnalyzeVersion + statistics.CheckAnalyzeVerOnTable(statsTbl, &tableStatsVer) + exec.AutoAnalyze(sctx, statsHandle, sysProcTracker, tableStatsVer, sqlWithIdx, paramsWithIdx...) + return true + } + } + return false +} + +// NeedAnalyzeTable checks if we need to analyze the table: +// 1. If the table has never been analyzed, we need to analyze it. +// 2. If the table had been analyzed before, we need to analyze it when +// "tbl.ModifyCount/tbl.Count > autoAnalyzeRatio" and the current time is +// between `start` and `end`. +// +// Exposed for test. +func NeedAnalyzeTable(tbl *statistics.Table, autoAnalyzeRatio float64) (bool, string) { + analyzed := tbl.IsAnalyzed() + if !analyzed { + return true, "table unanalyzed" + } + // Auto analyze is disabled. + if autoAnalyzeRatio == 0 { + return false, "" + } + // No need to analyze it. + tblCnt := float64(tbl.RealtimeCount) + if histCnt := tbl.GetAnalyzeRowCount(); histCnt > 0 { + tblCnt = histCnt + } + if float64(tbl.ModifyCount)/tblCnt <= autoAnalyzeRatio { + return false, "" + } + return true, fmt.Sprintf("too many modifications(%v/%v>%v)", tbl.ModifyCount, tblCnt, autoAnalyzeRatio) +} + +// It is very similar to tryAutoAnalyzeTable, but it commits the analyze job in batch for partitions. +func tryAutoAnalyzePartitionTableInDynamicMode( + sctx sessionctx.Context, + statsHandle statstypes.StatsHandle, + sysProcTracker sysproctrack.Tracker, + tblInfo *model.TableInfo, + partitionDefs []model.PartitionDefinition, + partitionStats map[int64]*statistics.Table, + db string, + ratio float64, +) bool { + tableStatsVer := sctx.GetSessionVars().AnalyzeVersion + analyzePartitionBatchSize := int(variable.AutoAnalyzePartitionBatchSize.Load()) + needAnalyzePartitionNames := make([]any, 0, len(partitionDefs)) + + for _, def := range partitionDefs { + partitionStats := partitionStats[def.ID] + // 1. If the statistics are either not loaded or are classified as pseudo, there is no need for analyze. + // Pseudo statistics can be created by the optimizer, so we need to double check it. + // 2. If the table is too small, we don't want to waste time to analyze it. + // Leave the opportunity to other bigger tables. + if partitionStats == nil || partitionStats.Pseudo || partitionStats.RealtimeCount < statistics.AutoAnalyzeMinCnt { + continue + } + if needAnalyze, reason := NeedAnalyzeTable( + partitionStats, + ratio, + ); needAnalyze { + needAnalyzePartitionNames = append(needAnalyzePartitionNames, def.Name.O) + statslogutil.StatsLogger().Info( + "need to auto analyze", + zap.String("database", db), + zap.String("table", tblInfo.Name.String()), + zap.String("partition", def.Name.O), + zap.String("reason", reason), + ) + statistics.CheckAnalyzeVerOnTable(partitionStats, &tableStatsVer) + } + } + + getSQL := func(prefix, suffix string, numPartitions int) string { + var sqlBuilder strings.Builder + sqlBuilder.WriteString(prefix) + for i := 0; i < numPartitions; i++ { + if i != 0 { + sqlBuilder.WriteString(",") + } + sqlBuilder.WriteString(" %n") + } + sqlBuilder.WriteString(suffix) + return sqlBuilder.String() + } + + if len(needAnalyzePartitionNames) > 0 { + statslogutil.StatsLogger().Info("start to auto analyze", + zap.String("database", db), + zap.String("table", tblInfo.Name.String()), + zap.Any("partitions", needAnalyzePartitionNames), + zap.Int("analyze partition batch size", analyzePartitionBatchSize), + ) + + statsTbl := statsHandle.GetTableStats(tblInfo) + statistics.CheckAnalyzeVerOnTable(statsTbl, &tableStatsVer) + for i := 0; i < len(needAnalyzePartitionNames); i += analyzePartitionBatchSize { + start := i + end := start + analyzePartitionBatchSize + if end >= len(needAnalyzePartitionNames) { + end = len(needAnalyzePartitionNames) + } + + // Do batch analyze for partitions. + sql := getSQL("analyze table %n.%n partition", "", end-start) + params := append([]any{db, tblInfo.Name.O}, needAnalyzePartitionNames[start:end]...) + + statslogutil.StatsLogger().Info( + "auto analyze triggered", + zap.String("database", db), + zap.String("table", tblInfo.Name.String()), + zap.Any("partitions", needAnalyzePartitionNames[start:end]), + ) + exec.AutoAnalyze(sctx, statsHandle, sysProcTracker, tableStatsVer, sql, params...) + } + + return true + } + // Check if any index of the table needs to analyze. + for _, idx := range tblInfo.Indices { + if idx.State != model.StatePublic { + continue + } + // Collect all the partition names that need to analyze. + for _, def := range partitionDefs { + partitionStats := partitionStats[def.ID] + // 1. If the statistics are either not loaded or are classified as pseudo, there is no need for analyze. + // Pseudo statistics can be created by the optimizer, so we need to double check it. + if partitionStats == nil || partitionStats.Pseudo { + continue + } + // 2. If the index is not analyzed, we need to analyze it. + if !partitionStats.ColAndIdxExistenceMap.HasAnalyzed(idx.ID, true) { + needAnalyzePartitionNames = append(needAnalyzePartitionNames, def.Name.O) + statistics.CheckAnalyzeVerOnTable(partitionStats, &tableStatsVer) + } + } + if len(needAnalyzePartitionNames) > 0 { + statsTbl := statsHandle.GetTableStats(tblInfo) + statistics.CheckAnalyzeVerOnTable(statsTbl, &tableStatsVer) + + for i := 0; i < len(needAnalyzePartitionNames); i += analyzePartitionBatchSize { + start := i + end := start + analyzePartitionBatchSize + if end >= len(needAnalyzePartitionNames) { + end = len(needAnalyzePartitionNames) + } + + sql := getSQL("analyze table %n.%n partition", " index %n", end-start) + params := append([]any{db, tblInfo.Name.O}, needAnalyzePartitionNames[start:end]...) + params = append(params, idx.Name.O) + statslogutil.StatsLogger().Info("auto analyze for unanalyzed", + zap.String("database", db), + zap.String("table", tblInfo.Name.String()), + zap.String("index", idx.Name.String()), + zap.Any("partitions", needAnalyzePartitionNames[start:end]), + ) + exec.AutoAnalyze(sctx, statsHandle, sysProcTracker, tableStatsVer, sql, params...) + } + + return true + } + } + + return false +} + +// insertAnalyzeJob inserts analyze job into mysql.analyze_jobs and gets job ID for further updating job. +func insertAnalyzeJob(sctx sessionctx.Context, job *statistics.AnalyzeJob, instance string, procID uint64) (err error) { + jobInfo := job.JobInfo + const textMaxLength = 65535 + if len(jobInfo) > textMaxLength { + jobInfo = jobInfo[:textMaxLength] + } + const insertJob = "INSERT INTO mysql.analyze_jobs (table_schema, table_name, partition_name, job_info, state, instance, process_id) VALUES (%?, %?, %?, %?, %?, %?, %?)" + _, _, err = statsutil.ExecRows(sctx, insertJob, job.DBName, job.TableName, job.PartitionName, jobInfo, statistics.AnalyzePending, instance, procID) + if err != nil { + return err + } + const getJobID = "SELECT LAST_INSERT_ID()" + rows, _, err := statsutil.ExecRows(sctx, getJobID) + if err != nil { + return err + } + job.ID = new(uint64) + *job.ID = rows[0].GetUint64(0) + failpoint.Inject("DebugAnalyzeJobOperations", func(val failpoint.Value) { + if val.(bool) { + logutil.BgLogger().Info("InsertAnalyzeJob", + zap.String("table_schema", job.DBName), + zap.String("table_name", job.TableName), + zap.String("partition_name", job.PartitionName), + zap.String("job_info", jobInfo), + zap.Uint64("job_id", *job.ID), + ) + } + }) + return nil +} + +// startAnalyzeJob marks the state of the analyze job as running and sets the start time. +func startAnalyzeJob(sctx sessionctx.Context, job *statistics.AnalyzeJob) { + if job == nil || job.ID == nil { + return + } + job.StartTime = time.Now() + job.Progress.SetLastDumpTime(job.StartTime) + const sql = "UPDATE mysql.analyze_jobs SET start_time = CONVERT_TZ(%?, '+00:00', @@TIME_ZONE), state = %? WHERE id = %?" + _, _, err := statsutil.ExecRows(sctx, sql, job.StartTime.UTC().Format(types.TimeFormat), statistics.AnalyzeRunning, *job.ID) + if err != nil { + statslogutil.StatsLogger().Warn("failed to update analyze job", zap.String("update", fmt.Sprintf("%s->%s", statistics.AnalyzePending, statistics.AnalyzeRunning)), zap.Error(err)) + } + failpoint.Inject("DebugAnalyzeJobOperations", func(val failpoint.Value) { + if val.(bool) { + logutil.BgLogger().Info("StartAnalyzeJob", + zap.Time("start_time", job.StartTime), + zap.Uint64("job id", *job.ID), + ) + } + }) +} + +// updateAnalyzeJobProgress updates count of the processed rows when increment reaches a threshold. +func updateAnalyzeJobProgress(sctx sessionctx.Context, job *statistics.AnalyzeJob, rowCount int64) { + if job == nil || job.ID == nil { + return + } + delta := job.Progress.Update(rowCount) + if delta == 0 { + return + } + const sql = "UPDATE mysql.analyze_jobs SET processed_rows = processed_rows + %? WHERE id = %?" + _, _, err := statsutil.ExecRows(sctx, sql, delta, *job.ID) + if err != nil { + statslogutil.StatsLogger().Warn("failed to update analyze job", zap.String("update", fmt.Sprintf("process %v rows", delta)), zap.Error(err)) + } + failpoint.Inject("DebugAnalyzeJobOperations", func(val failpoint.Value) { + if val.(bool) { + logutil.BgLogger().Info("UpdateAnalyzeJobProgress", + zap.Int64("increase processed_rows", delta), + zap.Uint64("job id", *job.ID), + ) + } + }) +} + +// finishAnalyzeJob finishes an analyze or merge job +func finishAnalyzeJob(sctx sessionctx.Context, job *statistics.AnalyzeJob, analyzeErr error, analyzeType statistics.JobType) { + if job == nil || job.ID == nil { + return + } + + job.EndTime = time.Now() + var sql string + var args []any + + // process_id is used to see which process is running the analyze job and kill the analyze job. After the analyze job + // is finished(or failed), process_id is useless and we set it to NULL to avoid `kill tidb process_id` wrongly. + if analyzeErr != nil { + failReason := analyzeErr.Error() + const textMaxLength = 65535 + if len(failReason) > textMaxLength { + failReason = failReason[:textMaxLength] + } + + if analyzeType == statistics.TableAnalysisJob { + sql = "UPDATE mysql.analyze_jobs SET processed_rows = processed_rows + %?, end_time = CONVERT_TZ(%?, '+00:00', @@TIME_ZONE), state = %?, fail_reason = %?, process_id = NULL WHERE id = %?" + args = []any{job.Progress.GetDeltaCount(), job.EndTime.UTC().Format(types.TimeFormat), statistics.AnalyzeFailed, failReason, *job.ID} + } else { + sql = "UPDATE mysql.analyze_jobs SET end_time = CONVERT_TZ(%?, '+00:00', @@TIME_ZONE), state = %?, fail_reason = %?, process_id = NULL WHERE id = %?" + args = []any{job.EndTime.UTC().Format(types.TimeFormat), statistics.AnalyzeFailed, failReason, *job.ID} + } + } else { + if analyzeType == statistics.TableAnalysisJob { + sql = "UPDATE mysql.analyze_jobs SET processed_rows = processed_rows + %?, end_time = CONVERT_TZ(%?, '+00:00', @@TIME_ZONE), state = %?, process_id = NULL WHERE id = %?" + args = []any{job.Progress.GetDeltaCount(), job.EndTime.UTC().Format(types.TimeFormat), statistics.AnalyzeFinished, *job.ID} + } else { + sql = "UPDATE mysql.analyze_jobs SET end_time = CONVERT_TZ(%?, '+00:00', @@TIME_ZONE), state = %?, process_id = NULL WHERE id = %?" + args = []any{job.EndTime.UTC().Format(types.TimeFormat), statistics.AnalyzeFinished, *job.ID} + } + } + + _, _, err := statsutil.ExecRows(sctx, sql, args...) + if err != nil { + state := statistics.AnalyzeFinished + if analyzeErr != nil { + state = statistics.AnalyzeFailed + } + logutil.BgLogger().Warn("failed to update analyze job", zap.String("update", fmt.Sprintf("%s->%s", statistics.AnalyzeRunning, state)), zap.Error(err)) + } + + failpoint.Inject("DebugAnalyzeJobOperations", func(val failpoint.Value) { + if val.(bool) { + logger := logutil.BgLogger().With( + zap.Time("end_time", job.EndTime), + zap.Uint64("job id", *job.ID), + ) + if analyzeType == statistics.TableAnalysisJob { + logger = logger.With(zap.Int64("increase processed_rows", job.Progress.GetDeltaCount())) + } + if analyzeErr != nil { + logger = logger.With(zap.Error(analyzeErr)) + } + logger.Info("FinishAnalyzeJob") + } + }) +} diff --git a/pkg/statistics/handle/autoanalyze/exec/BUILD.bazel b/pkg/statistics/handle/autoanalyze/exec/BUILD.bazel new file mode 100644 index 0000000000000..1ef6999f7006f --- /dev/null +++ b/pkg/statistics/handle/autoanalyze/exec/BUILD.bazel @@ -0,0 +1,40 @@ +load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") + +go_library( + name = "exec", + srcs = ["exec.go"], + importpath = "github.com/pingcap/tidb/pkg/statistics/handle/autoanalyze/exec", + visibility = ["//visibility:public"], + deps = [ + "//pkg/metrics", + "//pkg/parser/ast", + "//pkg/sessionctx", + "//pkg/sessionctx/sysproctrack", + "//pkg/sessionctx/variable", + "//pkg/statistics", + "//pkg/statistics/handle/logutil", + "//pkg/statistics/handle/types", + "//pkg/statistics/handle/util", + "//pkg/util/chunk", + "//pkg/util/logutil", + "//pkg/util/sqlescape", + "//pkg/util/sqlexec", + "@com_github_pingcap_errors//:errors", + "@org_uber_go_zap//:zap", + ], +) + +go_test( + name = "exec_test", + timeout = "short", + srcs = ["exec_test.go"], + flaky = True, + deps = [ + ":exec", + "//pkg/parser/model", + "//pkg/sessionctx", + "//pkg/testkit", + "//pkg/util", + "@com_github_stretchr_testify//require", + ], +) diff --git a/pkg/statistics/handle/autoanalyze/exec/exec.go b/pkg/statistics/handle/autoanalyze/exec/exec.go new file mode 100644 index 0000000000000..1430a1fc7fd2f --- /dev/null +++ b/pkg/statistics/handle/autoanalyze/exec/exec.go @@ -0,0 +1,142 @@ +// Copyright 2024 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package exec + +import ( + "math" + "strconv" + "time" + + "github.com/pingcap/errors" + "github.com/pingcap/tidb/pkg/metrics" + "github.com/pingcap/tidb/pkg/parser/ast" + "github.com/pingcap/tidb/pkg/sessionctx" + "github.com/pingcap/tidb/pkg/sessionctx/sysproctrack" + "github.com/pingcap/tidb/pkg/sessionctx/variable" + "github.com/pingcap/tidb/pkg/statistics" + statslogutil "github.com/pingcap/tidb/pkg/statistics/handle/logutil" + statstypes "github.com/pingcap/tidb/pkg/statistics/handle/types" + statsutil "github.com/pingcap/tidb/pkg/statistics/handle/util" + "github.com/pingcap/tidb/pkg/util/chunk" + "github.com/pingcap/tidb/pkg/util/logutil" + "github.com/pingcap/tidb/pkg/util/sqlescape" + "github.com/pingcap/tidb/pkg/util/sqlexec" + "go.uber.org/zap" +) + +var execOptionForAnalyze = map[int]sqlexec.OptionFuncAlias{ + statistics.Version0: sqlexec.ExecOptionAnalyzeVer1, + statistics.Version1: sqlexec.ExecOptionAnalyzeVer1, + statistics.Version2: sqlexec.ExecOptionAnalyzeVer2, +} + +// AutoAnalyze executes the auto analyze task. +func AutoAnalyze( + sctx sessionctx.Context, + statsHandle statstypes.StatsHandle, + sysProcTracker sysproctrack.Tracker, + statsVer int, + sql string, + params ...any, +) { + startTime := time.Now() + _, _, err := RunAnalyzeStmt(sctx, statsHandle, sysProcTracker, statsVer, sql, params...) + dur := time.Since(startTime) + metrics.AutoAnalyzeHistogram.Observe(dur.Seconds()) + if err != nil { + escaped, err1 := sqlescape.EscapeSQL(sql, params...) + if err1 != nil { + escaped = "" + } + statslogutil.StatsLogger().Error( + "auto analyze failed", + zap.String("sql", escaped), + zap.Duration("cost_time", dur), + zap.Error(err), + ) + metrics.AutoAnalyzeCounter.WithLabelValues("failed").Inc() + } else { + metrics.AutoAnalyzeCounter.WithLabelValues("succ").Inc() + } +} + +// RunAnalyzeStmt executes the analyze statement. +func RunAnalyzeStmt( + sctx sessionctx.Context, + statsHandle statstypes.StatsHandle, + sysProcTracker sysproctrack.Tracker, + statsVer int, + sql string, + params ...any, +) ([]chunk.Row, []*ast.ResultField, error) { + pruneMode := sctx.GetSessionVars().PartitionPruneMode.Load() + analyzeSnapshot := sctx.GetSessionVars().EnableAnalyzeSnapshot + autoAnalyzeTracker := statsutil.NewAutoAnalyzeTracker(sysProcTracker.Track, sysProcTracker.UnTrack) + autoAnalyzeProcID := statsHandle.AutoAnalyzeProcID() + optFuncs := []sqlexec.OptionFuncAlias{ + execOptionForAnalyze[statsVer], + sqlexec.GetAnalyzeSnapshotOption(analyzeSnapshot), + sqlexec.GetPartitionPruneModeOption(pruneMode), + sqlexec.ExecOptionUseCurSession, + sqlexec.ExecOptionWithSysProcTrack(autoAnalyzeProcID, autoAnalyzeTracker.Track, autoAnalyzeTracker.UnTrack), + } + defer func() { + if r := recover(); r != nil { + logutil.BgLogger().Warn("panic in execAnalyzeStmt", zap.Any("error", r), zap.Stack("stack")) + } + statsHandle.ReleaseAutoAnalyzeProcID(autoAnalyzeProcID) + }() + return statsutil.ExecWithOpts(sctx, optFuncs, sql, params...) +} + +// GetAutoAnalyzeParameters gets the auto analyze parameters from mysql.global_variables. +func GetAutoAnalyzeParameters(sctx sessionctx.Context) map[string]string { + sql := "select variable_name, variable_value from mysql.global_variables where variable_name in (%?, %?, %?)" + rows, _, err := statsutil.ExecWithOpts(sctx, nil, sql, variable.TiDBAutoAnalyzeRatio, variable.TiDBAutoAnalyzeStartTime, variable.TiDBAutoAnalyzeEndTime) + if err != nil { + return map[string]string{} + } + parameters := make(map[string]string, len(rows)) + for _, row := range rows { + parameters[row.GetString(0)] = row.GetString(1) + } + return parameters +} + +// ParseAutoAnalyzeRatio parses the auto analyze ratio from the string. +func ParseAutoAnalyzeRatio(ratio string) float64 { + autoAnalyzeRatio, err := strconv.ParseFloat(ratio, 64) + if err != nil { + return variable.DefAutoAnalyzeRatio + } + return math.Max(autoAnalyzeRatio, 0) +} + +// ParseAutoAnalysisWindow parses the time window for auto analysis. +// It parses the times in UTC location. +func ParseAutoAnalysisWindow(start, end string) (time.Time, time.Time, error) { + if start == "" { + start = variable.DefAutoAnalyzeStartTime + } + if end == "" { + end = variable.DefAutoAnalyzeEndTime + } + s, err := time.ParseInLocation(variable.FullDayTimeFormat, start, time.UTC) + if err != nil { + return s, s, errors.Trace(err) + } + e, err := time.ParseInLocation(variable.FullDayTimeFormat, end, time.UTC) + return s, e, err +} diff --git a/pkg/statistics/handle/autoanalyze/exec/exec_test.go b/pkg/statistics/handle/autoanalyze/exec/exec_test.go new file mode 100644 index 0000000000000..55ce730ac8c11 --- /dev/null +++ b/pkg/statistics/handle/autoanalyze/exec/exec_test.go @@ -0,0 +1,88 @@ +// Copyright 2024 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package exec_test + +import ( + "context" + "fmt" + "testing" + "time" + + "github.com/pingcap/tidb/pkg/parser/model" + "github.com/pingcap/tidb/pkg/sessionctx" + "github.com/pingcap/tidb/pkg/statistics/handle/autoanalyze/exec" + "github.com/pingcap/tidb/pkg/testkit" + "github.com/pingcap/tidb/pkg/util" + "github.com/stretchr/testify/require" +) + +func TestExecAutoAnalyzes(t *testing.T) { + store, dom := testkit.CreateMockStoreAndDomain(t) + tk := testkit.NewTestKit(t, store) + tk.MustExec("use test") + + tk.MustExec("create table t (a int, b int, index idx(a))") + tk.MustExec("insert into t values (1, 1), (2, 2), (3, 3)") + + se := tk.Session() + sctx := se.(sessionctx.Context) + handle := dom.StatsHandle() + + exec.AutoAnalyze( + sctx, + handle, + dom.SysProcTracker(), + 2, "analyze table %n", "t", + ) + + // Check the result of analyze. + is := dom.InfoSchema() + tbl, err := is.TableByName(context.Background(), model.NewCIStr("test"), model.NewCIStr("t")) + require.NoError(t, err) + tblStats := handle.GetTableStats(tbl.Meta()) + require.Equal(t, int64(3), tblStats.RealtimeCount) +} + +func TestKillInWindows(t *testing.T) { + store, dom := testkit.CreateMockStoreAndDomain(t) + tk := testkit.NewTestKit(t, store) + tk.MustExec("use test") + tk.MustExec("create table t1 (a int, b int, index idx(a)) partition by range (a) (partition p0 values less than (2), partition p1 values less than (14))") + tk.MustExec("insert into t1 values (4, 4), (5, 5), (6, 6), (7, 7), (8, 8), (9, 9), (10, 10), (11, 11), (12, 12), (13, 13)") + handle := dom.StatsHandle() + sysProcTracker := dom.SysProcTracker() + now := time.Now() + startTime := now.Add(1 * time.Hour).Format("15:04 -0700") + endTime := now.Add(2 * time.Hour).Format("15:04 -0700") + tk.MustExec(fmt.Sprintf("SET GLOBAL tidb_auto_analyze_start_time='%s'", startTime)) + tk.MustExec(fmt.Sprintf("SET GLOBAL tidb_auto_analyze_end_time='%s'", endTime)) + var wg util.WaitGroupWrapper + exitCh := make(chan struct{}) + wg.Run(func() { + for { + select { + case <-exitCh: + return + default: + dom.CheckAutoAnalyzeWindows() + } + } + }) + sctx := tk.Session() + _, _, err := exec.RunAnalyzeStmt(sctx, handle, sysProcTracker, 2, "analyze table %n", "t1") + require.ErrorContains(t, err, "[executor:1317]Query execution was interrupted") + close(exitCh) + wg.Wait() +}