diff --git a/config/config.go b/config/config.go
index 53141fadd093a..c0f61a914fd66 100644
--- a/config/config.go
+++ b/config/config.go
@@ -439,12 +439,17 @@ func (s *Security) ClusterSecurity() tikvcfg.Security {
 
 // Status is the status section of the config.
 type Status struct {
-	StatusHost      string `toml:"status-host" json:"status-host"`
-	MetricsAddr     string `toml:"metrics-addr" json:"metrics-addr"`
-	StatusPort      uint   `toml:"status-port" json:"status-port"`
-	MetricsInterval uint   `toml:"metrics-interval" json:"metrics-interval"`
-	ReportStatus    bool   `toml:"report-status" json:"report-status"`
-	RecordQPSbyDB   bool   `toml:"record-db-qps" json:"record-db-qps"`
+	StatusHost            string `toml:"status-host" json:"status-host"`
+	MetricsAddr           string `toml:"metrics-addr" json:"metrics-addr"`
+	StatusPort            uint   `toml:"status-port" json:"status-port"`
+	MetricsInterval       uint   `toml:"metrics-interval" json:"metrics-interval"`
+	ReportStatus          bool   `toml:"report-status" json:"report-status"`
+	RecordQPSbyDB         bool   `toml:"record-db-qps" json:"record-db-qps"`
+	GRPCKeepAliveTime     uint   `toml:"grpc-keepalive-time" json:"grpc-keepalive-time"`
+	GRPCKeepAliveTimeout  uint   `toml:"grpc-keepalive-timeout" json:"grpc-keepalive-timeout"`
+	GRPCConcurrentStreams uint   `toml:"grpc-concurrent-streams" json:"grpc-concurrent-streams"`
+	GRPCInitialWindowSize int    `toml:"grpc-initial-window-size" json:"grpc-initial-window-size"`
+	GRPCMaxSendMsgSize    int    `toml:"grpc-max-send-msg-size" json:"grpc-max-send-msg-size"`
 }
 
 // Performance is the performance section of the config.
@@ -658,11 +663,16 @@ var defaultConf = Config{
 		EnableSlowLog: *NewAtomicBool(logutil.DefaultTiDBEnableSlowLog),
 	},
 	Status: Status{
-		ReportStatus:    true,
-		StatusHost:      DefStatusHost,
-		StatusPort:      DefStatusPort,
-		MetricsInterval: 15,
-		RecordQPSbyDB:   false,
+		ReportStatus:          true,
+		StatusHost:            DefStatusHost,
+		StatusPort:            DefStatusPort,
+		MetricsInterval:       15,
+		RecordQPSbyDB:         false,
+		GRPCKeepAliveTime:     10,
+		GRPCKeepAliveTimeout:  3,
+		GRPCConcurrentStreams: 1024,
+		GRPCInitialWindowSize: 2 * 1024 * 1024,
+		GRPCMaxSendMsgSize:    10 * 1024 * 1024,
 	},
 	Performance: Performance{
 		MaxMemory: 0,
diff --git a/config/config.toml.example b/config/config.toml.example
index 0e91f903748f8..66e7085084414 100644
--- a/config/config.toml.example
+++ b/config/config.toml.example
@@ -238,6 +238,23 @@ metrics-interval = 15
 # Record statements qps by database name if it is enabled.
 record-db-qps = false
 
+# After a duration of this time in seconds, if the server doesn't see any activity, it pings
+# the client to check whether the transport is still alive.
+grpc-keepalive-time = 10
+
+# After sending a keepalive ping, the server waits for this timeout in seconds;
+# if no activity is seen even after that, the connection is closed.
+grpc-keepalive-timeout = 3
+
+## The maximum number of concurrent streams/requests on a client connection.
+# grpc-concurrent-streams = 1024
+
+## Sets the window size for a stream. The default value is 2MB.
+# grpc-initial-window-size = 2097152
+
+## Sets the maximum message length in bytes that gRPC can send. `-1` means unlimited. The default value is 10MB.
+# grpc-max-send-msg-size = 10485760
+
 [performance]
 # Max CPUs to use, 0 use number of CPUs in the machine.
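To make the mapping concrete: each of the new [status] options above becomes a grpc.ServerOption when the RPC server is built (see the server/rpc_server.go hunk later in this diff). A self-contained sketch of that mapping — statusConfig and serverOptions are names invented here for illustration; only the grpc and keepalive calls are real API:

package main

import (
	"time"

	"google.golang.org/grpc"
	"google.golang.org/grpc/keepalive"
)

// statusConfig mirrors the fields added to config.Status in this patch.
type statusConfig struct {
	GRPCKeepAliveTime     uint // grpc-keepalive-time, in seconds
	GRPCKeepAliveTimeout  uint // grpc-keepalive-timeout, in seconds
	GRPCConcurrentStreams uint // grpc-concurrent-streams
	GRPCInitialWindowSize int  // grpc-initial-window-size, in bytes
	GRPCMaxSendMsgSize    int  // grpc-max-send-msg-size, in bytes
}

// serverOptions converts the TOML-backed values into gRPC server options.
func serverOptions(c statusConfig) []grpc.ServerOption {
	return []grpc.ServerOption{
		grpc.KeepaliveParams(keepalive.ServerParameters{
			Time:    time.Duration(c.GRPCKeepAliveTime) * time.Second,
			Timeout: time.Duration(c.GRPCKeepAliveTimeout) * time.Second,
		}),
		grpc.MaxConcurrentStreams(uint32(c.GRPCConcurrentStreams)),
		grpc.InitialWindowSize(int32(c.GRPCInitialWindowSize)),
		grpc.MaxSendMsgSize(c.GRPCMaxSendMsgSize),
	}
}

func main() {
	// The defaults from defaultConf above: 10s, 3s, 1024 streams, 2MB window, 10MB messages.
	_ = grpc.NewServer(serverOptions(statusConfig{10, 3, 1024, 2 << 20, 10 << 20})...)
}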
max-procs = 0 diff --git a/ddl/ddl_worker.go b/ddl/ddl_worker.go index 24c3769217e5d..073df014d1d30 100644 --- a/ddl/ddl_worker.go +++ b/ddl/ddl_worker.go @@ -477,7 +477,7 @@ func newMetaWithQueueTp(txn kv.Transaction, tp workerType) *meta.Meta { } func (w *worker) setDDLLabelForTopSQL(job *model.Job) { - if !variable.TopSQLEnabled() || job == nil { + if !topsql.InstanceEnabled() || job == nil { return } diff --git a/distsql/request_builder.go b/distsql/request_builder.go index 44ff8d237f413..d826bfc28fe58 100644 --- a/distsql/request_builder.go +++ b/distsql/request_builder.go @@ -35,6 +35,7 @@ import ( "github.com/pingcap/tidb/util/codec" "github.com/pingcap/tidb/util/memory" "github.com/pingcap/tidb/util/ranger" + "github.com/pingcap/tidb/util/topsql" "github.com/pingcap/tipb/go-tipb" ) @@ -291,7 +292,7 @@ func (builder *RequestBuilder) SetFromInfoSchema(pis interface{}) *RequestBuilde // SetResourceGroupTagger sets the request resource group tagger. func (builder *RequestBuilder) SetResourceGroupTagger(sc *stmtctx.StatementContext) *RequestBuilder { - if variable.TopSQLEnabled() { + if topsql.InstanceEnabled() { builder.Request.ResourceGroupTagger = sc.GetResourceGroupTagger() } return builder diff --git a/domain/globalconfigsync/globalconfig_test.go b/domain/globalconfigsync/globalconfig_test.go index c7be9137ddf68..3a81b1a38453a 100644 --- a/domain/globalconfigsync/globalconfig_test.go +++ b/domain/globalconfigsync/globalconfig_test.go @@ -23,6 +23,7 @@ import ( "github.com/pingcap/tidb/domain/globalconfigsync" "github.com/pingcap/tidb/session" "github.com/pingcap/tidb/store/mockstore" + "github.com/pingcap/tidb/testkit" "github.com/pingcap/tidb/util/testbridge" "github.com/stretchr/testify/require" "go.etcd.io/etcd/integration" @@ -67,11 +68,8 @@ func TestStoreGlobalConfig(t *testing.T) { defer cluster.Terminate(t) domain.GetGlobalConfigSyncer().SetEtcdClient(cluster.RandClient()) - se, err := session.CreateSession4Test(store) - require.NoError(t, err) - - _, err = se.Execute(context.Background(), "set @@global.tidb_enable_top_sql=1;") - require.NoError(t, err) + tk := testkit.NewTestKit(t, store) + tk.MustExec("set @@global.tidb_enable_top_sql=1;") for i := 0; i < 20; i++ { resp, err := cluster.RandClient().Get(context.Background(), "/global/config/enable_resource_metering") diff --git a/domain/sysvar_cache.go b/domain/sysvar_cache.go index d89ba88a76ee0..5791b782d0b0a 100644 --- a/domain/sysvar_cache.go +++ b/domain/sysvar_cache.go @@ -26,6 +26,8 @@ import ( "github.com/pingcap/tidb/util/logutil" "github.com/pingcap/tidb/util/sqlexec" "github.com/pingcap/tidb/util/stmtsummary" + "github.com/pingcap/tidb/util/topsql/reporter" + "github.com/pingcap/tidb/util/tracecpu" storekv "github.com/tikv/client-go/v2/kv" pd "github.com/tikv/pd/client" "go.uber.org/zap" @@ -201,36 +203,34 @@ func (do *Domain) checkEnableServerGlobalVar(name, sVal string) { err = stmtsummary.StmtSummaryByDigestMap.SetMaxSQLLength(sVal, false) case variable.TiDBCapturePlanBaseline: variable.CapturePlanBaseline.Set(sVal, false) - case variable.TiDBEnableTopSQL: - variable.TopSQLVariable.Enable.Store(variable.TiDBOptOn(sVal)) case variable.TiDBTopSQLPrecisionSeconds: var val int64 val, err = strconv.ParseInt(sVal, 10, 64) if err != nil { break } - variable.TopSQLVariable.PrecisionSeconds.Store(val) + tracecpu.PrecisionSeconds.Store(val) case variable.TiDBTopSQLMaxStatementCount: var val int64 val, err = strconv.ParseInt(sVal, 10, 64) if err != nil { break } - variable.TopSQLVariable.MaxStatementCount.Store(val) + 
reporter.MaxStatementCount.Store(val) case variable.TiDBTopSQLMaxCollect: var val int64 val, err = strconv.ParseInt(sVal, 10, 64) if err != nil { break } - variable.TopSQLVariable.MaxCollect.Store(val) + reporter.MaxCollect.Store(val) case variable.TiDBTopSQLReportIntervalSeconds: var val int64 val, err = strconv.ParseInt(sVal, 10, 64) if err != nil { break } - variable.TopSQLVariable.ReportIntervalSeconds.Store(val) + reporter.ReportIntervalSeconds.Store(val) case variable.TiDBRestrictedReadOnly: variable.RestrictedReadOnly.Store(variable.TiDBOptOn(sVal)) case variable.TiDBStoreLimit: diff --git a/executor/adapter.go b/executor/adapter.go index 87f87a9712516..40929db7b23ce 100644 --- a/executor/adapter.go +++ b/executor/adapter.go @@ -314,7 +314,7 @@ func (a *ExecStmt) RebuildPlan(ctx context.Context) (int64, error) { } func (a *ExecStmt) setPlanLabelForTopSQL(ctx context.Context) context.Context { - if a.Plan == nil || !variable.TopSQLEnabled() { + if a.Plan == nil || !topsql.InstanceEnabled() { return ctx } vars := a.Ctx.GetSessionVars() diff --git a/executor/executor.go b/executor/executor.go index eee07f8774ed0..21a3b5b4952d7 100644 --- a/executor/executor.go +++ b/executor/executor.go @@ -65,6 +65,7 @@ import ( "github.com/pingcap/tidb/util/memory" "github.com/pingcap/tidb/util/resourcegrouptag" "github.com/pingcap/tidb/util/topsql" + "github.com/pingcap/tidb/util/tracecpu" tikverr "github.com/tikv/client-go/v2/error" tikvstore "github.com/tikv/client-go/v2/kv" tikvutil "github.com/tikv/client-go/v2/util" @@ -1726,11 +1727,11 @@ func ResetContextOfStmt(ctx sessionctx.Context, s ast.StmtNode) (err error) { sc.InitSQLDigest(prepareStmt.NormalizedSQL, prepareStmt.SQLDigest) // For `execute stmt` SQL, should reset the SQL digest with the prepare SQL digest. 
 			goCtx := context.Background()
-			if variable.EnablePProfSQLCPU.Load() && len(prepareStmt.NormalizedSQL) > 0 {
+			if tracecpu.EnablePProfSQLCPU.Load() && len(prepareStmt.NormalizedSQL) > 0 {
 				goCtx = pprof.WithLabels(goCtx, pprof.Labels("sql", util.QueryStrForLog(prepareStmt.NormalizedSQL)))
 				pprof.SetGoroutineLabels(goCtx)
 			}
-			if variable.TopSQLEnabled() && prepareStmt.SQLDigest != nil {
+			if topsql.InstanceEnabled() && prepareStmt.SQLDigest != nil {
 				topsql.AttachSQLInfo(goCtx, prepareStmt.NormalizedSQL, prepareStmt.SQLDigest, "", nil, vars.InRestrictedSQL)
 			}
 		}
@@ -1910,7 +1911,7 @@ func FillVirtualColumnValue(virtualRetTypes []*types.FieldType, virtualColumnInd
 }
 
 func setResourceGroupTaggerForTxn(sc *stmtctx.StatementContext, snapshot kv.Snapshot) {
-	if snapshot != nil && variable.TopSQLEnabled() {
+	if snapshot != nil && topsql.InstanceEnabled() {
 		snapshot.SetOption(kv.ResourceGroupTagger, sc.GetResourceGroupTagger())
 	}
 }
diff --git a/executor/executor_test.go b/executor/executor_test.go
index c93aa09e6ccdf..bd52712673f80 100644
--- a/executor/executor_test.go
+++ b/executor/executor_test.go
@@ -82,6 +82,8 @@ import (
 	"github.com/pingcap/tidb/util/testkit"
 	"github.com/pingcap/tidb/util/testutil"
 	"github.com/pingcap/tidb/util/timeutil"
+	"github.com/pingcap/tidb/util/topsql"
+	topsqlmock "github.com/pingcap/tidb/util/tracecpu/mock"
 	"github.com/pingcap/tipb/go-tipb"
 	"github.com/stretchr/testify/require"
 	"github.com/tikv/client-go/v2/oracle"
@@ -8718,11 +8720,8 @@ func (s *testResourceTagSuite) TestResourceGroupTag(c *C) {
 	tk.MustExec("create table t(a int, b int, unique index idx(a));")
 	tbInfo := testGetTableByName(c, tk.Se, "test", "t")
 
-	// Enable Top SQL
-	variable.TopSQLVariable.Enable.Store(true)
-	config.UpdateGlobal(func(conf *config.Config) {
-		conf.TopSQL.ReceiverAddress = "mock-agent"
-	})
+	topsql.SetupTopSQL(topsqlmock.NewTopSQLCollector())
+	defer topsql.Close()
 
 	c.Assert(failpoint.Enable("github.com/pingcap/tidb/store/mockstore/unistore/unistoreRPCClientSendHook", `return(true)`), IsNil)
 	defer failpoint.Disable("github.com/pingcap/tidb/store/mockstore/unistore/unistoreRPCClientSendHook")
diff --git a/executor/main_test.go b/executor/main_test.go
index bff65b72d6a2d..7a9e377ae23c1 100644
--- a/executor/main_test.go
+++ b/executor/main_test.go
@@ -57,6 +57,9 @@ func TestMain(m *testing.M) {
 		goleak.IgnoreTopFunction("go.etcd.io/etcd/pkg/logutil.(*MergeLogger).outputLoop"),
 		goleak.IgnoreTopFunction("go.opencensus.io/stats/view.(*worker).start"),
 		goleak.IgnoreTopFunction("gopkg.in/natefinch/lumberjack%2ev2.(*Logger).millRun"),
+		goleak.IgnoreTopFunction("time.Sleep"),
+		goleak.IgnoreTopFunction("runtime/pprof.readProfile"),
+		goleak.IgnoreTopFunction("github.com/pingcap/tidb/util/tracecpu.(*sqlCPUProfiler).startAnalyzeProfileWorker"),
 	}
 	callback := func(i int) int {
 		testDataMap.GenerateOutputIfNeeded()
diff --git a/executor/prepared.go b/executor/prepared.go
index 3013aba0de9cd..8c91b02a411eb 100644
--- a/executor/prepared.go
+++ b/executor/prepared.go
@@ -30,7 +30,6 @@ import (
 	"github.com/pingcap/tidb/planner"
 	plannercore "github.com/pingcap/tidb/planner/core"
 	"github.com/pingcap/tidb/sessionctx"
-	"github.com/pingcap/tidb/sessionctx/variable"
 	"github.com/pingcap/tidb/types"
 	driver "github.com/pingcap/tidb/types/parser_driver"
 	"github.com/pingcap/tidb/util"
@@ -194,7 +193,7 @@ func (e *PrepareExec) Next(ctx context.Context, req *chunk.Chunk) error {
 		SchemaVersion: ret.InfoSchema.SchemaMetaVersion(),
 	}
 	normalizedSQL, digest := parser.NormalizeDigest(prepared.Stmt.Text())
-	if
variable.TopSQLEnabled() { + if topsql.InstanceEnabled() { ctx = topsql.AttachSQLInfo(ctx, normalizedSQL, digest, "", nil, vars.InRestrictedSQL) } diff --git a/executor/set_test.go b/executor/set_test.go index 2595b4a3131f5..dea1182e13759 100644 --- a/executor/set_test.go +++ b/executor/set_test.go @@ -38,6 +38,8 @@ import ( "github.com/pingcap/tidb/util/mock" "github.com/pingcap/tidb/util/testkit" "github.com/pingcap/tidb/util/testutil" + "github.com/pingcap/tidb/util/topsql/reporter" + "github.com/pingcap/tidb/util/tracecpu" ) func (s *testSerialSuite1) TestSetVar(c *C) { @@ -1497,27 +1499,21 @@ func (s *testSerialSuite) TestSetTopSQLVariables(c *C) { }() tk := testkit.NewTestKit(c, s.store) - tk.MustExec("set @@global.tidb_enable_top_sql='On';") - tk.MustQuery("select @@global.tidb_enable_top_sql;").Check(testkit.Rows("1")) - c.Assert(variable.TopSQLVariable.Enable.Load(), IsTrue) - tk.MustExec("set @@global.tidb_enable_top_sql='off';") - tk.MustQuery("select @@global.tidb_enable_top_sql;").Check(testkit.Rows("0")) - c.Assert(variable.TopSQLVariable.Enable.Load(), IsFalse) tk.MustExec("set @@global.tidb_top_sql_precision_seconds=2;") tk.MustQuery("select @@global.tidb_top_sql_precision_seconds;").Check(testkit.Rows("2")) - c.Assert(variable.TopSQLVariable.PrecisionSeconds.Load(), Equals, int64(2)) + c.Assert(tracecpu.PrecisionSeconds.Load(), Equals, int64(2)) _, err := tk.Exec("set @@global.tidb_top_sql_precision_seconds='abc';") c.Assert(err.Error(), Equals, "[variable:1232]Incorrect argument type to variable 'tidb_top_sql_precision_seconds'") tk.MustExec("set @@global.tidb_top_sql_precision_seconds='-1';") tk.MustQuery("select @@global.tidb_top_sql_precision_seconds;").Check(testkit.Rows("1")) tk.MustExec("set @@global.tidb_top_sql_precision_seconds=2;") tk.MustQuery("select @@global.tidb_top_sql_precision_seconds;").Check(testkit.Rows("2")) - c.Assert(variable.TopSQLVariable.PrecisionSeconds.Load(), Equals, int64(2)) + c.Assert(tracecpu.PrecisionSeconds.Load(), Equals, int64(2)) tk.MustExec("set @@global.tidb_top_sql_max_statement_count=20;") tk.MustQuery("select @@global.tidb_top_sql_max_statement_count;").Check(testkit.Rows("20")) - c.Assert(variable.TopSQLVariable.MaxStatementCount.Load(), Equals, int64(20)) + c.Assert(reporter.MaxStatementCount.Load(), Equals, int64(20)) _, err = tk.Exec("set @@global.tidb_top_sql_max_statement_count='abc';") c.Assert(err.Error(), Equals, "[variable:1232]Incorrect argument type to variable 'tidb_top_sql_max_statement_count'") tk.MustExec("set @@global.tidb_top_sql_max_statement_count='-1';") @@ -1528,11 +1524,11 @@ func (s *testSerialSuite) TestSetTopSQLVariables(c *C) { tk.MustExec("set @@global.tidb_top_sql_max_statement_count=20;") tk.MustQuery("select @@global.tidb_top_sql_max_statement_count;").Check(testkit.Rows("20")) - c.Assert(variable.TopSQLVariable.MaxStatementCount.Load(), Equals, int64(20)) + c.Assert(reporter.MaxStatementCount.Load(), Equals, int64(20)) tk.MustExec("set @@global.tidb_top_sql_max_collect=20000;") tk.MustQuery("select @@global.tidb_top_sql_max_collect;").Check(testkit.Rows("20000")) - c.Assert(variable.TopSQLVariable.MaxCollect.Load(), Equals, int64(20000)) + c.Assert(reporter.MaxCollect.Load(), Equals, int64(20000)) _, err = tk.Exec("set @@global.tidb_top_sql_max_collect='abc';") c.Assert(err.Error(), Equals, "[variable:1232]Incorrect argument type to variable 'tidb_top_sql_max_collect'") tk.MustExec("set @@global.tidb_top_sql_max_collect='-1';") @@ -1545,11 +1541,11 @@ func (s *testSerialSuite) 
TestSetTopSQLVariables(c *C) { tk.MustExec("set @@global.tidb_top_sql_max_collect=20000;") tk.MustQuery("select @@global.tidb_top_sql_max_collect;").Check(testkit.Rows("20000")) - c.Assert(variable.TopSQLVariable.MaxCollect.Load(), Equals, int64(20000)) + c.Assert(reporter.MaxCollect.Load(), Equals, int64(20000)) tk.MustExec("set @@global.tidb_top_sql_report_interval_seconds=120;") tk.MustQuery("select @@global.tidb_top_sql_report_interval_seconds;").Check(testkit.Rows("120")) - c.Assert(variable.TopSQLVariable.ReportIntervalSeconds.Load(), Equals, int64(120)) + c.Assert(reporter.ReportIntervalSeconds.Load(), Equals, int64(120)) _, err = tk.Exec("set @@global.tidb_top_sql_report_interval_seconds='abc';") c.Assert(err.Error(), Equals, "[variable:1232]Incorrect argument type to variable 'tidb_top_sql_report_interval_seconds'") tk.MustExec("set @@global.tidb_top_sql_report_interval_seconds='5000';") @@ -1558,7 +1554,7 @@ func (s *testSerialSuite) TestSetTopSQLVariables(c *C) { tk.MustExec("set @@global.tidb_top_sql_report_interval_seconds=120;") tk.MustQuery("select @@global.tidb_top_sql_report_interval_seconds;").Check(testkit.Rows("120")) - c.Assert(variable.TopSQLVariable.ReportIntervalSeconds.Load(), Equals, int64(120)) + c.Assert(reporter.ReportIntervalSeconds.Load(), Equals, int64(120)) // Test for hide top sql variable in show variable. tk.MustQuery("show variables like '%top_sql%'").Check(testkit.Rows()) diff --git a/executor/update.go b/executor/update.go index 7df144b28196c..ea5230632283a 100644 --- a/executor/update.go +++ b/executor/update.go @@ -26,12 +26,12 @@ import ( "github.com/pingcap/tidb/parser/model" "github.com/pingcap/tidb/parser/mysql" plannercore "github.com/pingcap/tidb/planner/core" - "github.com/pingcap/tidb/sessionctx/variable" "github.com/pingcap/tidb/table" "github.com/pingcap/tidb/types" "github.com/pingcap/tidb/util/chunk" "github.com/pingcap/tidb/util/execdetails" "github.com/pingcap/tidb/util/memory" + "github.com/pingcap/tidb/util/topsql" "github.com/tikv/client-go/v2/txnkv/txnsnapshot" ) @@ -271,7 +271,7 @@ func (e *UpdateExec) updateRows(ctx context.Context) (int, error) { txn.GetSnapshot().SetOption(kv.CollectRuntimeStats, e.stats.SnapshotRuntimeStats) } } - if variable.TopSQLEnabled() { + if topsql.InstanceEnabled() { txn, err := e.ctx.Txn(true) if err == nil { txn.SetOption(kv.ResourceGroupTagger, e.ctx.GetSessionVars().StmtCtx.GetResourceGroupTagger()) diff --git a/metrics/metrics.go b/metrics/metrics.go index f5c1faf5bb3aa..4e9ceff3a5ad6 100644 --- a/metrics/metrics.go +++ b/metrics/metrics.go @@ -154,6 +154,7 @@ func RegisterMetrics() { prometheus.MustRegister(SmallTxnWriteDuration) prometheus.MustRegister(TxnWriteThroughput) prometheus.MustRegister(LoadSysVarCacheCounter) + prometheus.MustRegister(TopSQLProfileCounter) prometheus.MustRegister(TopSQLIgnoredCounter) prometheus.MustRegister(TopSQLReportDurationHistogram) prometheus.MustRegister(TopSQLReportDataHistogram) diff --git a/metrics/topsql.go b/metrics/topsql.go index 7ee6e55ae3f71..e6816cd3f9243 100644 --- a/metrics/topsql.go +++ b/metrics/topsql.go @@ -18,6 +18,14 @@ import "github.com/prometheus/client_golang/prometheus" // Top SQL metrics. 
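The profile_total counter registered in metrics/metrics.go above is defined in the var block that follows; judging by the name and help text, it counts profiling rounds. A minimal, self-contained sketch of how such a counter is registered and bumped — the ticker loop is illustrative and stands in for the real profiler, which this diff does not show:

package main

import (
	"time"

	"github.com/prometheus/client_golang/prometheus"
)

var profileCounter = prometheus.NewCounter(prometheus.CounterOpts{
	Namespace: "tidb",
	Subsystem: "topsql",
	Name:      "profile_total",
	Help:      "Counter of profiling",
})

func main() {
	// Equivalent to the prometheus.MustRegister(TopSQLProfileCounter) call added above.
	prometheus.MustRegister(profileCounter)
	for range time.Tick(time.Second) {
		profileCounter.Inc() // assumed: one increment per profiling round
	}
}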
var ( + TopSQLProfileCounter = prometheus.NewCounter( + prometheus.CounterOpts{ + Namespace: "tidb", + Subsystem: "topsql", + Name: "profile_total", + Help: "Counter of profiling", + }) + TopSQLIgnoredCounter = prometheus.NewCounterVec( prometheus.CounterOpts{ Namespace: "tidb", diff --git a/server/conn.go b/server/conn.go index 0209ab77171a5..772e0b533b901 100644 --- a/server/conn.go +++ b/server/conn.go @@ -85,6 +85,8 @@ import ( "github.com/pingcap/tidb/util/hack" "github.com/pingcap/tidb/util/logutil" "github.com/pingcap/tidb/util/memory" + "github.com/pingcap/tidb/util/topsql" + "github.com/pingcap/tidb/util/tracecpu" "github.com/prometheus/client_golang/prometheus" "github.com/tikv/client-go/v2/util" "go.uber.org/zap" @@ -1244,10 +1246,10 @@ func (cc *clientConn) dispatch(ctx context.Context, data []byte) error { cc.lastPacket = data cmd := data[0] data = data[1:] - if variable.TopSQLEnabled() { + if topsql.InstanceEnabled() { defer pprof.SetGoroutineLabels(ctx) } - if variable.EnablePProfSQLCPU.Load() { + if tracecpu.EnablePProfSQLCPU.Load() { label := getLastStmtInConn{cc}.PProfLabel() if len(label) > 0 { defer pprof.SetGoroutineLabels(ctx) diff --git a/server/conn_stmt.go b/server/conn_stmt.go index 980de55c6c896..b45799d73b8dd 100644 --- a/server/conn_stmt.go +++ b/server/conn_stmt.go @@ -50,7 +50,6 @@ import ( "github.com/pingcap/tidb/parser/terror" plannercore "github.com/pingcap/tidb/planner/core" "github.com/pingcap/tidb/sessionctx/stmtctx" - "github.com/pingcap/tidb/sessionctx/variable" storeerr "github.com/pingcap/tidb/store/driver/error" "github.com/pingcap/tidb/types" "github.com/pingcap/tidb/util/execdetails" @@ -127,7 +126,7 @@ func (cc *clientConn) handleStmtExecute(ctx context.Context, data []byte) (err e stmtID := binary.LittleEndian.Uint32(data[0:4]) pos += 4 - if variable.TopSQLEnabled() { + if topsql.InstanceEnabled() { preparedStmt, _ := cc.preparedStmtID2CachePreparedStmt(stmtID) if preparedStmt != nil && preparedStmt.SQLDigest != nil { ctx = topsql.AttachSQLInfo(ctx, preparedStmt.NormalizedSQL, preparedStmt.SQLDigest, "", nil, false) @@ -273,7 +272,7 @@ func (cc *clientConn) handleStmtFetch(ctx context.Context, data []byte) (err err return errors.Annotate(mysql.NewErr(mysql.ErrUnknownStmtHandler, strconv.FormatUint(uint64(stmtID), 10), "stmt_fetch"), cc.preparedStmt2String(stmtID)) } - if variable.TopSQLEnabled() { + if topsql.InstanceEnabled() { prepareObj, _ := cc.preparedStmtID2CachePreparedStmt(stmtID) if prepareObj != nil && prepareObj.SQLDigest != nil { ctx = topsql.AttachSQLInfo(ctx, prepareObj.NormalizedSQL, prepareObj.SQLDigest, "", nil, false) diff --git a/server/http_status.go b/server/http_status.go index 8248e7c54e3de..00cbf5ddcc50f 100644 --- a/server/http_status.go +++ b/server/http_status.go @@ -46,7 +46,7 @@ import ( "github.com/pingcap/tidb/util/logutil" "github.com/pingcap/tidb/util/memory" "github.com/pingcap/tidb/util/printer" - "github.com/pingcap/tidb/util/topsql/tracecpu" + "github.com/pingcap/tidb/util/tracecpu" "github.com/pingcap/tidb/util/versioninfo" "github.com/prometheus/client_golang/prometheus/promhttp" "github.com/soheilhy/cmux" diff --git a/server/rpc_server.go b/server/rpc_server.go index 67965ac381f4d..3b23539c0bac1 100644 --- a/server/rpc_server.go +++ b/server/rpc_server.go @@ -18,6 +18,7 @@ import ( "context" "fmt" "net" + "time" "github.com/pingcap/kvproto/pkg/coprocessor" "github.com/pingcap/kvproto/pkg/diagnosticspb" @@ -32,8 +33,10 @@ import ( "github.com/pingcap/tidb/util" "github.com/pingcap/tidb/util/logutil" 
"github.com/pingcap/tidb/util/memory" + "github.com/pingcap/tidb/util/topsql" "go.uber.org/zap" "google.golang.org/grpc" + "google.golang.org/grpc/keepalive" "google.golang.org/grpc/peer" ) @@ -46,7 +49,15 @@ func NewRPCServer(config *config.Config, dom *domain.Domain, sm util.SessionMana } }() - s := grpc.NewServer() + s := grpc.NewServer( + grpc.KeepaliveParams(keepalive.ServerParameters{ + Time: time.Duration(config.Status.GRPCKeepAliveTime) * time.Second, + Timeout: time.Duration(config.Status.GRPCKeepAliveTimeout) * time.Second, + }), + grpc.MaxConcurrentStreams(uint32(config.Status.GRPCConcurrentStreams)), + grpc.InitialWindowSize(int32(config.Status.GRPCInitialWindowSize)), + grpc.MaxSendMsgSize(config.Status.GRPCMaxSendMsgSize), + ) rpcSrv := &rpcServer{ DiagnosticsServer: sysutil.NewDiagnosticsServer(config.Log.File.Filename), dom: dom, @@ -54,6 +65,7 @@ func NewRPCServer(config *config.Config, dom *domain.Domain, sm util.SessionMana } diagnosticspb.RegisterDiagnosticsServer(s, rpcSrv) tikvpb.RegisterTikvServer(s, rpcSrv) + topsql.RegisterPubSubServer(s) return s } diff --git a/server/tidb_test.go b/server/tidb_test.go index 8ea521fda6275..ce1f28b40b2e1 100644 --- a/server/tidb_test.go +++ b/server/tidb_test.go @@ -49,9 +49,9 @@ import ( "github.com/pingcap/tidb/util" "github.com/pingcap/tidb/util/logutil" "github.com/pingcap/tidb/util/plancodec" + "github.com/pingcap/tidb/util/topsql" "github.com/pingcap/tidb/util/topsql/reporter" mockTopSQLReporter "github.com/pingcap/tidb/util/topsql/reporter/mock" - "github.com/pingcap/tidb/util/topsql/tracecpu" mockTopSQLTraceCPU "github.com/pingcap/tidb/util/topsql/tracecpu/mock" "github.com/stretchr/testify/require" ) @@ -131,8 +131,6 @@ func createTidbTestTopSQLSuite(t *testing.T) (*tidbTestTopSQLSuite, func()) { dbt.MustExec("set @@global.tidb_top_sql_report_interval_seconds=2;") dbt.MustExec("set @@global.tidb_top_sql_max_statement_count=5;") - tracecpu.GlobalSQLCPUProfiler.Run() - return ts, cleanup } @@ -1315,7 +1313,8 @@ func TestTopSQLCPUProfile(t *testing.T) { }() collector := mockTopSQLTraceCPU.NewTopSQLCollector() - tracecpu.GlobalSQLCPUProfiler.SetCollector(&collectorWrapper{collector}) + topsql.SetupTopSQL(&collectorWrapper{collector}) + defer topsql.Close() dbt := testkit.NewDBTestKit(t, db) dbt.MustExec("drop database if exists topsql") @@ -1324,11 +1323,6 @@ func TestTopSQLCPUProfile(t *testing.T) { dbt.MustExec("create table t (a int auto_increment, b int, unique index idx(a));") dbt.MustExec("create table t1 (a int auto_increment, b int, unique index idx(a));") dbt.MustExec("create table t2 (a int auto_increment, b int, unique index idx(a));") - dbt.MustExec("set @@global.tidb_enable_top_sql='On';") - config.UpdateGlobal(func(conf *config.Config) { - conf.TopSQL.ReceiverAddress = "127.0.0.1:4001" - }) - dbt.MustExec("set @@global.tidb_top_sql_precision_seconds=1;") dbt.MustExec("set @@global.tidb_txn_mode = 'pessimistic'") // Test case 1: DML query: insert/update/replace/delete/select @@ -1518,7 +1512,7 @@ func TestTopSQLCPUProfile(t *testing.T) { checkFn("commit", "") } -func TestTopSQLAgent(t *testing.T) { +func TestTopSQLReceiver(t *testing.T) { t.Skip("unstable, skip it and fix it before 20210702") ts, cleanup := createTidbTestTopSQLSuite(t) @@ -1529,10 +1523,10 @@ func TestTopSQLAgent(t *testing.T) { err := db.Close() require.NoError(t, err) }() - agentServer, err := mockTopSQLReporter.StartMockAgentServer() + receiverServer, err := mockTopSQLReporter.StartMockReceiverServer() require.NoError(t, err) defer func() { - 
agentServer.Stop() + receiverServer.Stop() }() require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/util/topsql/reporter/resetTimeoutForTest", `return(true)`)) @@ -1562,27 +1556,28 @@ func TestTopSQLAgent(t *testing.T) { conf.TopSQL.ReceiverAddress = addr }) } - dbt.MustExec("set @@global.tidb_enable_top_sql='On';") + + report := reporter.NewRemoteTopSQLReporter(plancodec.DecodeNormalizedPlan) + topsql.SetupTopSQL(report) + defer topsql.Close() + setTopSQLReceiverAddress("") dbt.MustExec("set @@global.tidb_top_sql_precision_seconds=1;") dbt.MustExec("set @@global.tidb_top_sql_report_interval_seconds=2;") dbt.MustExec("set @@global.tidb_top_sql_max_statement_count=5;") - r := reporter.NewRemoteTopSQLReporter(reporter.NewGRPCReportClient(plancodec.DecodeNormalizedPlan)) - tracecpu.GlobalSQLCPUProfiler.SetCollector(&collectorWrapper{r}) - // TODO: change to ensure that the right sql statements are reported, not just counts checkFn := func(n int) { - records := agentServer.GetLatestRecords() + records := receiverServer.GetLatestRecords() require.Len(t, records, n) for _, r := range records { - sqlMeta, exist := agentServer.GetSQLMetaByDigestBlocking(r.SqlDigest, time.Second) + sqlMeta, exist := receiverServer.GetSQLMetaByDigestBlocking(r.SqlDigest, time.Second) require.True(t, exist) require.Regexp(t, "^select.*from.*join", sqlMeta.NormalizedSql) if len(r.PlanDigest) == 0 { continue } - plan, exist := agentServer.GetPlanMetaByDigestBlocking(r.PlanDigest, time.Second) + plan, exist := receiverServer.GetPlanMetaByDigestBlocking(r.PlanDigest, time.Second) require.True(t, exist) plan = strings.Replace(plan, "\n", " ", -1) plan = strings.Replace(plan, "\t", " ", -1) @@ -1606,22 +1601,22 @@ func TestTopSQLAgent(t *testing.T) { cancel := runWorkload(0, 10) // Test with null agent address, the agent server can't receive any record. setTopSQLReceiverAddress("") - agentServer.WaitCollectCnt(1, time.Second*4) + receiverServer.WaitCollectCnt(1, time.Second*4) checkFn(0) // Test after set agent address and the evict take effect. dbt.MustExec("set @@global.tidb_top_sql_max_statement_count=5;") - setTopSQLReceiverAddress(agentServer.Address()) - agentServer.WaitCollectCnt(1, time.Second*4) + setTopSQLReceiverAddress(receiverServer.Address()) + receiverServer.WaitCollectCnt(1, time.Second*4) checkFn(5) // Test with wrong agent address, the agent server can't receive any record. dbt.MustExec("set @@global.tidb_top_sql_max_statement_count=8;") setTopSQLReceiverAddress("127.0.0.1:65530") - agentServer.WaitCollectCnt(1, time.Second*4) + receiverServer.WaitCollectCnt(1, time.Second*4) checkFn(0) // Test after set agent address and the evict take effect. 
- setTopSQLReceiverAddress(agentServer.Address()) - agentServer.WaitCollectCnt(1, time.Second*4) + setTopSQLReceiverAddress(receiverServer.Address()) + receiverServer.WaitCollectCnt(1, time.Second*4) checkFn(8) cancel() // cancel case 1 @@ -1630,19 +1625,19 @@ func TestTopSQLAgent(t *testing.T) { // empty agent address, should not collect records dbt.MustExec("set @@global.tidb_top_sql_max_statement_count=5;") setTopSQLReceiverAddress("") - agentServer.WaitCollectCnt(1, time.Second*4) + receiverServer.WaitCollectCnt(1, time.Second*4) checkFn(0) // set correct address, should collect records - setTopSQLReceiverAddress(agentServer.Address()) - agentServer.WaitCollectCnt(1, time.Second*4) + setTopSQLReceiverAddress(receiverServer.Address()) + receiverServer.WaitCollectCnt(1, time.Second*4) checkFn(5) // agent server hangs for a while - agentServer.HangFromNow(time.Second * 6) + receiverServer.HangFromNow(time.Second * 6) // run another set of SQL queries cancel2() cancel3 := runWorkload(11, 20) - agentServer.WaitCollectCnt(1, time.Second*8) + receiverServer.WaitCollectCnt(1, time.Second*8) checkFn(5) cancel3() @@ -1650,24 +1645,24 @@ func TestTopSQLAgent(t *testing.T) { cancel4 := runWorkload(0, 10) // empty agent address, should not collect records setTopSQLReceiverAddress("") - agentServer.WaitCollectCnt(1, time.Second*4) + receiverServer.WaitCollectCnt(1, time.Second*4) checkFn(0) // set correct address, should collect records - setTopSQLReceiverAddress(agentServer.Address()) - agentServer.WaitCollectCnt(1, time.Second*8) + setTopSQLReceiverAddress(receiverServer.Address()) + receiverServer.WaitCollectCnt(1, time.Second*8) checkFn(5) // run another set of SQL queries cancel4() cancel5 := runWorkload(11, 20) // agent server shutdown - agentServer.Stop() + receiverServer.Stop() // agent server restart - agentServer, err = mockTopSQLReporter.StartMockAgentServer() + receiverServer, err = mockTopSQLReporter.StartMockReceiverServer() require.NoError(t, err) - setTopSQLReceiverAddress(agentServer.Address()) + setTopSQLReceiverAddress(receiverServer.Address()) // check result - agentServer.WaitCollectCnt(2, time.Second*8) + receiverServer.WaitCollectCnt(2, time.Second*8) checkFn(5) cancel5() } diff --git a/session/session.go b/session/session.go index 8a2b61e50e6db..d033bef25260b 100644 --- a/session/session.go +++ b/session/session.go @@ -1272,7 +1272,7 @@ func (s *session) ExecuteInternal(ctx context.Context, sql string, args ...inter s.sessionVars.InRestrictedSQL = true defer func() { s.sessionVars.InRestrictedSQL = origin - if variable.TopSQLEnabled() { + if topsql.InstanceEnabled() { // Restore the goroutine label by using the original ctx after execution is finished. pprof.SetGoroutineLabels(ctx) } @@ -1412,7 +1412,7 @@ func (s *session) ParseWithParams(ctx context.Context, sql string, args ...inter for _, warn := range warns { s.sessionVars.StmtCtx.AppendWarning(util.SyntaxWarn(warn)) } - if variable.TopSQLEnabled() { + if topsql.InstanceEnabled() { normalized, digest := parser.NormalizeDigest(sql) if digest != nil { // Reset the goroutine label when internal sql execute finish. @@ -1426,7 +1426,7 @@ func (s *session) ParseWithParams(ctx context.Context, sql string, args ...inter // ExecRestrictedStmt implements RestrictedSQLExecutor interface. 
func (s *session) ExecRestrictedStmt(ctx context.Context, stmtNode ast.StmtNode, opts ...sqlexec.OptionFuncAlias) ( []chunk.Row, []*ast.ResultField, error) { - if variable.TopSQLEnabled() { + if topsql.InstanceEnabled() { defer pprof.SetGoroutineLabels(ctx) } var execOption sqlexec.ExecOption @@ -1533,7 +1533,7 @@ func (s *session) ExecuteStmt(ctx context.Context, stmtNode ast.StmtNode) (sqlex return nil, err } normalizedSQL, digest := s.sessionVars.StmtCtx.SQLDigest() - if variable.TopSQLEnabled() { + if topsql.InstanceEnabled() { ctx = topsql.AttachSQLInfo(ctx, normalizedSQL, digest, "", nil, s.sessionVars.InRestrictedSQL) } diff --git a/sessionctx/variable/sysvar.go b/sessionctx/variable/sysvar.go index cae6fd4c4aa50..f0c08acefb73b 100644 --- a/sessionctx/variable/sysvar.go +++ b/sessionctx/variable/sysvar.go @@ -35,6 +35,9 @@ import ( "github.com/pingcap/tidb/util/collate" "github.com/pingcap/tidb/util/logutil" "github.com/pingcap/tidb/util/stmtsummary" + "github.com/pingcap/tidb/util/topsql" + "github.com/pingcap/tidb/util/topsql/reporter" + "github.com/pingcap/tidb/util/tracecpu" "github.com/pingcap/tidb/util/versioninfo" tikvstore "github.com/tikv/client-go/v2/kv" atomic2 "go.uber.org/atomic" @@ -798,12 +801,13 @@ var defaultSysVars = []*SysVar{ }, GetSession: func(s *SessionVars) (string, error) { return strconv.FormatInt(int64(GlobalLogMaxDays.Load()), 10), nil }}, - {Scope: ScopeSession, Name: TiDBPProfSQLCPU, Value: strconv.Itoa(DefTiDBPProfSQLCPU), Type: TypeInt, skipInit: true, MinValue: 0, MaxValue: 1, SetSession: func(s *SessionVars, val string) error { - EnablePProfSQLCPU.Store(uint32(tidbOptPositiveInt32(val, DefTiDBPProfSQLCPU)) > 0) + {Scope: ScopeSession, Name: TiDBPProfSQLCPU, Value: strconv.Itoa(tracecpu.DefEnablePProfSQLCPU), Type: TypeInt, skipInit: true, MinValue: 0, MaxValue: 1, SetSession: func(s *SessionVars, val string) error { + enable := uint32(tidbOptPositiveInt32(val, tracecpu.DefEnablePProfSQLCPU)) > 0 + tracecpu.EnablePProfSQLCPU.Store(enable) return nil }, GetSession: func(s *SessionVars) (string, error) { val := "0" - if EnablePProfSQLCPU.Load() { + if tracecpu.EnablePProfSQLCPU.Load() { val = "1" } return val, nil @@ -1250,50 +1254,47 @@ var defaultSysVars = []*SysVar{ return nil }}, // variable for top SQL feature. 
- {Scope: ScopeGlobal, Name: TiDBEnableTopSQL, Value: BoolToOnOff(DefTiDBTopSQLEnable), Type: TypeBool, Hidden: true, AllowEmpty: true, GetGlobal: func(s *SessionVars) (string, error) { - return BoolToOnOff(TopSQLVariable.Enable.Load()), nil - }, SetGlobal: func(vars *SessionVars, s string) error { - TopSQLVariable.Enable.Store(TiDBOptOn(s)) - return nil + {Scope: ScopeGlobal, Name: TiDBEnableTopSQL, Value: BoolToOnOff(topsql.InstanceEnabled()), Type: TypeBool, Hidden: true, GetGlobal: func(sessionVars *SessionVars) (string, error) { + return BoolToOnOff(topsql.InstanceEnabled()), nil }, GlobalConfigName: GlobalConfigEnableTopSQL}, - {Scope: ScopeGlobal, Name: TiDBTopSQLPrecisionSeconds, Value: strconv.Itoa(DefTiDBTopSQLPrecisionSeconds), Type: TypeInt, Hidden: true, MinValue: 1, MaxValue: math.MaxInt64, GetGlobal: func(s *SessionVars) (string, error) { - return strconv.FormatInt(TopSQLVariable.PrecisionSeconds.Load(), 10), nil + {Scope: ScopeGlobal, Name: TiDBTopSQLPrecisionSeconds, Value: strconv.Itoa(tracecpu.DefPrecisionSeconds), Type: TypeInt, Hidden: true, MinValue: 1, MaxValue: math.MaxInt64, GetGlobal: func(s *SessionVars) (string, error) { + return strconv.FormatInt(tracecpu.PrecisionSeconds.Load(), 10), nil }, SetGlobal: func(vars *SessionVars, s string) error { val, err := strconv.ParseInt(s, 10, 64) if err != nil { return err } - TopSQLVariable.PrecisionSeconds.Store(val) + tracecpu.PrecisionSeconds.Store(val) return nil }}, - {Scope: ScopeGlobal, Name: TiDBTopSQLMaxStatementCount, Value: strconv.Itoa(DefTiDBTopSQLMaxStatementCount), Type: TypeInt, Hidden: true, MinValue: 0, MaxValue: 5000, GetGlobal: func(s *SessionVars) (string, error) { - return strconv.FormatInt(TopSQLVariable.MaxStatementCount.Load(), 10), nil + {Scope: ScopeGlobal, Name: TiDBTopSQLMaxStatementCount, Value: strconv.Itoa(reporter.DefMaxStatementCount), Type: TypeInt, Hidden: true, MinValue: 0, MaxValue: 5000, GetGlobal: func(s *SessionVars) (string, error) { + return strconv.FormatInt(reporter.MaxStatementCount.Load(), 10), nil }, SetGlobal: func(vars *SessionVars, s string) error { val, err := strconv.ParseInt(s, 10, 64) if err != nil { return err } - TopSQLVariable.MaxStatementCount.Store(val) + reporter.MaxStatementCount.Store(val) return nil }}, - {Scope: ScopeGlobal, Name: TiDBTopSQLMaxCollect, Value: strconv.Itoa(DefTiDBTopSQLMaxCollect), Type: TypeInt, Hidden: true, MinValue: 1, MaxValue: 500000, GetGlobal: func(s *SessionVars) (string, error) { - return strconv.FormatInt(TopSQLVariable.MaxCollect.Load(), 10), nil + {Scope: ScopeGlobal, Name: TiDBTopSQLMaxCollect, Value: strconv.Itoa(reporter.DefMaxCollect), Type: TypeInt, Hidden: true, MinValue: 1, MaxValue: 500000, GetGlobal: func(s *SessionVars) (string, error) { + return strconv.FormatInt(reporter.MaxCollect.Load(), 10), nil }, SetGlobal: func(vars *SessionVars, s string) error { val, err := strconv.ParseInt(s, 10, 64) if err != nil { return err } - TopSQLVariable.MaxCollect.Store(val) + reporter.MaxCollect.Store(val) return nil }}, - {Scope: ScopeGlobal, Name: TiDBTopSQLReportIntervalSeconds, Value: strconv.Itoa(DefTiDBTopSQLReportIntervalSeconds), Type: TypeInt, Hidden: true, MinValue: 1, MaxValue: 1 * 60 * 60, GetGlobal: func(s *SessionVars) (string, error) { - return strconv.FormatInt(TopSQLVariable.ReportIntervalSeconds.Load(), 10), nil + {Scope: ScopeGlobal, Name: TiDBTopSQLReportIntervalSeconds, Value: strconv.Itoa(reporter.DefReportIntervalSeconds), Type: TypeInt, Hidden: true, MinValue: 1, MaxValue: 1 * 60 * 60, GetGlobal: func(s 
*SessionVars) (string, error) { + return strconv.FormatInt(reporter.ReportIntervalSeconds.Load(), 10), nil }, SetGlobal: func(vars *SessionVars, s string) error { val, err := strconv.ParseInt(s, 10, 64) if err != nil { return err } - TopSQLVariable.ReportIntervalSeconds.Store(val) + reporter.ReportIntervalSeconds.Store(val) return nil }}, {Scope: ScopeGlobal, Name: SkipNameResolve, Value: Off, Type: TypeBool}, diff --git a/sessionctx/variable/sysvar_test.go b/sessionctx/variable/sysvar_test.go index e15c9f92b92b8..864a0a33867b1 100644 --- a/sessionctx/variable/sysvar_test.go +++ b/sessionctx/variable/sysvar_test.go @@ -26,6 +26,7 @@ import ( "github.com/pingcap/tidb/config" "github.com/pingcap/tidb/parser/mysql" "github.com/pingcap/tidb/parser/terror" + "github.com/pingcap/tidb/util/tracecpu" "github.com/stretchr/testify/require" ) @@ -568,7 +569,7 @@ func TestInstanceScopedVars(t *testing.T) { val, err = GetSessionOrGlobalSystemVar(vars, TiDBPProfSQLCPU) require.NoError(t, err) expected := "0" - if EnablePProfSQLCPU.Load() { + if tracecpu.EnablePProfSQLCPU.Load() { expected = "1" } require.Equal(t, expected, val) diff --git a/sessionctx/variable/tidb_vars.go b/sessionctx/variable/tidb_vars.go index 2e38db2699969..b44f3f66ce5e5 100644 --- a/sessionctx/variable/tidb_vars.go +++ b/sessionctx/variable/tidb_vars.go @@ -688,7 +688,6 @@ const ( DefTiDBMemQuotaIndexLookupJoin = 32 << 30 // 32GB. DefTiDBMemQuotaDistSQL = 32 << 30 // 32GB. DefTiDBGeneralLog = false - DefTiDBPProfSQLCPU = 0 DefTiDBRetryLimit = 10 DefTiDBDisableTxnAutoRetry = true DefTiDBConstraintCheckInPlace = false @@ -763,11 +762,6 @@ const ( DefTiDBTrackAggregateMemoryUsage = true DefTiDBEnableExchangePartition = false DefCTEMaxRecursionDepth = 1000 - DefTiDBTopSQLEnable = false - DefTiDBTopSQLPrecisionSeconds = 1 - DefTiDBTopSQLMaxStatementCount = 200 - DefTiDBTopSQLMaxCollect = 10000 - DefTiDBTopSQLReportIntervalSeconds = 60 DefTiDBTmpTableMaxSize = 64 << 20 // 64MB. 
DefTiDBEnableLocalTxn = false DefTiDBTSOClientBatchMaxWaitTime = 0.0 // 0ms @@ -783,7 +777,6 @@ const ( var ( ProcessGeneralLog = atomic.NewBool(false) GlobalLogMaxDays = atomic.NewInt32(int32(config.GetGlobalConfig().Log.File.MaxDays)) - EnablePProfSQLCPU = atomic.NewBool(false) ddlReorgWorkerCounter int32 = DefTiDBDDLReorgWorkerCount ddlReorgBatchSize int32 = DefTiDBDDLReorgBatchSize ddlErrorCountlimit int64 = DefTiDBDDLErrorCountLimit @@ -801,34 +794,8 @@ var ( CapturePlanBaseline = serverGlobalVariable{globalVal: Off} DefExecutorConcurrency = 5 MemoryUsageAlarmRatio = atomic.NewFloat64(config.GetGlobalConfig().Performance.MemoryUsageAlarmRatio) - TopSQLVariable = TopSQL{ - Enable: atomic.NewBool(DefTiDBTopSQLEnable), - PrecisionSeconds: atomic.NewInt64(DefTiDBTopSQLPrecisionSeconds), - MaxStatementCount: atomic.NewInt64(DefTiDBTopSQLMaxStatementCount), - MaxCollect: atomic.NewInt64(DefTiDBTopSQLMaxCollect), - ReportIntervalSeconds: atomic.NewInt64(DefTiDBTopSQLReportIntervalSeconds), - } - EnableLocalTxn = atomic.NewBool(DefTiDBEnableLocalTxn) - MaxTSOBatchWaitInterval = atomic.NewFloat64(DefTiDBTSOClientBatchMaxWaitTime) - EnableTSOFollowerProxy = atomic.NewBool(DefTiDBEnableTSOFollowerProxy) - RestrictedReadOnly = atomic.NewBool(DefTiDBRestrictedReadOnly) + EnableLocalTxn = atomic.NewBool(DefTiDBEnableLocalTxn) + MaxTSOBatchWaitInterval = atomic.NewFloat64(DefTiDBTSOClientBatchMaxWaitTime) + EnableTSOFollowerProxy = atomic.NewBool(DefTiDBEnableTSOFollowerProxy) + RestrictedReadOnly = atomic.NewBool(DefTiDBRestrictedReadOnly) ) - -// TopSQL is the variable for control top sql feature. -type TopSQL struct { - // Enable top-sql or not. - Enable *atomic.Bool - // The refresh interval of top-sql. - PrecisionSeconds *atomic.Int64 - // The maximum number of statements kept in memory. - MaxStatementCount *atomic.Int64 - // The maximum capacity of the collect map. - MaxCollect *atomic.Int64 - // The report data interval of top-sql. - ReportIntervalSeconds *atomic.Int64 -} - -// TopSQLEnabled uses to check whether enabled the top SQL feature. 
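With TopSQLVariable and TopSQLEnabled deleted here, enablement becomes a property of the topsql instance: a collector or reporter is installed with topsql.SetupTopSQL, and call sites ask topsql.InstanceEnabled(), exactly as the tidb-server/main.go and test hunks in this diff do. A short sketch stitched together from those hunks (no new API is assumed):

package main

import (
	"github.com/pingcap/tidb/util/plancodec"
	"github.com/pingcap/tidb/util/topsql"
	"github.com/pingcap/tidb/util/topsql/reporter"
)

func main() {
	// Install a reporter at start-up; mirrors the tidb-server/main.go hunk below.
	report := reporter.NewRemoteTopSQLReporter(plancodec.DecodeNormalizedPlan)
	topsql.SetupTopSQL(report)
	defer topsql.Close()

	// Call sites now gate on the instance state instead of a sysvar plus config check.
	if topsql.InstanceEnabled() {
		// attach SQL digests, resource-group tags, pprof labels, ...
	}
}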
-func TopSQLEnabled() bool { - return TopSQLVariable.Enable.Load() && config.GetGlobalConfig().TopSQL.ReceiverAddress != "" -} diff --git a/tidb-server/main.go b/tidb-server/main.go index f53b1f26f0e88..edc2472b93312 100644 --- a/tidb-server/main.go +++ b/tidb-server/main.go @@ -59,6 +59,7 @@ import ( "github.com/pingcap/tidb/util/kvcache" "github.com/pingcap/tidb/util/logutil" "github.com/pingcap/tidb/util/memory" + "github.com/pingcap/tidb/util/plancodec" "github.com/pingcap/tidb/util/printer" "github.com/pingcap/tidb/util/sem" "github.com/pingcap/tidb/util/signal" @@ -66,6 +67,7 @@ import ( storageSys "github.com/pingcap/tidb/util/sys/storage" "github.com/pingcap/tidb/util/systimemon" "github.com/pingcap/tidb/util/topsql" + "github.com/pingcap/tidb/util/topsql/reporter" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/push" "github.com/tikv/client-go/v2/tikv" @@ -209,7 +211,8 @@ func main() { cleanup(svr, storage, dom, graceful) close(exited) }) - topsql.SetupTopSQL() + report := reporter.NewRemoteTopSQLReporter(plancodec.DecodeNormalizedPlan) + topsql.SetupTopSQL(report) terror.MustNil(svr.Run()) <-exited syncLog() diff --git a/util/topsql/main_test.go b/util/topsql/main_test.go index f5e3dc3f7d0cf..7d2142aadea6c 100644 --- a/util/topsql/main_test.go +++ b/util/topsql/main_test.go @@ -17,24 +17,13 @@ package topsql import ( "testing" - "github.com/pingcap/tidb/config" - "github.com/pingcap/tidb/sessionctx/variable" "github.com/pingcap/tidb/util/testbridge" - "github.com/pingcap/tidb/util/topsql/tracecpu" "go.uber.org/goleak" ) func TestMain(m *testing.M) { testbridge.WorkaroundGoCheckFlags() - // set up - variable.TopSQLVariable.Enable.Store(true) - config.UpdateGlobal(func(conf *config.Config) { - conf.TopSQL.ReceiverAddress = "mock" - }) - variable.TopSQLVariable.PrecisionSeconds.Store(1) - tracecpu.GlobalSQLCPUProfiler.Run() - opts := []goleak.Option{ goleak.IgnoreTopFunction("time.Sleep"), goleak.IgnoreTopFunction("runtime/pprof.readProfile"), diff --git a/util/topsql/reporter/client.go b/util/topsql/reporter/client.go deleted file mode 100644 index 994189250e52b..0000000000000 --- a/util/topsql/reporter/client.go +++ /dev/null @@ -1,252 +0,0 @@ -// Copyright 2021 PingCAP, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package reporter - -import ( - "context" - "math" - "sync" - "time" - - "github.com/pingcap/tidb/util/logutil" - "github.com/pingcap/tipb/go-tipb" - "go.uber.org/zap" - "google.golang.org/grpc" - "google.golang.org/grpc/backoff" -) - -// ReportClient send data to the target server. -type ReportClient interface { - Send(ctx context.Context, addr string, data reportData) error - Close() -} - -// GRPCReportClient reports data to grpc servers. 
-type GRPCReportClient struct { - curRPCAddr string - conn *grpc.ClientConn - // calling decodePlan this can take a while, so should not block critical paths - decodePlan planBinaryDecodeFunc -} - -// NewGRPCReportClient returns a new GRPCReportClient -func NewGRPCReportClient(decodePlan planBinaryDecodeFunc) *GRPCReportClient { - return &GRPCReportClient{ - decodePlan: decodePlan, - } -} - -var _ ReportClient = &GRPCReportClient{} - -// Send implements the ReportClient interface. -// Currently the implementation will establish a new connection every time, which is suitable for a per-minute sending period -func (r *GRPCReportClient) Send(ctx context.Context, targetRPCAddr string, data reportData) error { - if targetRPCAddr == "" { - return nil - } - err := r.tryEstablishConnection(ctx, targetRPCAddr) - if err != nil { - return err - } - - var wg sync.WaitGroup - errCh := make(chan error, 3) - wg.Add(3) - - go func() { - defer wg.Done() - errCh <- r.sendBatchSQLMeta(ctx, data.normalizedSQLMap) - }() - go func() { - defer wg.Done() - errCh <- r.sendBatchPlanMeta(ctx, data.normalizedPlanMap) - }() - go func() { - defer wg.Done() - errCh <- r.sendBatchCPUTimeRecord(ctx, data.collectedData) - }() - wg.Wait() - close(errCh) - for err := range errCh { - if err != nil { - return err - } - } - return nil -} - -// Close uses to close grpc connection. -func (r *GRPCReportClient) Close() { - if r.conn == nil { - return - } - err := r.conn.Close() - if err != nil { - logutil.BgLogger().Warn("[top-sql] grpc client close connection failed", zap.Error(err)) - } - r.conn = nil -} - -// sendBatchCPUTimeRecord sends a batch of TopSQL records by stream. -func (r *GRPCReportClient) sendBatchCPUTimeRecord(ctx context.Context, records []*dataPoints) error { - if len(records) == 0 { - return nil - } - start := time.Now() - client := tipb.NewTopSQLAgentClient(r.conn) - stream, err := client.ReportCPUTimeRecords(ctx) - if err != nil { - return err - } - for _, record := range records { - record := &tipb.CPUTimeRecord{ - RecordListTimestampSec: record.TimestampList, - RecordListCpuTimeMs: record.CPUTimeMsList, - SqlDigest: record.SQLDigest, - PlanDigest: record.PlanDigest, - } - if err := stream.Send(record); err != nil { - return err - } - } - topSQLReportRecordCounterHistogram.Observe(float64(len(records))) - // See https://pkg.go.dev/google.golang.org/grpc#ClientConn.NewStream for how to avoid leaking the stream - _, err = stream.CloseAndRecv() - if err != nil { - reportRecordDurationFailedHistogram.Observe(time.Since(start).Seconds()) - return err - } - reportRecordDurationSuccHistogram.Observe(time.Since(start).Seconds()) - return nil -} - -// sendBatchSQLMeta sends a batch of SQL metas by stream. 
-func (r *GRPCReportClient) sendBatchSQLMeta(ctx context.Context, sqlMap *sync.Map) error { - start := time.Now() - client := tipb.NewTopSQLAgentClient(r.conn) - stream, err := client.ReportSQLMeta(ctx) - if err != nil { - return err - } - cnt := 0 - sqlMap.Range(func(key, value interface{}) bool { - cnt++ - meta := value.(SQLMeta) - sqlMeta := &tipb.SQLMeta{ - SqlDigest: []byte(key.(string)), - NormalizedSql: meta.normalizedSQL, - IsInternalSql: meta.isInternal, - } - if err = stream.Send(sqlMeta); err != nil { - return false - } - return true - }) - // stream.Send return error - if err != nil { - return err - } - topSQLReportSQLCountHistogram.Observe(float64(cnt)) - _, err = stream.CloseAndRecv() - if err != nil { - reportSQLDurationFailedHistogram.Observe(time.Since(start).Seconds()) - return err - } - reportSQLDurationSuccHistogram.Observe(time.Since(start).Seconds()) - return nil -} - -// sendBatchPlanMeta sends a batch of SQL metas by stream. -func (r *GRPCReportClient) sendBatchPlanMeta(ctx context.Context, planMap *sync.Map) error { - start := time.Now() - client := tipb.NewTopSQLAgentClient(r.conn) - stream, err := client.ReportPlanMeta(ctx) - if err != nil { - return err - } - cnt := 0 - planMap.Range(func(key, value interface{}) bool { - planDecoded, errDecode := r.decodePlan(value.(string)) - if errDecode != nil { - logutil.BgLogger().Warn("[top-sql] decode plan failed", zap.Error(errDecode)) - return true - } - cnt++ - planMeta := &tipb.PlanMeta{ - PlanDigest: []byte(key.(string)), - NormalizedPlan: planDecoded, - } - if err = stream.Send(planMeta); err != nil { - return false - } - return true - }) - // stream.Send return error - if err != nil { - return err - } - topSQLReportPlanCountHistogram.Observe(float64(cnt)) - _, err = stream.CloseAndRecv() - if err != nil { - reportPlanDurationFailedHistogram.Observe(time.Since(start).Seconds()) - return err - } - reportPlanDurationSuccHistogram.Observe(time.Since(start).Seconds()) - return err -} - -// tryEstablishConnection establishes the gRPC connection if connection is not established. -func (r *GRPCReportClient) tryEstablishConnection(ctx context.Context, targetRPCAddr string) (err error) { - if r.curRPCAddr == targetRPCAddr && r.conn != nil { - // Address is not changed, skip. - return nil - } - - if r.conn != nil { - err := r.conn.Close() - logutil.BgLogger().Warn("[top-sql] grpc client close connection failed", zap.Error(err)) - } - - r.conn, err = r.dial(ctx, targetRPCAddr) - if err != nil { - return err - } - r.curRPCAddr = targetRPCAddr - return nil -} - -func (r *GRPCReportClient) dial(ctx context.Context, targetRPCAddr string) (*grpc.ClientConn, error) { - dialCtx, cancel := context.WithTimeout(ctx, dialTimeout) - defer cancel() - return grpc.DialContext( - dialCtx, - targetRPCAddr, - grpc.WithBlock(), - grpc.WithInsecure(), - grpc.WithInitialWindowSize(grpcInitialWindowSize), - grpc.WithInitialConnWindowSize(grpcInitialConnWindowSize), - grpc.WithDefaultCallOptions( - grpc.MaxCallRecvMsgSize(math.MaxInt64), - ), - grpc.WithConnectParams(grpc.ConnectParams{ - Backoff: backoff.Config{ - BaseDelay: 100 * time.Millisecond, // Default was 1s. - Multiplier: 1.6, // Default - Jitter: 0.2, // Default - MaxDelay: 3 * time.Second, // Default was 120s. - }, - }), - ) -} diff --git a/util/topsql/reporter/datasink.go b/util/topsql/reporter/datasink.go new file mode 100644 index 0000000000000..3058a30b2903c --- /dev/null +++ b/util/topsql/reporter/datasink.go @@ -0,0 +1,37 @@ +// Copyright 2021 PingCAP, Inc. 
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package reporter
+
+import (
+	"time"
+)
+
+// DataSink collects and sends data to a target.
+type DataSink interface {
+	// Send pushes report data into the sink, which will later send it to a target. A deadline can be
+	// specified to control how late the data may be sent. If the sink stays full and cannot schedule a
+	// send within the specified deadline, the data is silently dropped.
+	Send(data ReportData, deadline time.Time)
+
+	// IsPaused indicates that the DataSink is not expecting to receive records for now
+	// and may resume in the future.
+	IsPaused() bool
+
+	// IsDown indicates that the DataSink is down and can be cleared.
+	// Note that once a DataSink is down, it cannot come back up.
+	IsDown() bool
+
+	Close()
+}
diff --git a/util/topsql/reporter/mock/pubsub.go b/util/topsql/reporter/mock/pubsub.go
new file mode 100644
index 0000000000000..493d95c17f827
--- /dev/null
+++ b/util/topsql/reporter/mock/pubsub.go
@@ -0,0 +1,67 @@
+// Copyright 2021 PingCAP, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package mock
+
+import (
+	"fmt"
+	"net"
+
+	"github.com/pingcap/tidb/util/logutil"
+	"go.uber.org/zap"
+	"google.golang.org/grpc"
+)
+
+type mockPubSubServer struct {
+	addr       string
+	listen     net.Listener
+	grpcServer *grpc.Server
+}
+
+// NewMockPubSubServer creates a mock pub/sub server.
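Since DataSink is the patch's new extension point, a tiny reference implementation may help review. The sketch below (memorySink is an invented name) follows the documented contract: Send never blocks and silently drops data it cannot buffer before the deadline, and a closed sink stays down forever:

package reporter

import (
	"time"

	"go.uber.org/atomic"
)

// memorySink is an illustrative DataSink that buffers reports in a channel.
type memorySink struct {
	ch   chan ReportData
	down *atomic.Bool
}

var _ DataSink = &memorySink{}

func newMemorySink() *memorySink {
	return &memorySink{ch: make(chan ReportData, 8), down: atomic.NewBool(false)}
}

// Send drops data silently when the sink is down, the deadline has already
// passed, or the buffer is full, mirroring the interface comment above.
func (m *memorySink) Send(data ReportData, deadline time.Time) {
	if m.IsDown() || time.Now().After(deadline) {
		return
	}
	select {
	case m.ch <- data:
	default:
	}
}

func (m *memorySink) IsPaused() bool { return false }

func (m *memorySink) IsDown() bool { return m.down.Load() }

// Close marks the sink down; per the contract it never comes back up.
func (m *memorySink) Close() {
	m.down.Store(true)
	close(m.ch)
}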
+func NewMockPubSubServer() (*mockPubSubServer, error) { + addr := "127.0.0.1:0" + lis, err := net.Listen("tcp", addr) + if err != nil { + return nil, err + } + server := grpc.NewServer() + + return &mockPubSubServer{ + addr: fmt.Sprintf("127.0.0.1:%d", lis.Addr().(*net.TCPAddr).Port), + listen: lis, + grpcServer: server, + }, nil +} + +func (svr *mockPubSubServer) Serve() { + err := svr.grpcServer.Serve(svr.listen) + if err != nil { + logutil.BgLogger().Warn("[top-sql] mock pubsub server serve failed", zap.Error(err)) + } +} + +func (svr *mockPubSubServer) Server() *grpc.Server { + return svr.grpcServer +} + +func (svr *mockPubSubServer) Address() string { + return svr.addr +} + +func (svr *mockPubSubServer) Stop() { + if svr.grpcServer != nil { + svr.grpcServer.Stop() + } +} diff --git a/util/topsql/reporter/mock/server.go b/util/topsql/reporter/mock/server.go index 5dca814fb30e5..aed6280956268 100644 --- a/util/topsql/reporter/mock/server.go +++ b/util/topsql/reporter/mock/server.go @@ -28,7 +28,7 @@ import ( "google.golang.org/grpc" ) -type mockAgentServer struct { +type mockReceiverServer struct { sync.Mutex addr string grpcServer *grpc.Server @@ -41,42 +41,42 @@ type mockAgentServer struct { } } -// StartMockAgentServer starts the mock agent server. -func StartMockAgentServer() (*mockAgentServer, error) { +// StartMockReceiverServer starts the mock receiver server. +func StartMockReceiverServer() (*mockReceiverServer, error) { addr := "127.0.0.1:0" lis, err := net.Listen("tcp", addr) if err != nil { return nil, err } server := grpc.NewServer() - agentServer := &mockAgentServer{ + receiverServer := &mockReceiverServer{ addr: fmt.Sprintf("127.0.0.1:%d", lis.Addr().(*net.TCPAddr).Port), grpcServer: server, sqlMetas: make(map[string]tipb.SQLMeta, 5000), planMetas: make(map[string]string, 5000), } - agentServer.hang.beginTime.Store(time.Now()) - agentServer.hang.endTime.Store(time.Now()) - tipb.RegisterTopSQLAgentServer(server, agentServer) + receiverServer.hang.beginTime.Store(time.Now()) + receiverServer.hang.endTime.Store(time.Now()) + tipb.RegisterTopSQLAgentServer(server, receiverServer) go func() { err := server.Serve(lis) if err != nil { - logutil.BgLogger().Warn("[top-sql] mock agent server serve failed", zap.Error(err)) + logutil.BgLogger().Warn("[top-sql] mock receiver server serve failed", zap.Error(err)) } }() - return agentServer, nil + return receiverServer, nil } -func (svr *mockAgentServer) HangFromNow(duration time.Duration) { +func (svr *mockReceiverServer) HangFromNow(duration time.Duration) { now := time.Now() svr.hang.beginTime.Store(now) svr.hang.endTime.Store(now.Add(duration)) } // mayHang will check the hanging period, and ensure to sleep through it -func (svr *mockAgentServer) mayHang() { +func (svr *mockReceiverServer) mayHang() { now := time.Now() beginTime := svr.hang.beginTime.Load().(time.Time) endTime := svr.hang.endTime.Load().(time.Time) @@ -85,7 +85,7 @@ func (svr *mockAgentServer) mayHang() { } } -func (svr *mockAgentServer) ReportCPUTimeRecords(stream tipb.TopSQLAgent_ReportCPUTimeRecordsServer) error { +func (svr *mockReceiverServer) ReportCPUTimeRecords(stream tipb.TopSQLAgent_ReportCPUTimeRecordsServer) error { records := make([]*tipb.CPUTimeRecord, 0, 10) for { svr.mayHang() @@ -103,7 +103,7 @@ func (svr *mockAgentServer) ReportCPUTimeRecords(stream tipb.TopSQLAgent_ReportC return stream.SendAndClose(&tipb.EmptyResponse{}) } -func (svr *mockAgentServer) ReportSQLMeta(stream tipb.TopSQLAgent_ReportSQLMetaServer) error { +func (svr 
*mockReceiverServer) ReportSQLMeta(stream tipb.TopSQLAgent_ReportSQLMetaServer) error { for { svr.mayHang() req, err := stream.Recv() @@ -119,7 +119,7 @@ func (svr *mockAgentServer) ReportSQLMeta(stream tipb.TopSQLAgent_ReportSQLMetaS return stream.SendAndClose(&tipb.EmptyResponse{}) } -func (svr *mockAgentServer) ReportPlanMeta(stream tipb.TopSQLAgent_ReportPlanMetaServer) error { +func (svr *mockReceiverServer) ReportPlanMeta(stream tipb.TopSQLAgent_ReportPlanMetaServer) error { for { svr.mayHang() req, err := stream.Recv() @@ -135,7 +135,7 @@ func (svr *mockAgentServer) ReportPlanMeta(stream tipb.TopSQLAgent_ReportPlanMet return stream.SendAndClose(&tipb.EmptyResponse{}) } -func (svr *mockAgentServer) WaitCollectCnt(cnt int, timeout time.Duration) { +func (svr *mockReceiverServer) WaitCollectCnt(cnt int, timeout time.Duration) { start := time.Now() svr.Lock() old := len(svr.records) @@ -154,7 +154,7 @@ func (svr *mockAgentServer) WaitCollectCnt(cnt int, timeout time.Duration) { } } -func (svr *mockAgentServer) GetSQLMetaByDigestBlocking(digest []byte, timeout time.Duration) (meta tipb.SQLMeta, exist bool) { +func (svr *mockReceiverServer) GetSQLMetaByDigestBlocking(digest []byte, timeout time.Duration) (meta tipb.SQLMeta, exist bool) { start := time.Now() for { svr.Lock() @@ -167,7 +167,7 @@ func (svr *mockAgentServer) GetSQLMetaByDigestBlocking(digest []byte, timeout ti } } -func (svr *mockAgentServer) GetPlanMetaByDigestBlocking(digest []byte, timeout time.Duration) (normalizedPlan string, exist bool) { +func (svr *mockReceiverServer) GetPlanMetaByDigestBlocking(digest []byte, timeout time.Duration) (normalizedPlan string, exist bool) { start := time.Now() for { svr.Lock() @@ -180,7 +180,7 @@ func (svr *mockAgentServer) GetPlanMetaByDigestBlocking(digest []byte, timeout t } } -func (svr *mockAgentServer) GetLatestRecords() []*tipb.CPUTimeRecord { +func (svr *mockReceiverServer) GetLatestRecords() []*tipb.CPUTimeRecord { svr.Lock() records := svr.records svr.records = [][]*tipb.CPUTimeRecord{} @@ -192,11 +192,11 @@ func (svr *mockAgentServer) GetLatestRecords() []*tipb.CPUTimeRecord { return records[len(records)-1] } -func (svr *mockAgentServer) Address() string { +func (svr *mockReceiverServer) Address() string { return svr.addr } -func (svr *mockAgentServer) Stop() { +func (svr *mockReceiverServer) Stop() { if svr.grpcServer != nil { svr.grpcServer.Stop() } diff --git a/util/topsql/reporter/pubsub.go b/util/topsql/reporter/pubsub.go new file mode 100644 index 0000000000000..b78e865da0a72 --- /dev/null +++ b/util/topsql/reporter/pubsub.go @@ -0,0 +1,257 @@ +// Copyright 2021 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package reporter + +import ( + "context" + "errors" + "time" + + "github.com/pingcap/tidb/util" + "github.com/pingcap/tidb/util/logutil" + "github.com/pingcap/tipb/go-tipb" + "go.uber.org/atomic" + "go.uber.org/zap" +) + +// TopSQLPubSubService implements tipb.TopSQLPubSubServer. 
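+// Register it on a gRPC server via topsql.RegisterPubSubServer.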
+// +// If a client subscribes to TopSQL records, the TopSQLPubSubService is responsible +// for registering an associated DataSink to the reporter. Then the reporter sends +// data to the client periodically. +type TopSQLPubSubService struct { + dataSinkRegHandle DataSinkRegHandle +} + +// NewTopSQLPubSubService creates a new TopSQLPubSubService. +func NewTopSQLPubSubService( + dataSinkRegHandle DataSinkRegHandle, +) *TopSQLPubSubService { + return &TopSQLPubSubService{ + dataSinkRegHandle: dataSinkRegHandle, + } +} + +var _ tipb.TopSQLPubSubServer = &TopSQLPubSubService{} + +// Subscribe registers dataSinks to the reporter and redirects data received from the reporter +// to subscribers associated with those dataSinks. +func (t *TopSQLPubSubService) Subscribe( + _ *tipb.TopSQLSubRequest, + stream tipb.TopSQLPubSub_SubscribeServer, +) error { + ds := newPubSubDataSink(stream) + t.dataSinkRegHandle.Register(ds) + ds.run() + return nil +} + +type pubSubDataSink struct { + stream tipb.TopSQLPubSub_SubscribeServer + sendTaskCh chan sendTask + isDown *atomic.Bool +} + +func newPubSubDataSink( + stream tipb.TopSQLPubSub_SubscribeServer, +) *pubSubDataSink { + return &pubSubDataSink{ + stream: stream, + sendTaskCh: make(chan sendTask, 1), + isDown: atomic.NewBool(false), + } +} + +var _ DataSink = &pubSubDataSink{} + +func (s *pubSubDataSink) Send(data ReportData, deadline time.Time) { + if s.IsDown() { + return + } + + select { + case s.sendTaskCh <- sendTask{data: data, deadline: deadline}: + // sent successfully + default: + ignoreReportChannelFullCounter.Inc() + logutil.BgLogger().Warn("[top-sql] report channel is full") + } +} + +func (s *pubSubDataSink) IsPaused() bool { + return false +} + +func (s *pubSubDataSink) IsDown() bool { + return s.isDown.Load() +} + +func (s *pubSubDataSink) Close() { + close(s.sendTaskCh) +} + +func (s *pubSubDataSink) run() { + defer s.isDown.Store(true) + + for task := range s.sendTaskCh { + ctx, cancel := context.WithDeadline(context.Background(), task.deadline) + start := time.Now() + + var err error + go util.WithRecovery(func() { + defer cancel() + err = s.doSend(ctx, task.data) + if err != nil { + reportAllDurationFailedHistogram.Observe(time.Since(start).Seconds()) + } else { + reportAllDurationSuccHistogram.Observe(time.Since(start).Seconds()) + } + }, nil) + + <-ctx.Done() + if errors.Is(ctx.Err(), context.DeadlineExceeded) { + logutil.BgLogger().Warn( + "[top-sql] pubsub datasink failed to send data to subscriber due to deadline exceeded", + zap.Time("deadline", task.deadline), + ) + } else if err != nil { + logutil.BgLogger().Warn( + "[top-sql] pubsub datasink failed to send data to subscriber", + zap.Error(err), + ) + } + } +} + +func (s *pubSubDataSink) doSend(ctx context.Context, data ReportData) error { + if err := s.sendCPUTime(ctx, data.CPUTimeRecords); err != nil { + return err + } + if err := s.sendSQLMeta(ctx, data.SQLMetas); err != nil { + return err + } + return s.sendPlanMeta(ctx, data.PlanMetas) +} + +func (s *pubSubDataSink) sendCPUTime(ctx context.Context, records []*tipb.CPUTimeRecord) (err error) { + if len(records) == 0 { + return + } + + start := time.Now() + sentCount := 0 + defer func() { + topSQLReportRecordCounterHistogram.Observe(float64(sentCount)) + if err != nil { + reportRecordDurationFailedHistogram.Observe(time.Since(start).Seconds()) + } else { + reportRecordDurationSuccHistogram.Observe(time.Since(start).Seconds()) + } + }() + + cpuRecord := &tipb.TopSQLSubResponse_Record{} + r := &tipb.TopSQLSubResponse{RespOneof: 
cpuRecord} + + for i := range records { + cpuRecord.Record = records[i] + if err = s.stream.Send(r); err != nil { + return + } + sentCount += 1 + + select { + case <-ctx.Done(): + err = ctx.Err() + return + default: + } + } + + return +} + +func (s *pubSubDataSink) sendSQLMeta(ctx context.Context, sqlMetas []*tipb.SQLMeta) (err error) { + if len(sqlMetas) == 0 { + return + } + + start := time.Now() + sentCount := 0 + defer func() { + topSQLReportSQLCountHistogram.Observe(float64(sentCount)) + if err != nil { + reportSQLDurationFailedHistogram.Observe(time.Since(start).Seconds()) + } else { + reportSQLDurationSuccHistogram.Observe(time.Since(start).Seconds()) + } + }() + + sqlMeta := &tipb.TopSQLSubResponse_SqlMeta{} + r := &tipb.TopSQLSubResponse{RespOneof: sqlMeta} + + for i := range sqlMetas { + sqlMeta.SqlMeta = sqlMetas[i] + if err = s.stream.Send(r); err != nil { + return + } + sentCount += 1 + + select { + case <-ctx.Done(): + err = ctx.Err() + return + default: + } + } + + return +} + +func (s *pubSubDataSink) sendPlanMeta(ctx context.Context, planMetas []*tipb.PlanMeta) (err error) { + if len(planMetas) == 0 { + return + } + + start := time.Now() + sentCount := 0 + defer func() { + topSQLReportPlanCountHistogram.Observe(float64(sentCount)) + if err != nil { + reportPlanDurationFailedHistogram.Observe(time.Since(start).Seconds()) + } else { + reportPlanDurationSuccHistogram.Observe(time.Since(start).Seconds()) + } + }() + + planMeta := &tipb.TopSQLSubResponse_PlanMeta{} + r := &tipb.TopSQLSubResponse{RespOneof: planMeta} + + for i := range planMetas { + planMeta.PlanMeta = planMetas[i] + if err = s.stream.Send(r); err != nil { + return + } + sentCount += 1 + + select { + case <-ctx.Done(): + err = ctx.Err() + return + default: + } + } + + return +} diff --git a/util/topsql/reporter/reporter.go b/util/topsql/reporter/reporter.go index 08503610734da..45fdd684b44ee 100644 --- a/util/topsql/reporter/reporter.go +++ b/util/topsql/reporter/reporter.go @@ -23,12 +23,11 @@ import ( "time" "github.com/pingcap/failpoint" - "github.com/pingcap/tidb/config" "github.com/pingcap/tidb/metrics" - "github.com/pingcap/tidb/sessionctx/variable" "github.com/pingcap/tidb/util" "github.com/pingcap/tidb/util/logutil" - "github.com/pingcap/tidb/util/topsql/tracecpu" + "github.com/pingcap/tidb/util/tracecpu" + "github.com/pingcap/tipb/go-tipb" "github.com/wangjohn/quickselect" atomic2 "go.uber.org/atomic" "go.uber.org/zap" @@ -41,6 +40,27 @@ const ( grpcInitialConnWindowSize = 1 << 30 // keyOthers is the key to store the aggregation of all records that is out of Top N. keyOthers = "" + + // DefMaxStatementCount indicates that the default MaxStatementCount is 200. + DefMaxStatementCount = 200 + + // DefMaxCollect indicates that the default MaxCollect is 10000. + DefMaxCollect = 10000 + + // DefReportIntervalSeconds indicates that the default ReportIntervalSeconds is 60. + DefReportIntervalSeconds = 60 +) + +// Variables to control the top-sql feature. +var ( + // MaxStatementCount is the maximum number of statements kept in memory. + MaxStatementCount = atomic2.NewInt64(DefMaxStatementCount) + + // MaxCollect is the maximum capacity of the collect map. + MaxCollect = atomic2.NewInt64(DefMaxCollect) + + // ReportIntervalSeconds is the report data interval of top-sql.
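+	// A change takes effect when the current report ticker fires.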
+	ReportIntervalSeconds = atomic2.NewInt64(DefReportIntervalSeconds) ) var _ TopSQLReporter = &RemoteTopSQLReporter{} @@ -50,9 +70,15 @@ type TopSQLReporter interface { tracecpu.Collector RegisterSQL(sqlDigest []byte, normalizedSQL string, isInternal bool) RegisterPlan(planDigest []byte, normalizedPlan string) + DataSinkRegHandle() DataSinkRegHandle Close() } +// DataSinkRegHandle registers a DataSink to the reporter. +type DataSinkRegHandle interface { + Register(dataSink DataSink) +} + type cpuData struct { timestamp uint64 records []tracecpu.SQLCPUTimeRecord @@ -120,17 +146,25 @@ type planBinaryDecodeFunc func(string) (string, error) type RemoteTopSQLReporter struct { ctx context.Context cancel context.CancelFunc - client ReportClient - // normalizedSQLMap is an map, whose keys are SQL digest strings and values are SQLMeta. + dataSinks []DataSink + dataSinkRegCh chan DataSink + + // paused indicates no active datasinks + paused *atomic2.Bool + + // normalizedSQLMap is a map, whose keys are SQL digest strings and values are SQLMeta. normalizedSQLMap atomic.Value // sync.Map sqlMapLength atomic2.Int64 - // normalizedPlanMap is an map, whose keys are plan digest strings and values are normalized plans **in binary**. + // normalizedPlanMap is a map, whose keys are plan digest strings and values are normalized plans **in binary**. // The normalized plans in binary can be decoded to string using the `planBinaryDecoder`. normalizedPlanMap atomic.Value // sync.Map planMapLength atomic2.Int64 + // Calling decodePlan can take a while, so it should not block critical paths. + decodePlan planBinaryDecodeFunc + collectCPUDataChan chan cpuData reportCollectedDataChan chan collectedData } @@ -143,14 +177,18 @@ type SQLMeta struct { // NewRemoteTopSQLReporter creates a new TopSQL reporter // -// planBinaryDecoder is a decoding function which will be called asynchronously to decode the plan binary to string -// MaxStatementsNum is the maximum SQL and plan number, which will restrict the memory usage of the internal LFU cache -func NewRemoteTopSQLReporter(client ReportClient) *RemoteTopSQLReporter { +// planBinaryDecodeFunc is a decoding function which will be called asynchronously to decode the plan binary to string +func NewRemoteTopSQLReporter(planBinaryDecodeFunc planBinaryDecodeFunc) *RemoteTopSQLReporter { ctx, cancel := context.WithCancel(context.Background()) tsr := &RemoteTopSQLReporter{ - ctx: ctx, - cancel: cancel, - client: client, + ctx: ctx, + cancel: cancel, + + paused: atomic2.NewBool(true), + dataSinkRegCh: make(chan DataSink), + + decodePlan: planBinaryDecodeFunc, + collectCPUDataChan: make(chan cpuData, 1), reportCollectedDataChan: make(chan collectedData, 1), } @@ -188,7 +226,7 @@ var ( // This function should be thread-safe, which means parallelly calling it in several goroutines should be fine. // It should also return immediately, and do any CPU-intensive job asynchronously. func (tsr *RemoteTopSQLReporter) RegisterSQL(sqlDigest []byte, normalizedSQL string, isInternal bool) { - if tsr.sqlMapLength.Load() >= variable.TopSQLVariable.MaxCollect.Load() { + if tsr.sqlMapLength.Load() >= MaxCollect.Load() { ignoreExceedSQLCounter.Inc() return } @@ -206,7 +244,7 @@ func (tsr *RemoteTopSQLReporter) RegisterSQL(sqlDigest []byte, normalizedSQL str // RegisterPlan is like RegisterSQL, but for normalized plan strings. // This function is thread-safe and efficient.
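// The plan is registered in its binary form and is decoded only when a report is built, since decoding can be slow.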
func (tsr *RemoteTopSQLReporter) RegisterPlan(planDigest []byte, normalizedBinaryPlan string) { - if tsr.planMapLength.Load() >= variable.TopSQLVariable.MaxCollect.Load() { + if tsr.planMapLength.Load() >= MaxCollect.Load() { ignoreExceedPlanCounter.Inc() return } @@ -235,10 +273,23 @@ func (tsr *RemoteTopSQLReporter) Collect(timestamp uint64, records []tracecpu.SQ } } +// IsPaused returns whether RemoteTopSQLReporter is paused. The value also means whether TopSQL is enabled. +func (tsr *RemoteTopSQLReporter) IsPaused() bool { + return tsr.paused.Load() +} + +// DataSinkRegHandle returns a DataSinkRegHandle for DataSink registration. +func (tsr *RemoteTopSQLReporter) DataSinkRegHandle() DataSinkRegHandle { + return &RemoteDataSinkRegHandle{reporterDoneCh: tsr.ctx.Done(), registerCh: tsr.dataSinkRegCh} +} + // Close uses to close and release the reporter resource. func (tsr *RemoteTopSQLReporter) Close() { tsr.cancel() - tsr.client.Close() + for i := range tsr.dataSinks { + tsr.dataSinks[i].Close() + } + tsr.dataSinks = nil } func addEvictedCPUTime(collectTarget map[string]*dataPoints, timestamp uint64, totalCPUTimeMs uint32) { @@ -260,8 +311,8 @@ func addEvictedCPUTime(collectTarget map[string]*dataPoints, timestamp uint64, t others.CPUTimeMsTotal += uint64(totalCPUTimeMs) } -// addEvictedIntoSortedDataPoints adds the evict dataPoints into others. -// Attention, this function depend on others dataPoints is sorted, and this function will modify the evict dataPoints +// addEvictedIntoSortedDataPoints adds evicted dataPoints into others. +// Attention: this function depends on the others dataPoints being sorted, and it will modify the evicted dataPoints // to make sure it is sorted by timestamp. func addEvictedIntoSortedDataPoints(others *dataPoints, evict *dataPoints) *dataPoints { if others == nil { @@ -321,9 +372,17 @@ func (tsr *RemoteTopSQLReporter) collectWorker() { defer util.Recover("top-sql", "collectWorker", nil, false) collectedData := make(map[string]*dataPoints) - currentReportInterval := variable.TopSQLVariable.ReportIntervalSeconds.Load() + currentReportInterval := ReportIntervalSeconds.Load() reportTicker := time.NewTicker(time.Second * time.Duration(currentReportInterval)) for { + tsr.acceptDataSinkRegs() + tsr.removeDownDataSinks() + if len(tsr.dataSinks) > 10 { + logutil.BgLogger().Warn("[top-sql] too many datasinks, keeping only the first 10", zap.Int("count", len(tsr.dataSinks))) + tsr.dataSinks = tsr.dataSinks[:10] + } + tsr.paused.Store(tsr.activeDataSinkCnt() == 0) + select { case data := <-tsr.collectCPUDataChan: // On receiving data to collect: Write to local data array, and retain records with most CPU time. @@ -331,7 +390,7 @@ func (tsr *RemoteTopSQLReporter) collectWorker() { case <-reportTicker.C: tsr.takeDataAndSendToReportChan(&collectedData) // Update `reportTicker` if report interval changed.
- if newInterval := variable.TopSQLVariable.ReportIntervalSeconds.Load(); newInterval != currentReportInterval { + if newInterval := ReportIntervalSeconds.Load(); newInterval != currentReportInterval { currentReportInterval = newInterval reportTicker.Reset(time.Second * time.Duration(currentReportInterval)) } @@ -341,6 +400,43 @@ } } +// acceptDataSinkRegs accepts datasink registrations +func (tsr *RemoteTopSQLReporter) acceptDataSinkRegs() { + for { + select { + case dataSink := <-tsr.dataSinkRegCh: + tsr.dataSinks = append(tsr.dataSinks, dataSink) + default: + return + } + } +} + +// removeDownDataSinks removes all down dataSinks +func (tsr *RemoteTopSQLReporter) removeDownDataSinks() { + idx := 0 + for _, dataSink := range tsr.dataSinks { + if dataSink.IsDown() { + dataSink.Close() + continue + } + tsr.dataSinks[idx] = dataSink + idx++ + } + tsr.dataSinks = tsr.dataSinks[:idx] +} + +// activeDataSinkCnt gets the count of active datasinks +func (tsr *RemoteTopSQLReporter) activeDataSinkCnt() int { + pendingCnt := 0 + for _, dataSink := range tsr.dataSinks { + if dataSink.IsPaused() { + pendingCnt += 1 + } + } + return len(tsr.dataSinks) - pendingCnt +} + func encodeKey(buf *bytes.Buffer, sqlDigest, planDigest []byte) string { buf.Reset() buf.Write(sqlDigest) @@ -349,7 +445,7 @@ } func getTopNRecords(records []tracecpu.SQLCPUTimeRecord) (topN, shouldEvict []tracecpu.SQLCPUTimeRecord) { - maxStmt := int(variable.TopSQLVariable.MaxStatementCount.Load()) + maxStmt := int(MaxStatementCount.Load()) if len(records) <= maxStmt { return records, nil } @@ -361,7 +457,7 @@ } func getTopNDataPoints(records []*dataPoints) (topN, shouldEvict []*dataPoints) { - maxStmt := int(variable.TopSQLVariable.MaxStatementCount.Load()) + maxStmt := int(MaxStatementCount.Load()) if len(records) <= maxStmt { return records, nil } @@ -383,7 +479,7 @@ func (tsr *RemoteTopSQLReporter) doCollect( records, evicted = getTopNRecords(records) keyBuf := bytes.NewBuffer(make([]byte, 0, 64)) - listCapacity := int(variable.TopSQLVariable.ReportIntervalSeconds.Load()/variable.TopSQLVariable.PrecisionSeconds.Load() + 1) + listCapacity := int(ReportIntervalSeconds.Load()/tracecpu.PrecisionSeconds.Load() + 1) if listCapacity < 1 { listCapacity = 1 } @@ -464,31 +560,58 @@ type collectedData struct { normalizedPlanMap *sync.Map } -// reportData contains data that reporter sends to the agent -type reportData struct { - // collectedData contains the topN collected records and the `others` record which aggregation all records that is out of Top N. - collectedData []*dataPoints - normalizedSQLMap *sync.Map - normalizedPlanMap *sync.Map +// ReportData contains data that the reporter sends to the agent +type ReportData struct { + // CPUTimeRecords contains the top-N collected records and the `others` record, which aggregates all records outside the top N.
+ CPUTimeRecords []*tipb.CPUTimeRecord + SQLMetas []*tipb.SQLMeta + PlanMetas []*tipb.PlanMeta } -func (d *reportData) hasData() bool { - if len(d.collectedData) > 0 { - return true +func (d *ReportData) hasData() bool { + return len(d.CPUTimeRecords) != 0 || len(d.SQLMetas) != 0 || len(d.PlanMetas) != 0 +} + +func buildReportData(records []*dataPoints, sqlMap *sync.Map, planMap *sync.Map, decodePlan planBinaryDecodeFunc) ReportData { + res := ReportData{ + CPUTimeRecords: make([]*tipb.CPUTimeRecord, 0, len(records)), + SQLMetas: make([]*tipb.SQLMeta, 0, len(records)), + PlanMetas: make([]*tipb.PlanMeta, 0, len(records)), } - cnt := 0 - d.normalizedSQLMap.Range(func(key, value interface{}) bool { - cnt++ - return false + + for _, record := range records { + res.CPUTimeRecords = append(res.CPUTimeRecords, &tipb.CPUTimeRecord{ + RecordListTimestampSec: record.TimestampList, + RecordListCpuTimeMs: record.CPUTimeMsList, + SqlDigest: record.SQLDigest, + PlanDigest: record.PlanDigest, + }) + } + + sqlMap.Range(func(key, value interface{}) bool { + meta := value.(SQLMeta) + res.SQLMetas = append(res.SQLMetas, &tipb.SQLMeta{ + SqlDigest: []byte(key.(string)), + NormalizedSql: meta.normalizedSQL, + IsInternalSql: meta.isInternal, + }) + return true }) - if cnt > 0 { + + planMap.Range(func(key, value interface{}) bool { + planDecoded, errDecode := decodePlan(value.(string)) + if errDecode != nil { + logutil.BgLogger().Warn("[top-sql] decode plan failed", zap.Error(errDecode)) + return true + } + res.PlanMetas = append(res.PlanMetas, &tipb.PlanMeta{ + PlanDigest: []byte(key.(string)), + NormalizedPlan: planDecoded, + }) return true - } - d.normalizedPlanMap.Range(func(key, value interface{}) bool { - cnt++ - return false }) - return cnt > 0 + + return res } // reportWorker sends data to the gRPC endpoint from the `reportCollectedDataChan` one by one. @@ -510,13 +633,22 @@ func (tsr *RemoteTopSQLReporter) reportWorker() { } } -// getReportData gets reportData from the collectedData. +// getReportData gets ReportData from the collectedData. // This function will calculate the topN collected records and the `others` record which aggregation all records that is out of Top N. -func (tsr *RemoteTopSQLReporter) getReportData(collected collectedData) reportData { +func (tsr *RemoteTopSQLReporter) getReportData(collected collectedData) ReportData { + records, sqlMap, planMap := getTopN(collected) + return buildReportData(records, sqlMap, planMap, tsr.decodePlan) +} + +func getTopN(collected collectedData) (records []*dataPoints, sqlMap *sync.Map, planMap *sync.Map) { // Fetch TopN dataPoints. 
others := collected.records[keyOthers] delete(collected.records, keyOthers) - records := make([]*dataPoints, 0, len(collected.records)) + + records = make([]*dataPoints, 0, len(collected.records)) + sqlMap = collected.normalizedSQLMap + planMap = collected.normalizedPlanMap + for _, v := range collected.records { records = append(records, v) } @@ -539,38 +671,46 @@ func (tsr *RemoteTopSQLReporter) getReportData(collected collectedData) reportDa records = append(records, others) } - return reportData{ - collectedData: records, - normalizedSQLMap: collected.normalizedSQLMap, - normalizedPlanMap: collected.normalizedPlanMap, - } + return } -func (tsr *RemoteTopSQLReporter) doReport(data reportData) { +func (tsr *RemoteTopSQLReporter) doReport(data ReportData) { defer util.Recover("top-sql", "doReport", nil, false) if !data.hasData() { return } - agentAddr := config.GetGlobalConfig().TopSQL.ReceiverAddress timeout := reportTimeout failpoint.Inject("resetTimeoutForTest", func(val failpoint.Value) { if val.(bool) { - interval := time.Duration(variable.TopSQLVariable.ReportIntervalSeconds.Load()) * time.Second + interval := time.Duration(ReportIntervalSeconds.Load()) * time.Second if interval < timeout { timeout = interval } } }) - ctx, cancel := context.WithTimeout(tsr.ctx, timeout) - start := time.Now() - err := tsr.client.Send(ctx, agentAddr, data) - if err != nil { - logutil.BgLogger().Warn("[top-sql] client failed to send data", zap.Error(err)) - reportAllDurationFailedHistogram.Observe(time.Since(start).Seconds()) - } else { - reportAllDurationSuccHistogram.Observe(time.Since(start).Seconds()) + + deadline := time.Now().Add(timeout) + for i := range tsr.dataSinks { + tsr.dataSinks[i].Send(data, deadline) + } +} + +var _ DataSinkRegHandle = &RemoteDataSinkRegHandle{} + +// RemoteDataSinkRegHandle is used to receive DataSink registrations. +type RemoteDataSinkRegHandle struct { + registerCh chan DataSink + reporterDoneCh <-chan struct{} +} + +// Register implements DataSinkRegHandle interface. 
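+// If the reporter has already exited, the dataSink is closed immediately instead of being registered.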
+func (r *RemoteDataSinkRegHandle) Register(dataSink DataSink) { + select { + case r.registerCh <- dataSink: + case <-r.reporterDoneCh: + logutil.BgLogger().Warn("[top-sql] failed to register datasink because the reporter is down") + dataSink.Close() } - cancel() } diff --git a/util/topsql/reporter/reporter_test.go b/util/topsql/reporter/reporter_test.go index d4be70450e596..01f547a41b7b4 100644 --- a/util/topsql/reporter/reporter_test.go +++ b/util/topsql/reporter/reporter_test.go @@ -22,9 +22,8 @@ import ( "time" "github.com/pingcap/tidb/config" - "github.com/pingcap/tidb/sessionctx/variable" "github.com/pingcap/tidb/util/topsql/reporter/mock" - "github.com/pingcap/tidb/util/topsql/tracecpu" + "github.com/pingcap/tidb/util/tracecpu" "github.com/pingcap/tipb/go-tipb" "github.com/stretchr/testify/require" ) @@ -65,14 +64,16 @@ func mockPlanBinaryDecoderFunc(plan string) (string, error) { } func setupRemoteTopSQLReporter(maxStatementsNum, interval int, addr string) *RemoteTopSQLReporter { - variable.TopSQLVariable.MaxStatementCount.Store(int64(maxStatementsNum)) - variable.TopSQLVariable.ReportIntervalSeconds.Store(int64(interval)) + MaxStatementCount.Store(int64(maxStatementsNum)) + ReportIntervalSeconds.Store(int64(interval)) config.UpdateGlobal(func(conf *config.Config) { conf.TopSQL.ReceiverAddress = addr }) - rc := NewGRPCReportClient(mockPlanBinaryDecoderFunc) - ts := NewRemoteTopSQLReporter(rc) + ts := NewRemoteTopSQLReporter(mockPlanBinaryDecoderFunc) + ds := NewSingleTargetDataSink() + ts.DataSinkRegHandle().Register(ds) + return ts } @@ -83,7 +84,7 @@ func initializeCache(maxStatementsNum, interval int, addr string) *RemoteTopSQLR } func TestCollectAndSendBatch(t *testing.T) { - agentServer, err := mock.StartMockAgentServer() + agentServer, err := mock.StartMockReceiverServer() require.NoError(t, err) defer agentServer.Stop() @@ -122,7 +123,7 @@ } func TestCollectAndEvicted(t *testing.T) { - agentServer, err := mock.StartMockAgentServer() + agentServer, err := mock.StartMockReceiverServer() require.NoError(t, err) defer agentServer.Stop() @@ -187,7 +188,7 @@ func collectAndWait(tsr *RemoteTopSQLReporter, timestamp uint64, records []trace } func TestCollectAndTopN(t *testing.T) { - agentServer, err := mock.StartMockAgentServer() + agentServer, err := mock.StartMockReceiverServer() require.NoError(t, err) defer agentServer.Stop() @@ -281,7 +282,7 @@ func TestCollectCapacity(t *testing.T) { return records } - variable.TopSQLVariable.MaxCollect.Store(10000) + MaxCollect.Store(10000) registerSQL(5000) require.Equal(t, int64(5000), tsr.sqlMapLength.Load()) registerPlan(1000) @@ -292,13 +293,13 @@ registerPlan(20000) require.Equal(t, int64(10000), tsr.planMapLength.Load()) - variable.TopSQLVariable.MaxCollect.Store(20000) + MaxCollect.Store(20000) registerSQL(50000) require.Equal(t, int64(20000), tsr.sqlMapLength.Load()) registerPlan(50000) require.Equal(t, int64(20000), tsr.planMapLength.Load()) - variable.TopSQLVariable.MaxStatementCount.Store(5000) + MaxStatementCount.Store(5000) collectedData := make(map[string]*dataPoints) tsr.doCollect(collectedData, 1, genRecord(20000)) require.Equal(t, 5001, len(collectedData)) @@ -389,7 +390,7 @@ func TestDataPoints(t *testing.T) { } func TestCollectInternal(t *testing.T) { - agentServer, err := mock.StartMockAgentServer() + agentServer, err := mock.StartMockReceiverServer() require.NoError(t, err) defer agentServer.Stop() @@ -425,6 +426,8 @@ func 
TestCollectInternal(t *testing.T) { func BenchmarkTopSQL_CollectAndIncrementFrequency(b *testing.B) { tsr := initializeCache(maxSQLNum, 120, ":23333") + defer tsr.Close() + for i := 0; i < b.N; i++ { populateCache(tsr, 0, maxSQLNum, uint64(i)) } @@ -432,6 +435,8 @@ func BenchmarkTopSQL_CollectAndIncrementFrequency(b *testing.B) { func BenchmarkTopSQL_CollectAndEvict(b *testing.B) { tsr := initializeCache(maxSQLNum, 120, ":23333") + defer tsr.Close() + begin := 0 end := maxSQLNum for i := 0; i < b.N; i++ { diff --git a/util/topsql/reporter/single_target.go b/util/topsql/reporter/single_target.go new file mode 100644 index 0000000000000..ac551a108fc39 --- /dev/null +++ b/util/topsql/reporter/single_target.go @@ -0,0 +1,289 @@ +// Copyright 2021 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package reporter + +import ( + "context" + "math" + "sync" + "time" + + "github.com/pingcap/tidb/config" + "github.com/pingcap/tidb/util" + "github.com/pingcap/tidb/util/logutil" + "github.com/pingcap/tipb/go-tipb" + "go.uber.org/zap" + "google.golang.org/grpc" + "google.golang.org/grpc/backoff" +) + +// SingleTargetDataSink reports data to grpc servers. +type SingleTargetDataSink struct { + curRPCAddr string + conn *grpc.ClientConn + sendTaskCh chan sendTask +} + +type sendTask struct { + data ReportData + deadline time.Time +} + +// NewSingleTargetDataSink returns a new SingleTargetDataSink +// +func NewSingleTargetDataSink() *SingleTargetDataSink { + dataSink := &SingleTargetDataSink{ + sendTaskCh: make(chan sendTask, 1), + } + + go util.WithRecovery(dataSink.run, nil) + return dataSink +} + +func (r *SingleTargetDataSink) run() { + for task := range r.sendTaskCh { + targetRPCAddr := config.GetGlobalConfig().TopSQL.ReceiverAddress + if targetRPCAddr == "" { + continue + } + + ctx, cancel := context.WithDeadline(context.Background(), task.deadline) + start := time.Now() + err := r.doSend(ctx, targetRPCAddr, task.data) + cancel() + if err != nil { + logutil.BgLogger().Warn("[top-sql] single target data sink failed to send data to receiver", zap.Error(err)) + reportAllDurationFailedHistogram.Observe(time.Since(start).Seconds()) + } else { + reportAllDurationSuccHistogram.Observe(time.Since(start).Seconds()) + } + } +} + +var _ DataSink = &SingleTargetDataSink{} + +// Send implements the DataSink interface. 
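+// It never blocks: if the previous task has not been consumed from the channel yet, the new data is dropped and a warning is logged.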
+func (r *SingleTargetDataSink) Send(data ReportData, deadline time.Time) { + select { + case r.sendTaskCh <- sendTask{data: data, deadline: deadline}: + // sent successfully + default: + ignoreReportChannelFullCounter.Inc() + logutil.BgLogger().Warn("[top-sql] report channel is full") + } +} + +// doSend currently establishes a new connection every time, which is acceptable for a per-minute reporting period +func (r *SingleTargetDataSink) doSend(ctx context.Context, addr string, data ReportData) (err error) { + err = r.tryEstablishConnection(ctx, addr) + if err != nil { + return + } + + var wg sync.WaitGroup + errCh := make(chan error, 3) + wg.Add(3) + + go func() { + defer wg.Done() + errCh <- r.sendBatchSQLMeta(ctx, data.SQLMetas) + }() + go func() { + defer wg.Done() + errCh <- r.sendBatchPlanMeta(ctx, data.PlanMetas) + }() + go func() { + defer wg.Done() + errCh <- r.sendBatchCPUTimeRecord(ctx, data.CPUTimeRecords) + }() + wg.Wait() + close(errCh) + for err = range errCh { + if err != nil { + return + } + } + + return +} + +// IsPaused implements DataSink interface. +func (r *SingleTargetDataSink) IsPaused() bool { + return len(config.GetGlobalConfig().TopSQL.ReceiverAddress) == 0 +} + +// IsDown implements DataSink interface. +func (r *SingleTargetDataSink) IsDown() bool { + return false +} + +// Close closes the grpc connection. +func (r *SingleTargetDataSink) Close() { + close(r.sendTaskCh) + if r.conn == nil { + return + } + err := r.conn.Close() + if err != nil { + logutil.BgLogger().Warn("[top-sql] single target data sink failed to close connection", zap.Error(err)) + } + r.conn = nil +} + +// sendBatchCPUTimeRecord sends a batch of TopSQL records by stream. +func (r *SingleTargetDataSink) sendBatchCPUTimeRecord(ctx context.Context, records []*tipb.CPUTimeRecord) (err error) { + if len(records) == 0 { + return + } + + start := time.Now() + sentCount := 0 + defer func() { + topSQLReportRecordCounterHistogram.Observe(float64(sentCount)) + if err != nil { + reportRecordDurationFailedHistogram.Observe(time.Since(start).Seconds()) + } else { + reportRecordDurationSuccHistogram.Observe(time.Since(start).Seconds()) + } + }() + + client := tipb.NewTopSQLAgentClient(r.conn) + stream, err := client.ReportCPUTimeRecords(ctx) + if err != nil { + return err + } + for _, record := range records { + if err = stream.Send(record); err != nil { + return + } + sentCount += 1 + } + + // See https://pkg.go.dev/google.golang.org/grpc#ClientConn.NewStream for how to avoid leaking the stream + _, err = stream.CloseAndRecv() + return +} + +// sendBatchSQLMeta sends a batch of SQL metas by stream. +func (r *SingleTargetDataSink) sendBatchSQLMeta(ctx context.Context, sqlMetas []*tipb.SQLMeta) (err error) { + if len(sqlMetas) == 0 { + return + } + + start := time.Now() + sentCount := 0 + defer func() { + topSQLReportSQLCountHistogram.Observe(float64(sentCount)) + + if err != nil { + reportSQLDurationFailedHistogram.Observe(time.Since(start).Seconds()) + } else { + reportSQLDurationSuccHistogram.Observe(time.Since(start).Seconds()) + } + }() + + client := tipb.NewTopSQLAgentClient(r.conn) + stream, err := client.ReportSQLMeta(ctx) + if err != nil { + return err + } + + for _, meta := range sqlMetas { + if err = stream.Send(meta); err != nil { + return + } + sentCount += 1 + } + + // See https://pkg.go.dev/google.golang.org/grpc#ClientConn.NewStream for how to avoid leaking the stream + _, err = stream.CloseAndRecv() + return +} + +// sendBatchPlanMeta sends a batch of plan metas by stream.
+func (r *SingleTargetDataSink) sendBatchPlanMeta(ctx context.Context, planMetas []*tipb.PlanMeta) (err error) { + if len(planMetas) == 0 { + return nil + } + + start := time.Now() + sentCount := 0 + defer func() { + topSQLReportPlanCountHistogram.Observe(float64(sentCount)) + if err != nil { + reportPlanDurationFailedHistogram.Observe(time.Since(start).Seconds()) + } else { + reportPlanDurationSuccHistogram.Observe(time.Since(start).Seconds()) + } + }() + + client := tipb.NewTopSQLAgentClient(r.conn) + stream, err := client.ReportPlanMeta(ctx) + if err != nil { + return err + } + + for _, meta := range planMetas { + if err = stream.Send(meta); err != nil { + return err + } + sentCount += 1 + } + + // See https://pkg.go.dev/google.golang.org/grpc#ClientConn.NewStream for how to avoid leaking the stream + _, err = stream.CloseAndRecv() + return +} + +// tryEstablishConnection establishes the gRPC connection if it is not established yet. +func (r *SingleTargetDataSink) tryEstablishConnection(ctx context.Context, targetRPCAddr string) (err error) { + if r.curRPCAddr == targetRPCAddr && r.conn != nil { + // Address is not changed, skip. + return nil + } + + if r.conn != nil { + if err := r.conn.Close(); err != nil { + logutil.BgLogger().Warn("[top-sql] single target data sink failed to close connection", zap.Error(err)) + } + } + + r.conn, err = r.dial(ctx, targetRPCAddr) + if err != nil { + return err + } + r.curRPCAddr = targetRPCAddr + return nil +} + +func (r *SingleTargetDataSink) dial(ctx context.Context, targetRPCAddr string) (*grpc.ClientConn, error) { + dialCtx, cancel := context.WithTimeout(ctx, dialTimeout) + defer cancel() + return grpc.DialContext( + dialCtx, + targetRPCAddr, + grpc.WithBlock(), + grpc.WithInsecure(), + grpc.WithInitialWindowSize(grpcInitialWindowSize), + grpc.WithInitialConnWindowSize(grpcInitialConnWindowSize), + grpc.WithDefaultCallOptions( + grpc.MaxCallRecvMsgSize(math.MaxInt64), + ), + grpc.WithConnectParams(grpc.ConnectParams{ + Backoff: backoff.Config{ + BaseDelay: 100 * time.Millisecond, // Default was 1s. + Multiplier: 1.6, // Default + Jitter: 0.2, // Default + MaxDelay: 3 * time.Second, // Default was 120s. + }, + }), + ) +} diff --git a/util/topsql/topsql.go b/util/topsql/topsql.go index 2c227b4fd76d3..0d77547478a19 100644 --- a/util/topsql/topsql.go +++ b/util/topsql/topsql.go @@ -23,10 +23,11 @@ import ( "github.com/pingcap/failpoint" "github.com/pingcap/tidb/parser" "github.com/pingcap/tidb/util/logutil" - "github.com/pingcap/tidb/util/plancodec" "github.com/pingcap/tidb/util/topsql/reporter" - "github.com/pingcap/tidb/util/topsql/tracecpu" + "github.com/pingcap/tidb/util/tracecpu" + "github.com/pingcap/tipb/go-tipb" "go.uber.org/zap" + "google.golang.org/grpc" ) const ( @@ -39,11 +40,31 @@ var globalTopSQLReport reporter.TopSQLReporter // SetupTopSQL sets up the top-sql worker. -func SetupTopSQL() { - rc := reporter.NewGRPCReportClient(plancodec.DecodeNormalizedPlan) - globalTopSQLReport = reporter.NewRemoteTopSQLReporter(rc) +func SetupTopSQL(report reporter.TopSQLReporter) { + globalTopSQLReport = report + tracecpu.GlobalSQLCPUProfiler.SetCollector(globalTopSQLReport) tracecpu.GlobalSQLCPUProfiler.Run() + + // register single target datasink to reporter + singleTargetDataSink := reporter.NewSingleTargetDataSink() + globalTopSQLReport.DataSinkRegHandle().Register(singleTargetDataSink) +} + +// InstanceEnabled is used to check if TopSQL is enabled on the current instance.
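+// TopSQL counts as enabled while the reporter has at least one active (non-paused) datasink.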
+func InstanceEnabled() bool { + if globalTopSQLReport == nil { + return false + } + return !globalTopSQLReport.IsPaused() +} + +// RegisterPubSubServer registers TopSQLPubSubService to the given gRPC server. +func RegisterPubSubServer(s *grpc.Server) { + if globalTopSQLReport != nil { + service := reporter.NewTopSQLPubSubService(globalTopSQLReport.DataSinkRegHandle()) + tipb.RegisterTopSQLPubSubServer(s, service) + } } // Close uses to close and release the top sql resource. diff --git a/util/topsql/topsql_test.go b/util/topsql/topsql_test.go index 9bb9968aea52b..305056b56b9e8 100644 --- a/util/topsql/topsql_test.go +++ b/util/topsql/topsql_test.go @@ -23,13 +23,15 @@ import ( "github.com/google/pprof/profile" "github.com/pingcap/tidb/config" "github.com/pingcap/tidb/parser" - "github.com/pingcap/tidb/sessionctx/variable" "github.com/pingcap/tidb/util/topsql" "github.com/pingcap/tidb/util/topsql/reporter" mockServer "github.com/pingcap/tidb/util/topsql/reporter/mock" - "github.com/pingcap/tidb/util/topsql/tracecpu" "github.com/pingcap/tidb/util/topsql/tracecpu/mock" + "github.com/pingcap/tidb/util/tracecpu" + "github.com/pingcap/tipb/go-tipb" "github.com/stretchr/testify/require" + "google.golang.org/grpc" + "google.golang.org/grpc/keepalive" ) type collectorWrapper struct { @@ -38,7 +40,10 @@ type collectorWrapper struct { func TestTopSQLCPUProfile(t *testing.T) { collector := mock.NewTopSQLCollector() - tracecpu.GlobalSQLCPUProfiler.SetCollector(&collectorWrapper{collector}) + report := &collectorWrapper{collector} + topsql.SetupTopSQL(report) + defer topsql.Close() + reqs := []struct { sql string plan string @@ -83,43 +88,23 @@ func TestTopSQLCPUProfile(t *testing.T) { } } -func TestIsEnabled(t *testing.T) { - setTopSQLEnable(false) - require.False(t, tracecpu.GlobalSQLCPUProfiler.IsEnabled()) - - setTopSQLEnable(true) - err := tracecpu.StartCPUProfile(bytes.NewBuffer(nil)) - require.NoError(t, err) - require.True(t, tracecpu.GlobalSQLCPUProfiler.IsEnabled()) - setTopSQLEnable(false) - require.True(t, tracecpu.GlobalSQLCPUProfiler.IsEnabled()) - err = tracecpu.StopCPUProfile() - require.NoError(t, err) - - setTopSQLEnable(false) - require.False(t, tracecpu.GlobalSQLCPUProfiler.IsEnabled()) - setTopSQLEnable(true) - require.True(t, tracecpu.GlobalSQLCPUProfiler.IsEnabled()) -} - func mockPlanBinaryDecoderFunc(plan string) (string, error) { return plan, nil } func TestTopSQLReporter(t *testing.T) { - server, err := mockServer.StartMockAgentServer() + server, err := mockServer.StartMockReceiverServer() require.NoError(t, err) - variable.TopSQLVariable.MaxStatementCount.Store(200) - variable.TopSQLVariable.ReportIntervalSeconds.Store(1) + reporter.MaxStatementCount.Store(200) + reporter.ReportIntervalSeconds.Store(1) config.UpdateGlobal(func(conf *config.Config) { conf.TopSQL.ReceiverAddress = server.Address() }) - client := reporter.NewGRPCReportClient(mockPlanBinaryDecoderFunc) - report := reporter.NewRemoteTopSQLReporter(client) - defer report.Close() + report := &collectorWrapper{reporter.NewRemoteTopSQLReporter(mockPlanBinaryDecoderFunc)} + topsql.SetupTopSQL(report) + defer topsql.Close() - tracecpu.GlobalSQLCPUProfiler.SetCollector(&collectorWrapper{report}) reqs := []struct { sql string plan string @@ -176,9 +161,163 @@ func TestTopSQLReporter(t *testing.T) { require.Equal(t, 2, len(checkSQLPlanMap)) } +func TestTopSQLPubSub(t *testing.T) { + reporter.MaxStatementCount.Store(200) + reporter.ReportIntervalSeconds.Store(1) + + report := 
reporter.NewRemoteTopSQLReporter(mockPlanBinaryDecoderFunc) + topsql.SetupTopSQL(report) + defer topsql.Close() + + server, err := mockServer.NewMockPubSubServer() + require.NoError(t, err) + + topsql.RegisterPubSubServer(server.Server()) + go server.Serve() + defer server.Stop() + + conn, err := grpc.Dial( + server.Address(), + grpc.WithBlock(), + grpc.WithInsecure(), + grpc.WithKeepaliveParams(keepalive.ClientParameters{ + Time: 10 * time.Second, + Timeout: 3 * time.Second, + }), + ) + require.NoError(t, err) + defer conn.Close() + + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + client := tipb.NewTopSQLPubSubClient(conn) + stream, err := client.Subscribe(ctx, &tipb.TopSQLSubRequest{}) + require.NoError(t, err) + + reqs := []struct { + sql string + plan string + }{ + {"select * from t where a=?", "point-get"}, + {"select * from t where a>?", "table-scan"}, + {"insert into t values (?)", ""}, + } + + closeCh := make(chan struct{}) + defer close(closeCh) + + digest2sql := make(map[string]string) + sql2plan := make(map[string]string) + for _, req := range reqs { + sql2plan[req.sql] = req.plan + sqlDigest := mock.GenSQLDigest(req.sql) + digest2sql[string(sqlDigest.Bytes())] = req.sql + + go func(sql, plan string) { + for { + select { + case <-closeCh: + return + default: + mockExecuteSQL(sql, plan) + } + } + }(req.sql, req.plan) + } + + sqlMetas := make(map[string]*tipb.SQLMeta) + planMetas := make(map[string]string) + records := make(map[string]*tipb.CPUTimeRecord) + + for { + r, err := stream.Recv() + if err != nil { + break + } + + if r.GetRecord() != nil { + rec := r.GetRecord() + if _, ok := records[string(rec.SqlDigest)]; !ok { + records[string(rec.SqlDigest)] = rec + } else { + cpu := records[string(rec.SqlDigest)] + if rec.PlanDigest != nil { + cpu.PlanDigest = rec.PlanDigest + } + cpu.RecordListTimestampSec = append(cpu.RecordListTimestampSec, rec.RecordListTimestampSec...) + cpu.RecordListCpuTimeMs = append(cpu.RecordListCpuTimeMs, rec.RecordListCpuTimeMs...) 
+ } + } else if r.GetSqlMeta() != nil { + sql := r.GetSqlMeta() + if _, ok := sqlMetas[string(sql.SqlDigest)]; !ok { + sqlMetas[string(sql.SqlDigest)] = sql + } + } else if r.GetPlanMeta() != nil { + plan := r.GetPlanMeta() + if _, ok := planMetas[string(plan.PlanDigest)]; !ok { + planMetas[string(plan.PlanDigest)] = plan.NormalizedPlan + } + } + } + + checkSQLPlanMap := map[string]struct{}{} + for i := range records { + record := records[i] + require.Greater(t, len(record.RecordListCpuTimeMs), 0) + require.Greater(t, record.RecordListCpuTimeMs[0], uint32(0)) + sqlMeta, exist := sqlMetas[string(record.SqlDigest)] + require.True(t, exist) + expectedNormalizedSQL, exist := digest2sql[string(record.SqlDigest)] + require.True(t, exist) + require.Equal(t, expectedNormalizedSQL, sqlMeta.NormalizedSql) + + expectedNormalizedPlan := sql2plan[expectedNormalizedSQL] + if expectedNormalizedPlan == "" || len(record.PlanDigest) == 0 { + require.Equal(t, len(record.PlanDigest), 0) + continue + } + normalizedPlan, exist := planMetas[string(record.PlanDigest)] + require.True(t, exist) + require.Equal(t, expectedNormalizedPlan, normalizedPlan) + checkSQLPlanMap[expectedNormalizedSQL] = struct{}{} + } + require.Equal(t, len(checkSQLPlanMap), 2) +} + +func TestTopSQLPubSubReporterStopBeforePubSub(t *testing.T) { + report := reporter.NewRemoteTopSQLReporter(mockPlanBinaryDecoderFunc) + topsql.SetupTopSQL(report) + + server, err := mockServer.NewMockPubSubServer() + require.NoError(t, err) + + topsql.RegisterPubSubServer(server.Server()) + go server.Serve() + defer server.Stop() + + // stop topsql first + topsql.Close() + + // try to subscribe + conn, err := grpc.Dial(server.Address(), grpc.WithBlock(), grpc.WithInsecure()) + require.NoError(t, err) + defer conn.Close() + + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + client := tipb.NewTopSQLPubSubClient(conn) + stream, err := client.Subscribe(ctx, &tipb.TopSQLSubRequest{}) + require.NoError(t, err) + + _, err = stream.Recv() + require.NotNil(t, err) +} + func TestMaxSQLAndPlanTest(t *testing.T) { collector := mock.NewTopSQLCollector() - tracecpu.GlobalSQLCPUProfiler.SetCollector(&collectorWrapper{collector}) + report := &collectorWrapper{collector} + topsql.SetupTopSQL(report) + defer topsql.Close() ctx := context.Background() @@ -209,10 +348,6 @@ func TestMaxSQLAndPlanTest(t *testing.T) { require.Empty(t, cPlan) } -func setTopSQLEnable(enabled bool) { - variable.TopSQLVariable.Enable.Store(enabled) -} - func mockExecuteSQL(sql, plan string) { ctx := context.Background() sqlDigest := mock.GenSQLDigest(sql) diff --git a/util/topsql/tracecpu/main_test.go b/util/topsql/tracecpu/main_test.go deleted file mode 100644 index 74352d78d7419..0000000000000 --- a/util/topsql/tracecpu/main_test.go +++ /dev/null @@ -1,60 +0,0 @@ -// Copyright 2021 PingCAP, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. 
- -package tracecpu_test - -import ( - "bytes" - "testing" - - "github.com/google/pprof/profile" - "github.com/pingcap/tidb/config" - "github.com/pingcap/tidb/sessionctx/variable" - "github.com/pingcap/tidb/util/testbridge" - "github.com/pingcap/tidb/util/topsql/tracecpu" - "github.com/stretchr/testify/require" - "go.uber.org/goleak" -) - -func TestMain(m *testing.M) { - testbridge.WorkaroundGoCheckFlags() - - variable.TopSQLVariable.Enable.Store(false) - config.UpdateGlobal(func(conf *config.Config) { - conf.TopSQL.ReceiverAddress = "mock" - }) - variable.TopSQLVariable.PrecisionSeconds.Store(1) - tracecpu.GlobalSQLCPUProfiler.Run() - - opts := []goleak.Option{ - goleak.IgnoreTopFunction("time.Sleep"), - goleak.IgnoreTopFunction("runtime/pprof.readProfile"), - goleak.IgnoreTopFunction("internal/poll.runtime_pollWait"), - goleak.IgnoreTopFunction("github.com/pingcap/tidb/util/topsql/tracecpu.(*sqlCPUProfiler).startAnalyzeProfileWorker"), - } - - goleak.VerifyTestMain(m, opts...) -} - -func TestPProfCPUProfile(t *testing.T) { - buf := bytes.NewBuffer(nil) - err := tracecpu.StartCPUProfile(buf) - require.NoError(t, err) - // enable top sql. - variable.TopSQLVariable.Enable.Store(true) - err = tracecpu.StopCPUProfile() - require.NoError(t, err) - _, err = profile.Parse(buf) - require.NoError(t, err) -} diff --git a/util/tracecpu/main_test.go b/util/tracecpu/main_test.go new file mode 100644 index 0000000000000..84070228d7bce --- /dev/null +++ b/util/tracecpu/main_test.go @@ -0,0 +1,27 @@ +// Copyright 2021 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package tracecpu + +import ( + "testing" + + "github.com/pingcap/tidb/util/testbridge" + "go.uber.org/goleak" +) + +func TestMain(m *testing.M) { + testbridge.WorkaroundGoCheckFlags() + goleak.VerifyTestMain(m) +} diff --git a/util/topsql/tracecpu/mock/mock.go b/util/tracecpu/mock/mock.go similarity index 77% rename from util/topsql/tracecpu/mock/mock.go rename to util/tracecpu/mock/mock.go index 62125d202f67c..97fc981efddf0 100644 --- a/util/topsql/tracecpu/mock/mock.go +++ b/util/tracecpu/mock/mock.go @@ -22,7 +22,8 @@ import ( "github.com/pingcap/tidb/parser" "github.com/pingcap/tidb/util/hack" "github.com/pingcap/tidb/util/logutil" - "github.com/pingcap/tidb/util/topsql/tracecpu" + "github.com/pingcap/tidb/util/topsql/reporter" + "github.com/pingcap/tidb/util/tracecpu" "go.uber.org/atomic" "go.uber.org/zap" ) @@ -48,6 +49,8 @@ func NewTopSQLCollector() *TopSQLCollector { } } +var _ reporter.TopSQLReporter = &TopSQLCollector{} + // Collect uses for testing. func (c *TopSQLCollector) Collect(ts uint64, stats []tracecpu.SQLCPUTimeRecord) { defer c.collectCnt.Inc() @@ -73,6 +76,11 @@ func (c *TopSQLCollector) Collect(ts uint64, stats []tracecpu.SQLCPUTimeRecord) } } +// IsPaused implements tracecpu.Collector +func (c *TopSQLCollector) IsPaused() bool { + return false +} + // GetSQLStatsBySQLWithRetry uses for testing. 
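// It polls the collected stats and gives up after ten seconds if no matching record shows up.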
func (c *TopSQLCollector) GetSQLStatsBySQLWithRetry(sql string, planIsNotNull bool) []*tracecpu.SQLCPUTimeRecord { after := time.After(time.Second * 10) @@ -168,6 +176,11 @@ func (c *TopSQLCollector) WaitCollectCnt(count int64) { } } +// DataSinkRegHandle implements the interface. +func (c *TopSQLCollector) DataSinkRegHandle() reporter.DataSinkRegHandle { + return &noopRegHandle{} +} + // Close implements the interface. func (c *TopSQLCollector) Close() {} @@ -180,3 +193,37 @@ func GenSQLDigest(sql string) *parser.Digest { _, digest := parser.NormalizeDigest(sql) return digest } + +type noopRegHandle struct{} + +// Register implements reporter.DataSinkRegHandle +func (noopRegHandle) Register(dataSink reporter.DataSink) { + dataSink.Close() +} + +var _ reporter.DataSinkRegHandle = &noopRegHandle{} + +// ProfileController is a mock collector used only to control profiling. +type ProfileController struct { + paused *atomic.Bool +} + +// NewProfileController creates a new ProfileController +func NewProfileController(enabled bool) *ProfileController { + return &ProfileController{paused: atomic.NewBool(!enabled)} +} + +var _ tracecpu.Collector = &ProfileController{} + +// Collect implements tracecpu.Collector +func (*ProfileController) Collect(_ts uint64, _stats []tracecpu.SQLCPUTimeRecord) {} + +// IsPaused implements tracecpu.Collector +func (c *ProfileController) IsPaused() bool { + return c.paused.Load() +} + +// SetEnabled enables or disables profiling. +func (c *ProfileController) SetEnabled(v bool) { + c.paused.Store(!v) +} diff --git a/util/topsql/tracecpu/profile.go b/util/tracecpu/profile.go similarity index 89% rename from util/topsql/tracecpu/profile.go rename to util/tracecpu/profile.go index b08569d147eec..23412a0c7776a 100644 --- a/util/topsql/tracecpu/profile.go +++ b/util/tracecpu/profile.go @@ -24,14 +24,14 @@ import ( "runtime/pprof" "strconv" "sync" - "sync/atomic" "time" "github.com/google/pprof/profile" - "github.com/pingcap/tidb/sessionctx/variable" + "github.com/pingcap/tidb/metrics" "github.com/pingcap/tidb/util" "github.com/pingcap/tidb/util/hack" "github.com/pingcap/tidb/util/logutil" + "go.uber.org/atomic" "go.uber.org/zap" ) @@ -39,16 +39,34 @@ const ( labelSQL = "sql" labelSQLDigest = "sql_digest" labelPlanDigest = "plan_digest" + + // DefPrecisionSeconds indicates that the default PrecisionSeconds is 1s + DefPrecisionSeconds = 1 + + // DefEnablePProfSQLCPU indicates that the default EnablePProfSQLCPU is false + DefEnablePProfSQLCPU = 0 ) -// GlobalSQLCPUProfiler is the global SQL stats profiler. -var GlobalSQLCPUProfiler = newSQLCPUProfiler() +var ( + // GlobalSQLCPUProfiler is the global SQL stats profiler. + GlobalSQLCPUProfiler = newSQLCPUProfiler() + + // EnablePProfSQLCPU being true means the `sql` label is kept; otherwise, the `sql` label is removed. + EnablePProfSQLCPU = atomic.NewBool(false) + + // PrecisionSeconds is the profiling interval in seconds. + PrecisionSeconds = atomic.NewInt64(DefPrecisionSeconds) +) // Collector uses to collect SQL execution cpu time. type Collector interface { // Collect uses to collect the SQL execution cpu time. // ts is a Unix time, unit is second. Collect(ts uint64, stats []SQLCPUTimeRecord) + + // IsPaused indicates whether the Collector is not expecting to receive records for now, + // and may resume in the future. + IsPaused() bool } // SQLCPUTimeRecord represents a single record of how much cpu time a sql plan consumes in one second.
@@ -69,7 +87,11 @@ type sqlCPUProfiler struct { sync.Mutex ept *exportProfileTask } - collector atomic.Value + + collector struct { + sync.Mutex + v Collector + } } var ( @@ -95,21 +117,23 @@ func (sp *sqlCPUProfiler) Run() { } func (sp *sqlCPUProfiler) SetCollector(c Collector) { - sp.collector.Store(c) + sp.collector.Lock() + sp.collector.v = c + sp.collector.Unlock() } -func (sp *sqlCPUProfiler) GetCollector() Collector { - c, ok := sp.collector.Load().(Collector) - if !ok || c == nil { - return nil - } - return c +func (sp *sqlCPUProfiler) GetCollector() (res Collector) { + sp.collector.Lock() + res = sp.collector.v + sp.collector.Unlock() + + return } func (sp *sqlCPUProfiler) startCPUProfileWorker() { defer util.Recover("top-sql", "profileWorker", nil, false) for { - if sp.IsEnabled() { + if sp.ShouldProfile() { sp.doCPUProfile() } else { time.Sleep(time.Second) @@ -118,7 +142,8 @@ } func (sp *sqlCPUProfiler) doCPUProfile() { - intervalSecond := variable.TopSQLVariable.PrecisionSeconds.Load() + metrics.TopSQLProfileCounter.Inc() + intervalSecond := PrecisionSeconds.Load() task := sp.newProfileTask() if err := pprof.StartCPUProfile(task.buf); err != nil { // Sleep a while before retry. @@ -276,16 +301,24 @@ func (sp *sqlCPUProfiler) hasExportProfileTask() bool { return has } -// IsEnabled return true if it is(should be) enabled. It exports for tests. -func (sp *sqlCPUProfiler) IsEnabled() bool { - return variable.TopSQLEnabled() || sp.hasExportProfileTask() +// ShouldProfile returns true if profiling is required. It is exported for tests. +func (sp *sqlCPUProfiler) ShouldProfile() bool { + if sp.hasExportProfileTask() { + return true + } + + c := sp.GetCollector() + if c == nil { + return false + } + return !c.IsPaused() } // StartCPUProfile same like pprof.StartCPUProfile. // Because the GlobalSQLCPUProfiler keep calling pprof.StartCPUProfile to fetch SQL cpu stats, other place (such pprof profile HTTP API handler) call pprof.StartCPUProfile will be failed, // other place should call tracecpu.StartCPUProfile instead of pprof.StartCPUProfile. func StartCPUProfile(w io.Writer) error { - if GlobalSQLCPUProfiler.IsEnabled() { + if GlobalSQLCPUProfiler.ShouldProfile() { return GlobalSQLCPUProfiler.startExportCPUProfile(w) } return pprof.StartCPUProfile(w) @@ -337,12 +370,11 @@ func (sp *sqlCPUProfiler) stopExportCPUProfile() error { // removeLabel uses to remove labels for export cpu profile data. // Since the sql_digest and plan_digest label is strange for other users. -// If `variable.EnablePProfSQLCPU` is true means wanto keep the `sql` label, otherwise, remove the `sql` label too. func (sp *sqlCPUProfiler) removeLabel(p *profile.Profile) { if p == nil { return } - keepLabelSQL := variable.EnablePProfSQLCPU.Load() + keepLabelSQL := EnablePProfSQLCPU.Load() for _, s := range p.Sample { for k := range s.Label { switch k { diff --git a/util/tracecpu/profile_test.go b/util/tracecpu/profile_test.go new file mode 100644 index 0000000000000..a92cee52a031e --- /dev/null +++ b/util/tracecpu/profile_test.go @@ -0,0 +1,61 @@ +// Copyright 2021 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package tracecpu_test + +import ( + "bytes" + "testing" + + "github.com/google/pprof/profile" + "github.com/pingcap/tidb/util/tracecpu" + "github.com/pingcap/tidb/util/tracecpu/mock" + "github.com/stretchr/testify/require" +) + +func TestShouldProfile(t *testing.T) { + controller := mock.NewProfileController(false) + tracecpu.GlobalSQLCPUProfiler.SetCollector(controller) + defer tracecpu.GlobalSQLCPUProfiler.SetCollector(nil) + + require.False(t, tracecpu.GlobalSQLCPUProfiler.ShouldProfile()) + + controller.SetEnabled(true) + err := tracecpu.StartCPUProfile(bytes.NewBuffer(nil)) + require.NoError(t, err) + require.True(t, tracecpu.GlobalSQLCPUProfiler.ShouldProfile()) + controller.SetEnabled(false) + require.True(t, tracecpu.GlobalSQLCPUProfiler.ShouldProfile()) + err = tracecpu.StopCPUProfile() + require.NoError(t, err) + + controller.SetEnabled(false) + require.False(t, tracecpu.GlobalSQLCPUProfiler.ShouldProfile()) + controller.SetEnabled(true) + require.True(t, tracecpu.GlobalSQLCPUProfiler.ShouldProfile()) +} + +func TestPProfCPUProfile(t *testing.T) { + buf := bytes.NewBuffer(nil) + err := tracecpu.StartCPUProfile(buf) + require.NoError(t, err) + // enable top sql. + ctl := mock.NewProfileController(true) + tracecpu.GlobalSQLCPUProfiler.SetCollector(ctl) + defer tracecpu.GlobalSQLCPUProfiler.SetCollector(nil) + err = tracecpu.StopCPUProfile() + require.NoError(t, err) + _, err = profile.Parse(buf) + require.NoError(t, err) +}
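For reviewers: with this change, any external component can consume Top SQL data by subscribing to the new pubsub endpoint. Below is a minimal subscriber sketch mirroring the flow exercised in TestTopSQLPubSub; the address and the print statements are illustrative only, not part of this patch.

package main

import (
	"context"
	"fmt"
	"io"

	"github.com/pingcap/tipb/go-tipb"
	"google.golang.org/grpc"
)

func main() {
	// Dial a TiDB instance that registered TopSQLPubSubService via
	// topsql.RegisterPubSubServer. The address is illustrative.
	conn, err := grpc.Dial("127.0.0.1:10080", grpc.WithBlock(), grpc.WithInsecure())
	if err != nil {
		panic(err)
	}
	defer conn.Close()

	client := tipb.NewTopSQLPubSubClient(conn)
	stream, err := client.Subscribe(context.Background(), &tipb.TopSQLSubRequest{})
	if err != nil {
		panic(err)
	}

	// Each response carries exactly one of a CPU-time record, a SQL meta,
	// or a plan meta, matching the three send loops in pubSubDataSink.
	for {
		resp, err := stream.Recv()
		if err == io.EOF {
			return
		}
		if err != nil {
			panic(err)
		}
		switch {
		case resp.GetRecord() != nil:
			rec := resp.GetRecord()
			fmt.Printf("cpu: digest=%x points=%d\n", rec.SqlDigest, len(rec.RecordListCpuTimeMs))
		case resp.GetSqlMeta() != nil:
			fmt.Println("sql:", resp.GetSqlMeta().NormalizedSql)
		case resp.GetPlanMeta() != nil:
			fmt.Println("plan:", resp.GetPlanMeta().NormalizedPlan)
		}
	}
}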