From 4c9cdbf0e225ab43f76bf591dd9df0e880404542 Mon Sep 17 00:00:00 2001 From: Zhenchi Date: Mon, 20 Dec 2021 10:24:45 +0800 Subject: [PATCH 1/8] topsql: add pubsub datasink Signed-off-by: Zhenchi --- server/rpc_server.go | 2 + util/topsql/reporter/mock/pubsub.go | 67 +++++++ util/topsql/reporter/pubsub.go | 267 ++++++++++++++++++++++++++++ util/topsql/topsql.go | 12 +- util/topsql/topsql_test.go | 163 +++++++++++++++++ 5 files changed, 510 insertions(+), 1 deletion(-) create mode 100644 util/topsql/reporter/mock/pubsub.go create mode 100644 util/topsql/reporter/pubsub.go diff --git a/server/rpc_server.go b/server/rpc_server.go index 674047781a6bd..3b23539c0bac1 100644 --- a/server/rpc_server.go +++ b/server/rpc_server.go @@ -33,6 +33,7 @@ import ( "github.com/pingcap/tidb/util" "github.com/pingcap/tidb/util/logutil" "github.com/pingcap/tidb/util/memory" + "github.com/pingcap/tidb/util/topsql" "go.uber.org/zap" "google.golang.org/grpc" "google.golang.org/grpc/keepalive" @@ -64,6 +65,7 @@ func NewRPCServer(config *config.Config, dom *domain.Domain, sm util.SessionMana } diagnosticspb.RegisterDiagnosticsServer(s, rpcSrv) tikvpb.RegisterTikvServer(s, rpcSrv) + topsql.RegisterPubSubServer(s) return s } diff --git a/util/topsql/reporter/mock/pubsub.go b/util/topsql/reporter/mock/pubsub.go new file mode 100644 index 0000000000000..493d95c17f827 --- /dev/null +++ b/util/topsql/reporter/mock/pubsub.go @@ -0,0 +1,67 @@ +// Copyright 2021 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+// See the License for the specific language governing permissions and +// limitations under the License. + +package mock + +import ( + "fmt" + "net" + + "github.com/pingcap/tidb/util/logutil" + "go.uber.org/zap" + "google.golang.org/grpc" +) + +type mockPubSubServer struct { + addr string + listen net.Listener + grpcServer *grpc.Server +} + +// NewMockPubSubServer creates a mock publisher server. +func NewMockPubSubServer() (*mockPubSubServer, error) { + addr := "127.0.0.1:0" + lis, err := net.Listen("tcp", addr) + if err != nil { + return nil, err + } + server := grpc.NewServer() + + return &mockPubSubServer{ + addr: fmt.Sprintf("127.0.0.1:%d", lis.Addr().(*net.TCPAddr).Port), + listen: lis, + grpcServer: server, + }, nil +} + +func (svr *mockPubSubServer) Serve() { + err := svr.grpcServer.Serve(svr.listen) + if err != nil { + logutil.BgLogger().Warn("[top-sql] mock pubsub server serve failed", zap.Error(err)) + } +} + +func (svr *mockPubSubServer) Server() *grpc.Server { + return svr.grpcServer +} + +func (svr *mockPubSubServer) Address() string { + return svr.addr +} + +func (svr *mockPubSubServer) Stop() { + if svr.grpcServer != nil { + svr.grpcServer.Stop() + } +} diff --git a/util/topsql/reporter/pubsub.go b/util/topsql/reporter/pubsub.go new file mode 100644 index 0000000000000..7d01c077e058f --- /dev/null +++ b/util/topsql/reporter/pubsub.go @@ -0,0 +1,267 @@ +// Copyright 2021 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package reporter + +import ( + "context" + "errors" + "time" + + "github.com/pingcap/tidb/util" + "github.com/pingcap/tidb/util/logutil" + "github.com/pingcap/tipb/go-tipb" + "go.uber.org/zap" +) + +// TopSQLPubSubService implements tipb.TopSQLPubSubServer. +// +// If a client subscribes to TopSQL records, the TopSQLPubSubService is responsible +// for registering an associated DataSink to the reporter. Then the DataSink sends +// data to the client periodically. +type TopSQLPubSubService struct { + dataSinkRegisterer DataSinkRegisterer +} + +// NewTopSQLPubSubService creates a new TopSQLPubSubService. +func NewTopSQLPubSubService(dataSinkRegisterer DataSinkRegisterer) *TopSQLPubSubService { + return &TopSQLPubSubService{dataSinkRegisterer: dataSinkRegisterer} +} + +var _ tipb.TopSQLPubSubServer = &TopSQLPubSubService{} + +// Subscribe registers dataSinks to the reporter and redirects data received from reporter +// to subscribers associated with those dataSinks. +func (ps *TopSQLPubSubService) Subscribe(_ *tipb.TopSQLSubRequest, stream tipb.TopSQLPubSub_SubscribeServer) error { + ds := newPubSubDataSink(stream, ps.dataSinkRegisterer) + if err := ps.dataSinkRegisterer.Register(ds); err != nil { + return err + } + return ds.run() +} + +type pubSubDataSink struct { + ctx context.Context + cancel context.CancelFunc + + stream tipb.TopSQLPubSub_SubscribeServer + sendTaskCh chan sendTask + + // for deregister + registerer DataSinkRegisterer +} + +func newPubSubDataSink(stream tipb.TopSQLPubSub_SubscribeServer, registerer DataSinkRegisterer) *pubSubDataSink { + ctx, cancel := context.WithCancel(stream.Context()) + + return &pubSubDataSink{ + ctx: ctx, + cancel: cancel, + + stream: stream, + sendTaskCh: make(chan sendTask, 1), + + registerer: registerer, + } +} + +var _ DataSink = &pubSubDataSink{} + +func (ds *pubSubDataSink) TrySend(data *ReportData, deadline time.Time) error { + select { + case ds.sendTaskCh <- sendTask{data: data, deadline: deadline}: + return nil 
+ case <-ds.ctx.Done(): + return ds.ctx.Err() + default: + ignoreReportChannelFullCounter.Inc() + return errors.New("the channel of pubsub dataSink is full") + } +} + +func (ds *pubSubDataSink) OnReporterClosing() { + ds.cancel() +} + +func (ds *pubSubDataSink) run() error { + defer func() { + ds.registerer.Deregister(ds) + ds.cancel() + }() + + for { + select { + case task := <-ds.sendTaskCh: + ctx, cancel := context.WithDeadline(ds.ctx, task.deadline) + var err error + + start := time.Now() + go util.WithRecovery(func() { + defer cancel() + err = ds.doSend(ctx, task.data) + + if err != nil { + reportAllDurationFailedHistogram.Observe(time.Since(start).Seconds()) + } else { + reportAllDurationSuccHistogram.Observe(time.Since(start).Seconds()) + } + }, nil) + + // When the deadline is exceeded, the closure inside `go util.WithRecovery` above may not notice that + // immediately because it can be blocked by `stream.Send`. + // In order to clean up resources as quickly as possible, we let that closure run in an individual goroutine, + // and wait for timeout here. 
+ <-ctx.Done() + + if errors.Is(ctx.Err(), context.DeadlineExceeded) { + logutil.BgLogger().Warn( + "[top-sql] pubsub datasink failed to send data to subscriber due to deadline exceeded", + zap.Time("deadline", task.deadline), + ) + return ctx.Err() + } + + if err != nil { + logutil.BgLogger().Warn( + "[top-sql] pubsub datasink failed to send data to subscriber", + zap.Error(err), + ) + return err + } + case <-ds.ctx.Done(): + return ds.ctx.Err() + } + } +} + +func (ds *pubSubDataSink) doSend(ctx context.Context, data *ReportData) error { + if err := ds.sendCPUTime(ctx, data.CPUTimeRecords); err != nil { + return err + } + if err := ds.sendSQLMeta(ctx, data.SQLMetas); err != nil { + return err + } + return ds.sendPlanMeta(ctx, data.PlanMetas) +} + +func (ds *pubSubDataSink) sendCPUTime(ctx context.Context, records []tipb.CPUTimeRecord) (err error) { + if len(records) == 0 { + return + } + + start := time.Now() + sentCount := 0 + defer func() { + topSQLReportRecordCounterHistogram.Observe(float64(sentCount)) + if err != nil { + reportRecordDurationFailedHistogram.Observe(time.Since(start).Seconds()) + } else { + reportRecordDurationSuccHistogram.Observe(time.Since(start).Seconds()) + } + }() + + cpuRecord := &tipb.TopSQLSubResponse_Record{} + r := &tipb.TopSQLSubResponse{RespOneof: cpuRecord} + + for i := range records { + cpuRecord.Record = &records[i] + if err = ds.stream.Send(r); err != nil { + return + } + sentCount += 1 + + select { + case <-ctx.Done(): + err = ctx.Err() + return + default: + } + } + + return +} + +func (ds *pubSubDataSink) sendSQLMeta(ctx context.Context, sqlMetas []tipb.SQLMeta) (err error) { + if len(sqlMetas) == 0 { + return + } + + start := time.Now() + sentCount := 0 + defer func() { + topSQLReportSQLCountHistogram.Observe(float64(sentCount)) + if err != nil { + reportSQLDurationFailedHistogram.Observe(time.Since(start).Seconds()) + } else { + reportSQLDurationSuccHistogram.Observe(time.Since(start).Seconds()) + } + }() + + sqlMeta := 
&tipb.TopSQLSubResponse_SqlMeta{} + r := &tipb.TopSQLSubResponse{RespOneof: sqlMeta} + + for i := range sqlMetas { + sqlMeta.SqlMeta = &sqlMetas[i] + if err = ds.stream.Send(r); err != nil { + return + } + sentCount += 1 + + select { + case <-ctx.Done(): + err = ctx.Err() + return + default: + } + } + + return +} + +func (ds *pubSubDataSink) sendPlanMeta(ctx context.Context, planMetas []tipb.PlanMeta) (err error) { + if len(planMetas) == 0 { + return + } + + start := time.Now() + sentCount := 0 + defer func() { + topSQLReportPlanCountHistogram.Observe(float64(sentCount)) + if err != nil { + reportPlanDurationFailedHistogram.Observe(time.Since(start).Seconds()) + } else { + reportPlanDurationSuccHistogram.Observe(time.Since(start).Seconds()) + } + }() + + planMeta := &tipb.TopSQLSubResponse_PlanMeta{} + r := &tipb.TopSQLSubResponse{RespOneof: planMeta} + + for i := range planMetas { + planMeta.PlanMeta = &planMetas[i] + if err = ds.stream.Send(r); err != nil { + return + } + sentCount += 1 + + select { + case <-ctx.Done(): + err = ctx.Err() + return + default: + } + } + + return +} diff --git a/util/topsql/topsql.go b/util/topsql/topsql.go index 8f2aac1566642..9668b1d72ad44 100644 --- a/util/topsql/topsql.go +++ b/util/topsql/topsql.go @@ -26,7 +26,9 @@ import ( "github.com/pingcap/tidb/util/plancodec" "github.com/pingcap/tidb/util/topsql/reporter" "github.com/pingcap/tidb/util/topsql/tracecpu" + "github.com/pingcap/tipb/go-tipb" "go.uber.org/zap" + "google.golang.org/grpc" ) const ( @@ -37,7 +39,7 @@ const ( ) var ( - globalTopSQLReport reporter.TopSQLReporter + globalTopSQLReport *reporter.RemoteTopSQLReporter singleTargetDataSink *reporter.SingleTargetDataSink ) @@ -52,6 +54,14 @@ func SetupTopSQL() { tracecpu.GlobalSQLCPUProfiler.Run() } +// RegisterPubSubServer registers TopSQLPubSubService to the given gRPC server. 
+func RegisterPubSubServer(s *grpc.Server) { + if globalTopSQLReport != nil { + service := reporter.NewTopSQLPubSubService(globalTopSQLReport) + tipb.RegisterTopSQLPubSubServer(s, service) + } +} + // Close uses to close and release the top sql resource. func Close() { if singleTargetDataSink != nil { diff --git a/util/topsql/topsql_test.go b/util/topsql/topsql_test.go index d4aabc746ce0e..6a68eef31cc5d 100644 --- a/util/topsql/topsql_test.go +++ b/util/topsql/topsql_test.go @@ -29,7 +29,10 @@ import ( mockServer "github.com/pingcap/tidb/util/topsql/reporter/mock" "github.com/pingcap/tidb/util/topsql/tracecpu" "github.com/pingcap/tidb/util/topsql/tracecpu/mock" + "github.com/pingcap/tipb/go-tipb" "github.com/stretchr/testify/require" + "google.golang.org/grpc" + "google.golang.org/grpc/keepalive" ) type collectorWrapper struct { @@ -213,6 +216,166 @@ func TestMaxSQLAndPlanTest(t *testing.T) { require.Empty(t, cPlan) } +func TestTopSQLPubSub(t *testing.T) { + variable.TopSQLVariable.MaxStatementCount.Store(200) + variable.TopSQLVariable.ReportIntervalSeconds.Store(1) + config.UpdateGlobal(func(conf *config.Config) { + conf.TopSQL.ReceiverAddress = "mock" + }) + + report := reporter.NewRemoteTopSQLReporter(mockPlanBinaryDecoderFunc) + defer report.Close() + tracecpu.GlobalSQLCPUProfiler.SetCollector(&collectorWrapper{report}) + + server, err := mockServer.NewMockPubSubServer() + require.NoError(t, err) + pubsubService := reporter.NewTopSQLPubSubService(report) + tipb.RegisterTopSQLPubSubServer(server.Server(), pubsubService) + go server.Serve() + defer server.Stop() + + conn, err := grpc.Dial( + server.Address(), + grpc.WithBlock(), + grpc.WithInsecure(), + grpc.WithKeepaliveParams(keepalive.ClientParameters{ + Time: 10 * time.Second, + Timeout: 3 * time.Second, + }), + ) + require.NoError(t, err) + defer conn.Close() + + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + client := tipb.NewTopSQLPubSubClient(conn) + stream, err 
:= client.Subscribe(ctx, &tipb.TopSQLSubRequest{}) + require.NoError(t, err) + + reqs := []struct { + sql string + plan string + }{ + {"select * from t where a=?", "point-get"}, + {"select * from t where a>?", "table-scan"}, + {"insert into t values (?)", ""}, + } + + digest2sql := make(map[string]string) + sql2plan := make(map[string]string) + for _, req := range reqs { + sql2plan[req.sql] = req.plan + sqlDigest := mock.GenSQLDigest(req.sql) + digest2sql[string(sqlDigest.Bytes())] = req.sql + + go func(sql, plan string) { + for { + select { + case <-ctx.Done(): + return + default: + mockExecuteSQL(sql, plan) + } + } + }(req.sql, req.plan) + } + + sqlMetas := make(map[string]*tipb.SQLMeta) + planMetas := make(map[string]string) + records := make(map[string]*tipb.CPUTimeRecord) + + for { + r, err := stream.Recv() + if err != nil { + break + } + + if r.GetRecord() != nil { + rec := r.GetRecord() + if _, ok := records[string(rec.SqlDigest)]; !ok { + records[string(rec.SqlDigest)] = rec + } else { + cpu := records[string(rec.SqlDigest)] + if rec.PlanDigest != nil { + cpu.PlanDigest = rec.PlanDigest + } + cpu.RecordListTimestampSec = append(cpu.RecordListTimestampSec, rec.RecordListTimestampSec...) + cpu.RecordListCpuTimeMs = append(cpu.RecordListCpuTimeMs, rec.RecordListCpuTimeMs...) 
+ } + } else if r.GetSqlMeta() != nil { + sql := r.GetSqlMeta() + if _, ok := sqlMetas[string(sql.SqlDigest)]; !ok { + sqlMetas[string(sql.SqlDigest)] = sql + } + } else if r.GetPlanMeta() != nil { + plan := r.GetPlanMeta() + if _, ok := planMetas[string(plan.PlanDigest)]; !ok { + planMetas[string(plan.PlanDigest)] = plan.NormalizedPlan + } + } + } + + checkSQLPlanMap := map[string]struct{}{} + for i := range records { + record := records[i] + require.Greater(t, len(record.RecordListCpuTimeMs), 0) + require.Greater(t, record.RecordListCpuTimeMs[0], uint32(0)) + sqlMeta, exist := sqlMetas[string(record.SqlDigest)] + require.True(t, exist) + expectedNormalizedSQL, exist := digest2sql[string(record.SqlDigest)] + require.True(t, exist) + require.Equal(t, expectedNormalizedSQL, sqlMeta.NormalizedSql) + + expectedNormalizedPlan := sql2plan[expectedNormalizedSQL] + if expectedNormalizedPlan == "" || len(record.PlanDigest) == 0 { + require.Equal(t, len(record.PlanDigest), 0) + continue + } + normalizedPlan, exist := planMetas[string(record.PlanDigest)] + require.True(t, exist) + require.Equal(t, expectedNormalizedPlan, normalizedPlan) + checkSQLPlanMap[expectedNormalizedSQL] = struct{}{} + } + require.Equal(t, len(checkSQLPlanMap), 2) +} + +func TestPubSubWhenReporterIsStopped(t *testing.T) { + report := reporter.NewRemoteTopSQLReporter(mockPlanBinaryDecoderFunc) + + server, err := mockServer.NewMockPubSubServer() + require.NoError(t, err) + + pubsubService := reporter.NewTopSQLPubSubService(report) + tipb.RegisterTopSQLPubSubServer(server.Server(), pubsubService) + go server.Serve() + defer server.Stop() + + // stop reporter first + report.Close() + + // try to subscribe + conn, err := grpc.Dial( + server.Address(), + grpc.WithBlock(), + grpc.WithInsecure(), + grpc.WithKeepaliveParams(keepalive.ClientParameters{ + Time: 10 * time.Second, + Timeout: 3 * time.Second, + }), + ) + require.NoError(t, err) + defer conn.Close() + + ctx, cancel := 
context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + client := tipb.NewTopSQLPubSubClient(conn) + stream, err := client.Subscribe(ctx, &tipb.TopSQLSubRequest{}) + require.NoError(t, err) + + _, err = stream.Recv() + require.Error(t, err, "reporter is closed") +} + func setTopSQLEnable(enabled bool) { variable.TopSQLVariable.Enable.Store(enabled) } From 9c9ea90cb01f8d30d91e7d7fa97fb9ad64386221 Mon Sep 17 00:00:00 2001 From: Zhenchi Date: Mon, 20 Dec 2021 14:31:03 +0800 Subject: [PATCH 2/8] fix single target Signed-off-by: Zhenchi --- sessionctx/variable/tidb_vars.go | 2 +- util/topsql/reporter/single_target.go | 16 +++++++++------- 2 files changed, 10 insertions(+), 8 deletions(-) diff --git a/sessionctx/variable/tidb_vars.go b/sessionctx/variable/tidb_vars.go index ee01348a76441..ccec9e1b5a8fc 100644 --- a/sessionctx/variable/tidb_vars.go +++ b/sessionctx/variable/tidb_vars.go @@ -832,5 +832,5 @@ type TopSQL struct { // TopSQLEnabled uses to check whether enabled the top SQL feature. 
func TopSQLEnabled() bool { - return TopSQLVariable.Enable.Load() && config.GetGlobalConfig().TopSQL.ReceiverAddress != "" + return TopSQLVariable.Enable.Load() } diff --git a/util/topsql/reporter/single_target.go b/util/topsql/reporter/single_target.go index 3ea61d75f633a..90834b2b904c3 100644 --- a/util/topsql/reporter/single_target.go +++ b/util/topsql/reporter/single_target.go @@ -24,6 +24,7 @@ import ( "github.com/pingcap/tidb/config" "github.com/pingcap/tidb/util/logutil" "github.com/pingcap/tipb/go-tipb" + "go.uber.org/atomic" "go.uber.org/zap" "google.golang.org/grpc" "google.golang.org/grpc/backoff" @@ -38,7 +39,7 @@ type SingleTargetDataSink struct { conn *grpc.ClientConn sendTaskCh chan sendTask - registered bool + registered *atomic.Bool registerer DataSinkRegisterer } @@ -53,7 +54,7 @@ func NewSingleTargetDataSink(registerer DataSinkRegisterer) *SingleTargetDataSin conn: nil, sendTaskCh: make(chan sendTask, 1), - registered: false, + registered: atomic.NewBool(false), registerer: registerer, } @@ -64,6 +65,7 @@ func NewSingleTargetDataSink(registerer DataSinkRegisterer) *SingleTargetDataSin logutil.BgLogger().Warn("failed to register single target datasink", zap.Error(err)) return nil } + dataSink.registered.Store(true) } go dataSink.recoverRun() @@ -119,17 +121,17 @@ func (ds *SingleTargetDataSink) run() (rerun bool) { } func (ds *SingleTargetDataSink) tryRegister(addr string) error { - if addr == "" && ds.registered { + if addr == "" && ds.registered.Load() { ds.registerer.Deregister(ds) - ds.registered = false + ds.registered.Store(false) return nil } - if addr != "" && !ds.registered { + if addr != "" && !ds.registered.Load() { if err := ds.registerer.Register(ds); err != nil { return err } - ds.registered = true + ds.registered.Store(true) } return nil } @@ -160,7 +162,7 @@ func (ds *SingleTargetDataSink) OnReporterClosing() { func (ds *SingleTargetDataSink) Close() { ds.cancel() - if ds.registered { + if ds.registered.Load() { 
ds.registerer.Deregister(ds) } } From 484cf7c818d354cd5eaff7da2c1e46ffb26c0738 Mon Sep 17 00:00:00 2001 From: Zhenchi Date: Tue, 21 Dec 2021 09:00:05 +0800 Subject: [PATCH 3/8] rename Signed-off-by: Zhenchi --- sessionctx/variable/sysvar.go | 7 +------ util/topsql/reporter/single_target.go | 6 ++++-- 2 files changed, 5 insertions(+), 8 deletions(-) diff --git a/sessionctx/variable/sysvar.go b/sessionctx/variable/sysvar.go index 3491f28bc73dc..9ea2c41f704ca 100644 --- a/sessionctx/variable/sysvar.go +++ b/sessionctx/variable/sysvar.go @@ -1245,12 +1245,7 @@ var defaultSysVars = []*SysVar{ return nil }}, // variable for top SQL feature. - {Scope: ScopeGlobal, Name: TiDBEnableTopSQL, Value: BoolToOnOff(DefTiDBTopSQLEnable), Type: TypeBool, Hidden: true, AllowEmpty: true, GetGlobal: func(s *SessionVars) (string, error) { - return BoolToOnOff(TopSQLVariable.Enable.Load()), nil - }, SetGlobal: func(vars *SessionVars, s string) error { - TopSQLVariable.Enable.Store(TiDBOptOn(s)) - return nil - }, GlobalConfigName: GlobalConfigEnableTopSQL}, + {Scope: ScopeGlobal, Name: TiDBEnableTopSQL, Value: BoolToOnOff(DefTiDBTopSQLEnable), Type: TypeBool, Hidden: true, AllowEmpty: true, GlobalConfigName: GlobalConfigEnableTopSQL}, {Scope: ScopeGlobal, Name: TiDBTopSQLPrecisionSeconds, Value: strconv.Itoa(DefTiDBTopSQLPrecisionSeconds), Type: TypeInt, Hidden: true, MinValue: 1, MaxValue: math.MaxInt64, GetGlobal: func(s *SessionVars) (string, error) { return strconv.FormatInt(TopSQLVariable.PrecisionSeconds.Load(), 10), nil }, SetGlobal: func(vars *SessionVars, s string) error { diff --git a/util/topsql/reporter/single_target.go b/util/topsql/reporter/single_target.go index 90834b2b904c3..358926535356b 100644 --- a/util/topsql/reporter/single_target.go +++ b/util/topsql/reporter/single_target.go @@ -113,20 +113,22 @@ func (ds *SingleTargetDataSink) run() (rerun bool) { targetRPCAddr = config.GetGlobalConfig().TopSQL.ReceiverAddress } - if err := ds.tryRegister(targetRPCAddr); err != 
nil { + if err := ds.trySwitchRegistration(targetRPCAddr); err != nil { logutil.BgLogger().Warn("failed to register the single target datasink", zap.Error(err)) return false } } } -func (ds *SingleTargetDataSink) tryRegister(addr string) error { +func (ds *SingleTargetDataSink) trySwitchRegistration(addr string) error { + // deregister if `addr` is empty and registered before if addr == "" && ds.registered.Load() { ds.registerer.Deregister(ds) ds.registered.Store(false) return nil } + // register if `addr` is not empty and not registered before if addr != "" && !ds.registered.Load() { if err := ds.registerer.Register(ds); err != nil { return err From c7d00308517698f8515a3d8172b480a152a78f91 Mon Sep 17 00:00:00 2001 From: Zhenchi Date: Tue, 21 Dec 2021 09:01:08 +0800 Subject: [PATCH 4/8] move a comment Signed-off-by: Zhenchi --- util/topsql/reporter/single_target.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/util/topsql/reporter/single_target.go b/util/topsql/reporter/single_target.go index 358926535356b..f9c9dd6c5107e 100644 --- a/util/topsql/reporter/single_target.go +++ b/util/topsql/reporter/single_target.go @@ -114,7 +114,6 @@ func (ds *SingleTargetDataSink) run() (rerun bool) { } if err := ds.trySwitchRegistration(targetRPCAddr); err != nil { - logutil.BgLogger().Warn("failed to register the single target datasink", zap.Error(err)) return false } } @@ -131,6 +130,7 @@ func (ds *SingleTargetDataSink) trySwitchRegistration(addr string) error { // register if `addr` is not empty and not registered before if addr != "" && !ds.registered.Load() { if err := ds.registerer.Register(ds); err != nil { + logutil.BgLogger().Warn("failed to register the single target datasink", zap.Error(err)) return err } ds.registered.Store(true) From 2199aebe3f0fb3e9326ac7961e8af6c5b0a6486e Mon Sep 17 00:00:00 2001 From: Zhenchi Date: Tue, 21 Dec 2021 09:05:53 +0800 Subject: [PATCH 5/8] keep sql global setting Signed-off-by: Zhenchi --- sessionctx/variable/sysvar.go 
| 7 ++++++- util/topsql/topsql_test.go | 3 --- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/sessionctx/variable/sysvar.go b/sessionctx/variable/sysvar.go index 9ea2c41f704ca..3491f28bc73dc 100644 --- a/sessionctx/variable/sysvar.go +++ b/sessionctx/variable/sysvar.go @@ -1245,7 +1245,12 @@ var defaultSysVars = []*SysVar{ return nil }}, // variable for top SQL feature. - {Scope: ScopeGlobal, Name: TiDBEnableTopSQL, Value: BoolToOnOff(DefTiDBTopSQLEnable), Type: TypeBool, Hidden: true, AllowEmpty: true, GlobalConfigName: GlobalConfigEnableTopSQL}, + {Scope: ScopeGlobal, Name: TiDBEnableTopSQL, Value: BoolToOnOff(DefTiDBTopSQLEnable), Type: TypeBool, Hidden: true, AllowEmpty: true, GetGlobal: func(s *SessionVars) (string, error) { + return BoolToOnOff(TopSQLVariable.Enable.Load()), nil + }, SetGlobal: func(vars *SessionVars, s string) error { + TopSQLVariable.Enable.Store(TiDBOptOn(s)) + return nil + }, GlobalConfigName: GlobalConfigEnableTopSQL}, {Scope: ScopeGlobal, Name: TiDBTopSQLPrecisionSeconds, Value: strconv.Itoa(DefTiDBTopSQLPrecisionSeconds), Type: TypeInt, Hidden: true, MinValue: 1, MaxValue: math.MaxInt64, GetGlobal: func(s *SessionVars) (string, error) { return strconv.FormatInt(TopSQLVariable.PrecisionSeconds.Load(), 10), nil }, SetGlobal: func(vars *SessionVars, s string) error { diff --git a/util/topsql/topsql_test.go b/util/topsql/topsql_test.go index 6a68eef31cc5d..462c6ffb70aa2 100644 --- a/util/topsql/topsql_test.go +++ b/util/topsql/topsql_test.go @@ -219,9 +219,6 @@ func TestMaxSQLAndPlanTest(t *testing.T) { func TestTopSQLPubSub(t *testing.T) { variable.TopSQLVariable.MaxStatementCount.Store(200) variable.TopSQLVariable.ReportIntervalSeconds.Store(1) - config.UpdateGlobal(func(conf *config.Config) { - conf.TopSQL.ReceiverAddress = "mock" - }) report := reporter.NewRemoteTopSQLReporter(mockPlanBinaryDecoderFunc) defer report.Close() From b26fbac196cf81b2b0b923c801f9bf659311f58d Mon Sep 17 00:00:00 2001 From: Zhenchi Date: Tue, 21 
Dec 2021 09:06:52 +0800 Subject: [PATCH 6/8] fix Signed-off-by: Zhenchi --- util/topsql/reporter/single_target.go | 1 + 1 file changed, 1 insertion(+) diff --git a/util/topsql/reporter/single_target.go b/util/topsql/reporter/single_target.go index f9c9dd6c5107e..9643325a6809a 100644 --- a/util/topsql/reporter/single_target.go +++ b/util/topsql/reporter/single_target.go @@ -166,6 +166,7 @@ func (ds *SingleTargetDataSink) Close() { if ds.registered.Load() { ds.registerer.Deregister(ds) + ds.registered.Store(false) } } From 27d972f303310208ad64f15da24d4e80e4c416bf Mon Sep 17 00:00:00 2001 From: Zhenchi Date: Tue, 21 Dec 2021 11:02:24 +0800 Subject: [PATCH 7/8] fix not add defer Signed-off-by: Zhenchi --- util/topsql/reporter/single_target.go | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/util/topsql/reporter/single_target.go b/util/topsql/reporter/single_target.go index 9643325a6809a..2c7c6a1bb7791 100644 --- a/util/topsql/reporter/single_target.go +++ b/util/topsql/reporter/single_target.go @@ -177,12 +177,14 @@ func (ds *SingleTargetDataSink) doSend(addr string, task sendTask) { var err error start := time.Now() - if err != nil { - logutil.BgLogger().Warn("[top-sql] single target data sink failed to send data to receiver", zap.Error(err)) - reportAllDurationFailedHistogram.Observe(time.Since(start).Seconds()) - } else { - reportAllDurationSuccHistogram.Observe(time.Since(start).Seconds()) - } + defer func() { + if err != nil { + logutil.BgLogger().Warn("[top-sql] single target data sink failed to send data to receiver", zap.Error(err)) + reportAllDurationFailedHistogram.Observe(time.Since(start).Seconds()) + } else { + reportAllDurationSuccHistogram.Observe(time.Since(start).Seconds()) + } + }() ctx, cancel := context.WithDeadline(context.Background(), task.deadline) defer cancel() From 080b6a1c03a55b7c68dcaccd136626123d010116 Mon Sep 17 00:00:00 2001 From: Zhenchi Date: Tue, 21 Dec 2021 11:04:53 +0800 Subject: [PATCH 8/8] fix 
single target wrong metrics Signed-off-by: Zhenchi --- util/topsql/reporter/single_target.go | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/util/topsql/reporter/single_target.go b/util/topsql/reporter/single_target.go index 2c7c6a1bb7791..3744702ba26d6 100644 --- a/util/topsql/reporter/single_target.go +++ b/util/topsql/reporter/single_target.go @@ -230,8 +230,9 @@ func (ds *SingleTargetDataSink) sendBatchCPUTimeRecord(ctx context.Context, reco topSQLReportRecordCounterHistogram.Observe(float64(sentCount)) if err != nil { reportRecordDurationFailedHistogram.Observe(time.Since(start).Seconds()) + } else { + reportRecordDurationSuccHistogram.Observe(time.Since(start).Seconds()) } - reportRecordDurationSuccHistogram.Observe(time.Since(start).Seconds()) }() client := tipb.NewTopSQLAgentClient(ds.conn) @@ -261,11 +262,11 @@ func (ds *SingleTargetDataSink) sendBatchSQLMeta(ctx context.Context, sqlMetas [ sentCount := 0 defer func() { topSQLReportSQLCountHistogram.Observe(float64(sentCount)) - if err != nil { reportSQLDurationFailedHistogram.Observe(time.Since(start).Seconds()) + } else { + reportSQLDurationSuccHistogram.Observe(time.Since(start).Seconds()) } - reportSQLDurationSuccHistogram.Observe(time.Since(start).Seconds()) }() client := tipb.NewTopSQLAgentClient(ds.conn) @@ -298,8 +299,9 @@ func (ds *SingleTargetDataSink) sendBatchPlanMeta(ctx context.Context, planMetas topSQLReportPlanCountHistogram.Observe(float64(sentCount)) if err != nil { reportPlanDurationFailedHistogram.Observe(time.Since(start).Seconds()) + } else { + reportPlanDurationSuccHistogram.Observe(time.Since(start).Seconds()) } - reportPlanDurationSuccHistogram.Observe(time.Since(start).Seconds()) }() client := tipb.NewTopSQLAgentClient(ds.conn)