Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

topsql: add pubsub datasink #30860

Merged
merged 11 commits into from
Dec 21, 2021
2 changes: 2 additions & 0 deletions server/rpc_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"github.com/pingcap/tidb/util"
"github.com/pingcap/tidb/util/logutil"
"github.com/pingcap/tidb/util/memory"
"github.com/pingcap/tidb/util/topsql"
"go.uber.org/zap"
"google.golang.org/grpc"
"google.golang.org/grpc/keepalive"
Expand Down Expand Up @@ -64,6 +65,7 @@ func NewRPCServer(config *config.Config, dom *domain.Domain, sm util.SessionMana
}
diagnosticspb.RegisterDiagnosticsServer(s, rpcSrv)
tikvpb.RegisterTikvServer(s, rpcSrv)
topsql.RegisterPubSubServer(s)
return s
}

Expand Down
2 changes: 1 addition & 1 deletion sessionctx/variable/tidb_vars.go
Original file line number Diff line number Diff line change
Expand Up @@ -832,5 +832,5 @@ type TopSQL struct {

// TopSQLEnabled uses to check whether enabled the top SQL feature.
func TopSQLEnabled() bool {
return TopSQLVariable.Enable.Load() && config.GetGlobalConfig().TopSQL.ReceiverAddress != ""
return TopSQLVariable.Enable.Load()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If this variable is no longer controllable, how about moving it out to be an atomic.Bool instead?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure what 'controllable' means here.

For now, the report will modify it to indicate if someone subscribes to topsql data. Some components read it to determine whether to set tag and so on.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

By "controllable" I mean the user can set it and control the behavior of the system. Variables are usually controllable things, while status-like things are usually placed inside SHOW STATUS. I don't have a strong opinion about where it should be placed, or whether the TiDB user should ever know about it. What I do know is that a variable may not be a good place for it now.

cc @crazycs520

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can move this variable into topsql pkg later.

}
67 changes: 67 additions & 0 deletions util/topsql/reporter/mock/pubsub.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
// Copyright 2021 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package mock

import (
"fmt"
"net"

"github.com/pingcap/tidb/util/logutil"
"go.uber.org/zap"
"google.golang.org/grpc"
)

// mockPubSubServer bundles a loopback TCP listener with a gRPC server so
// tests can stand in for a real TopSQL pub/sub endpoint.
type mockPubSubServer struct {
// addr is the resolved "127.0.0.1:<port>" address the listener is bound to.
addr string
// listen is the TCP listener the gRPC server serves on.
listen net.Listener
// grpcServer is the server instance; register services on it via Server().
grpcServer *grpc.Server
}

// NewMockPubSubServer creates a mock publisher server bound to an ephemeral
// loopback port. Call Serve to start it and Stop to shut it down.
func NewMockPubSubServer() (*mockPubSubServer, error) {
	lis, err := net.Listen("tcp", "127.0.0.1:0")
	if err != nil {
		return nil, err
	}
	// Recover the port the kernel actually assigned for ":0".
	port := lis.Addr().(*net.TCPAddr).Port
	return &mockPubSubServer{
		addr:       fmt.Sprintf("127.0.0.1:%d", port),
		listen:     lis,
		grpcServer: grpc.NewServer(),
	}, nil
}

// Serve starts serving gRPC requests on the mock server's listener. It
// blocks until the server stops and logs any error returned by Serve.
func (svr *mockPubSubServer) Serve() {
	if err := svr.grpcServer.Serve(svr.listen); err != nil {
		logutil.BgLogger().Warn("[top-sql] mock pubsub server serve failed", zap.Error(err))
	}
}

// Server returns the underlying gRPC server so tests can register services
// on it before calling Serve.
func (svr *mockPubSubServer) Server() *grpc.Server {
return svr.grpcServer
}

// Address returns the "127.0.0.1:<port>" address the server listens on.
func (svr *mockPubSubServer) Address() string {
return svr.addr
}

// Stop shuts down the underlying gRPC server, if one was created.
func (svr *mockPubSubServer) Stop() {
	if s := svr.grpcServer; s != nil {
		s.Stop()
	}
}
267 changes: 267 additions & 0 deletions util/topsql/reporter/pubsub.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,267 @@
// Copyright 2021 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package reporter

import (
"context"
"errors"
"time"

"github.com/pingcap/tidb/util"
"github.com/pingcap/tidb/util/logutil"
"github.com/pingcap/tipb/go-tipb"
"go.uber.org/zap"
)

// TopSQLPubSubService implements tipb.TopSQLPubSubServer.
//
// If a client subscribes to TopSQL records, the TopSQLPubSubService is responsible
// for registering an associated DataSink to the reporter. Then the DataSink sends
// data to the client periodically.
type TopSQLPubSubService struct {
// dataSinkRegisterer is used to (de)register one DataSink per subscriber.
dataSinkRegisterer DataSinkRegisterer
}

// NewTopSQLPubSubService creates a TopSQLPubSubService backed by the given
// DataSink registerer.
func NewTopSQLPubSubService(dataSinkRegisterer DataSinkRegisterer) *TopSQLPubSubService {
	service := &TopSQLPubSubService{dataSinkRegisterer: dataSinkRegisterer}
	return service
}

var _ tipb.TopSQLPubSubServer = &TopSQLPubSubService{}

// Subscribe implements tipb.TopSQLPubSubServer. Each subscriber gets a
// dedicated DataSink registered with the reporter; reported data is then
// redirected to the subscriber's stream until either side terminates.
func (ps *TopSQLPubSubService) Subscribe(_ *tipb.TopSQLSubRequest, stream tipb.TopSQLPubSub_SubscribeServer) error {
	sink := newPubSubDataSink(stream, ps.dataSinkRegisterer)
	err := ps.dataSinkRegisterer.Register(sink)
	if err != nil {
		return err
	}
	return sink.run()
}

// pubSubDataSink is the DataSink registered for one pub/sub subscriber. It
// receives report data via TrySend and forwards it over the gRPC stream in run.
type pubSubDataSink struct {
// ctx is derived from the stream's context; cancel tears the sink down.
ctx context.Context
cancel context.CancelFunc

stream tipb.TopSQLPubSub_SubscribeServer
// sendTaskCh hands tasks from TrySend to the run loop (capacity 1).
sendTaskCh chan sendTask

// for deregister
registerer DataSinkRegisterer
}

// newPubSubDataSink builds a pubSubDataSink bound to the subscriber stream.
// The sink's lifetime is tied to the stream's context: when the stream ends,
// the sink's context is canceled as well.
func newPubSubDataSink(stream tipb.TopSQLPubSub_SubscribeServer, registerer DataSinkRegisterer) *pubSubDataSink {
	sinkCtx, cancelSink := context.WithCancel(stream.Context())
	sink := &pubSubDataSink{
		ctx:        sinkCtx,
		cancel:     cancelSink,
		stream:     stream,
		sendTaskCh: make(chan sendTask, 1),
		registerer: registerer,
	}
	return sink
}

var _ DataSink = &pubSubDataSink{}

// TrySend implements DataSink. It enqueues one report for the run loop
// without blocking: if the channel is full the report is dropped and an
// error is returned; if the sink is already closed, the context error is
// returned instead.
func (ds *pubSubDataSink) TrySend(data *ReportData, deadline time.Time) error {
select {
case ds.sendTaskCh <- sendTask{data: data, deadline: deadline}:
return nil
case <-ds.ctx.Done():
return ds.ctx.Err()
default:
// Channel is full: count the dropped report and fail fast rather than block the reporter.
ignoreReportChannelFullCounter.Inc()
return errors.New("the channel of pubsub dataSink is full")
}
}

// OnReporterClosing implements DataSink. It cancels the sink's context so the
// run loop exits when the reporter shuts down.
func (ds *pubSubDataSink) OnReporterClosing() {
ds.cancel()
}

func (ds *pubSubDataSink) run() error {
defer func() {
ds.registerer.Deregister(ds)
ds.cancel()
}()

for {
select {
case task := <-ds.sendTaskCh:
ctx, cancel := context.WithDeadline(ds.ctx, task.deadline)
var err error

start := time.Now()
go util.WithRecovery(func() {
defer cancel()
err = ds.doSend(ctx, task.data)

if err != nil {
reportAllDurationFailedHistogram.Observe(time.Since(start).Seconds())
} else {
reportAllDurationSuccHistogram.Observe(time.Since(start).Seconds())
}
}, nil)

// When the deadline is exceeded, the closure inside `go util.WithRecovery` above may not notice that
// immediately because it can be blocked by `stream.Send`.
breezewish marked this conversation as resolved.
Show resolved Hide resolved
// In order to clean up resources as quickly as possible, we let that closure run in an individual goroutine,
// and wait for timeout here.
<-ctx.Done()

if errors.Is(ctx.Err(), context.DeadlineExceeded) {
logutil.BgLogger().Warn(
"[top-sql] pubsub datasink failed to send data to subscriber due to deadline exceeded",
zap.Time("deadline", task.deadline),
)
return ctx.Err()
}

if err != nil {
logutil.BgLogger().Warn(
"[top-sql] pubsub datasink failed to send data to subscriber",
zap.Error(err),
)
return err
}
case <-ds.ctx.Done():
return ds.ctx.Err()
}
}
}

// doSend pushes all pieces of one report — CPU time records, SQL metas, and
// plan metas, in that order — to the subscriber, stopping at the first error.
func (ds *pubSubDataSink) doSend(ctx context.Context, data *ReportData) error {
	err := ds.sendCPUTime(ctx, data.CPUTimeRecords)
	if err != nil {
		return err
	}
	if err = ds.sendSQLMeta(ctx, data.SQLMetas); err != nil {
		return err
	}
	return ds.sendPlanMeta(ctx, data.PlanMetas)
}

// sendCPUTime streams every CPU time record to the subscriber, one response
// per record. The deferred block records how many entries were sent and the
// batch duration, split by success/failure.
func (ds *pubSubDataSink) sendCPUTime(ctx context.Context, records []tipb.CPUTimeRecord) (err error) {
	if len(records) == 0 {
		return
	}

	begin := time.Now()
	sent := 0
	defer func() {
		topSQLReportRecordCounterHistogram.Observe(float64(sent))
		if err == nil {
			reportRecordDurationSuccHistogram.Observe(time.Since(begin).Seconds())
		} else {
			reportRecordDurationFailedHistogram.Observe(time.Since(begin).Seconds())
		}
	}()

	// Reuse a single response wrapper and swap the record pointer each
	// iteration instead of allocating one response per record.
	oneof := &tipb.TopSQLSubResponse_Record{}
	resp := &tipb.TopSQLSubResponse{RespOneof: oneof}

	for i := 0; i < len(records); i++ {
		oneof.Record = &records[i]
		if err = ds.stream.Send(resp); err != nil {
			return
		}
		sent++

		// Stop promptly once the sink is canceled or the deadline passes.
		select {
		case <-ctx.Done():
			err = ctx.Err()
			return
		default:
		}
	}

	return
}

// sendSQLMeta streams every SQL meta entry to the subscriber, one response
// per entry. The deferred block records the sent count and batch duration,
// split by success/failure.
func (ds *pubSubDataSink) sendSQLMeta(ctx context.Context, sqlMetas []tipb.SQLMeta) (err error) {
	if len(sqlMetas) == 0 {
		return
	}

	begin := time.Now()
	sent := 0
	defer func() {
		topSQLReportSQLCountHistogram.Observe(float64(sent))
		if err == nil {
			reportSQLDurationSuccHistogram.Observe(time.Since(begin).Seconds())
		} else {
			reportSQLDurationFailedHistogram.Observe(time.Since(begin).Seconds())
		}
	}()

	// One reusable response wrapper; only the meta pointer changes per loop.
	oneof := &tipb.TopSQLSubResponse_SqlMeta{}
	resp := &tipb.TopSQLSubResponse{RespOneof: oneof}

	for i := 0; i < len(sqlMetas); i++ {
		oneof.SqlMeta = &sqlMetas[i]
		if err = ds.stream.Send(resp); err != nil {
			return
		}
		sent++

		// Stop promptly once the sink is canceled or the deadline passes.
		select {
		case <-ctx.Done():
			err = ctx.Err()
			return
		default:
		}
	}

	return
}

// sendPlanMeta streams every plan meta entry to the subscriber, one response
// per entry. The deferred block records the sent count and batch duration,
// split by success/failure.
func (ds *pubSubDataSink) sendPlanMeta(ctx context.Context, planMetas []tipb.PlanMeta) (err error) {
	if len(planMetas) == 0 {
		return
	}

	begin := time.Now()
	sent := 0
	defer func() {
		topSQLReportPlanCountHistogram.Observe(float64(sent))
		if err == nil {
			reportPlanDurationSuccHistogram.Observe(time.Since(begin).Seconds())
		} else {
			reportPlanDurationFailedHistogram.Observe(time.Since(begin).Seconds())
		}
	}()

	// One reusable response wrapper; only the meta pointer changes per loop.
	oneof := &tipb.TopSQLSubResponse_PlanMeta{}
	resp := &tipb.TopSQLSubResponse{RespOneof: oneof}

	for i := 0; i < len(planMetas); i++ {
		oneof.PlanMeta = &planMetas[i]
		if err = ds.stream.Send(resp); err != nil {
			return
		}
		sent++

		// Stop promptly once the sink is canceled or the deadline passes.
		select {
		case <-ctx.Done():
			err = ctx.Err()
			return
		default:
		}
	}

	return
}
Loading