topsql: support multiple datasinks (#30808)
zhongzc authored Dec 17, 2021
1 parent dd95dd6 commit c0888f5
Showing 7 changed files with 326 additions and 90 deletions.
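The gist of the change: NewRemoteTopSQLReporter no longer takes a single DataSink at construction time. Instead, the reporter acts as a DataSinkRegisterer and fans each report out to every registered sink. A rough sketch of the new wiring, not part of this commit — it assumes NewSingleTargetDataSink registers itself with the reporter it is handed (as the updated server/tidb_test.go below suggests), and extraSink is a hypothetical additional DataSink implementation:

	r := reporter.NewRemoteTopSQLReporter(plancodec.DecodeNormalizedPlan)
	s := reporter.NewSingleTargetDataSink(r) // forwards reports to the configured receiver address
	defer s.Close()
	defer r.Close() // closing the reporter notifies every registered sink via OnReporterClosing

	// Additional sinks attach through the DataSinkRegisterer interface the reporter implements.
	if err := r.Register(extraSink); err != nil {
		// Register fails once the reporter is closed or when too many sinks are attached.
		panic(err)
	}
	defer r.Deregister(extraSink)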
8 changes: 7 additions & 1 deletion server/tidb_test.go
@@ -1536,7 +1536,13 @@ func TestTopSQLAgent(t *testing.T) {
dbt.MustExec("set @@global.tidb_top_sql_report_interval_seconds=2;")
dbt.MustExec("set @@global.tidb_top_sql_max_statement_count=5;")

r := reporter.NewRemoteTopSQLReporter(reporter.NewSingleTargetDataSink(), plancodec.DecodeNormalizedPlan)
r := reporter.NewRemoteTopSQLReporter(plancodec.DecodeNormalizedPlan)
s := reporter.NewSingleTargetDataSink(r)
defer func() {
r.Close()
s.Close()
}()

tracecpu.GlobalSQLCPUProfiler.SetCollector(&collectorWrapper{r})

// TODO: change to ensure that the right sql statements are reported, not just counts
12 changes: 2 additions & 10 deletions util/topsql/reporter/datasink.go
@@ -23,14 +23,6 @@ type DataSink interface {
// the specified deadline, or the sink is closed, an error will be returned.
TrySend(data *ReportData, deadline time.Time) error

// IsPaused indicates that the DataSink is not expecting to receive records for now
// and may resume in the future.
IsPaused() bool

// IsDown indicates that the DataSink has been down and can be cleared.
// Note that: once a DataSink is down, it cannot go back to be up.
IsDown() bool

// Close cleans up resources owned by this DataSink
Close()
// OnReporterClosing notifies DataSink that the reporter is closing.
OnReporterClosing()
}
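With IsPaused, IsDown and Close gone, a sink now only implements TrySend and OnReporterClosing; its lifecycle is managed through the registerer instead of by the reporter. A minimal sketch of a conforming sink, assuming only the ReportData type from this package (mockDataSink in reporter_test.go below is the in-tree equivalent):

	type channelDataSink struct {
		ch chan *ReportData
	}

	// TrySend forwards the report, giving up once the deadline has passed.
	func (s *channelDataSink) TrySend(data *ReportData, deadline time.Time) error {
		select {
		case s.ch <- data:
			return nil
		case <-time.After(time.Until(deadline)):
			return errors.New("channelDataSink: send timed out")
		}
	}

	// OnReporterClosing is invoked once when the reporter shuts down.
	func (s *channelDataSink) OnReporterClosing() {
		close(s.ch)
	}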
90 changes: 80 additions & 10 deletions util/topsql/reporter/reporter.go
@@ -17,6 +17,7 @@ package reporter
import (
"bytes"
"context"
"errors"
"sort"
"sync"
"sync/atomic"
@@ -53,6 +54,12 @@ type TopSQLReporter interface {
Close()
}

// DataSinkRegisterer is for registering DataSink
type DataSinkRegisterer interface {
Register(dataSink DataSink) error
Deregister(dataSink DataSink)
}
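Sinks are expected to attach themselves through this interface rather than being passed to the reporter's constructor. A sketch of the pattern, extending the hypothetical channelDataSink from the datasink.go section above with a registerer field; the constructor shape is an assumption, though NewSingleTargetDataSink in this commit appears to follow it by taking the reporter as its registerer:

	func NewChannelDataSink(r DataSinkRegisterer, ch chan *ReportData) (*channelDataSink, error) {
		ds := &channelDataSink{ch: ch, registerer: r}
		// Register fails if the reporter is already closed or the sink limit is reached.
		if err := r.Register(ds); err != nil {
			return nil, err
		}
		return ds, nil
	}

	// Close detaches the sink so the reporter stops sending to it.
	func (s *channelDataSink) Close() {
		s.registerer.Deregister(s)
	}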

type cpuData struct {
timestamp uint64
records []tracecpu.SQLCPUTimeRecord
@@ -118,9 +125,11 @@ type planBinaryDecodeFunc func(string) (string, error)
// RemoteTopSQLReporter implements a TopSQL reporter that sends data to a remote agent
// This should be called periodically to collect TopSQL resource usage metrics
type RemoteTopSQLReporter struct {
ctx context.Context
cancel context.CancelFunc
dataSink DataSink
ctx context.Context
cancel context.CancelFunc

dataSinkMu sync.Mutex
dataSinks map[DataSink]struct{}

// normalizedSQLMap is a map whose keys are SQL digest strings and values are SQLMeta.
normalizedSQLMap atomic.Value // sync.Map
@@ -148,12 +157,14 @@ type SQLMeta struct {
//
// planBinaryDecoder is a decoding function which will be called asynchronously to decode the plan binary to string
// MaxStatementsNum is the maximum SQL and plan number, which will restrict the memory usage of the internal LFU cache
func NewRemoteTopSQLReporter(dataSink DataSink, decodePlan planBinaryDecodeFunc) *RemoteTopSQLReporter {
func NewRemoteTopSQLReporter(decodePlan planBinaryDecodeFunc) *RemoteTopSQLReporter {
ctx, cancel := context.WithCancel(context.Background())
tsr := &RemoteTopSQLReporter{
ctx: ctx,
cancel: cancel,
dataSink: dataSink,
ctx: ctx,
cancel: cancel,

dataSinks: make(map[DataSink]struct{}, 10),

collectCPUDataChan: make(chan cpuData, 1),
reportCollectedDataChan: make(chan collectedData, 1),
decodePlan: decodePlan,
@@ -222,6 +233,47 @@ func (tsr *RemoteTopSQLReporter) RegisterPlan(planDigest []byte, normalizedBinar
}
}

var _ DataSinkRegisterer = &RemoteTopSQLReporter{}

// Register implements DataSinkRegisterer interface.
func (tsr *RemoteTopSQLReporter) Register(dataSink DataSink) error {
tsr.dataSinkMu.Lock()
defer tsr.dataSinkMu.Unlock()

select {
case <-tsr.ctx.Done():
return errors.New("reporter is closed")
default:
if len(tsr.dataSinks) >= 10 {
return errors.New("too many datasinks")
}

tsr.dataSinks[dataSink] = struct{}{}

if len(tsr.dataSinks) > 0 {
variable.TopSQLVariable.Enable.Store(true)
}

return nil
}
}

// Deregister implements DataSinkRegisterer interface.
func (tsr *RemoteTopSQLReporter) Deregister(dataSink DataSink) {
tsr.dataSinkMu.Lock()
defer tsr.dataSinkMu.Unlock()

select {
case <-tsr.ctx.Done():
default:
delete(tsr.dataSinks, dataSink)

if len(tsr.dataSinks) == 0 {
variable.TopSQLVariable.Enable.Store(false)
}
}
}

// Collect receives CPU time records for processing. WARN: It will drop the records if the processing is not in time.
// This function is thread-safe and efficient.
func (tsr *RemoteTopSQLReporter) Collect(timestamp uint64, records []tracecpu.SQLCPUTimeRecord) {
@@ -242,7 +294,15 @@ func (tsr *RemoteTopSQLReporter) Collect(timestamp uint64, records []tracecpu.SQ
// Close closes the reporter and releases its resources.
func (tsr *RemoteTopSQLReporter) Close() {
tsr.cancel()
tsr.dataSink.Close()

var m map[DataSink]struct{}
tsr.dataSinkMu.Lock()
m, tsr.dataSinks = tsr.dataSinks, make(map[DataSink]struct{})
tsr.dataSinkMu.Unlock()

for d := range m {
d.OnReporterClosing()
}
}

func addEvictedCPUTime(collectTarget map[string]*dataPoints, timestamp uint64, totalCPUTimeMs uint32) {
@@ -585,7 +645,17 @@ func (tsr *RemoteTopSQLReporter) doReport(data *ReportData) {
}
})
deadline := time.Now().Add(timeout)
if err := tsr.dataSink.TrySend(data, deadline); err != nil {
logutil.BgLogger().Warn("[top-sql] failed to send data to datasink", zap.Error(err))

tsr.dataSinkMu.Lock()
dataSinks := make([]DataSink, 0, len(tsr.dataSinks))
for ds := range tsr.dataSinks {
dataSinks = append(dataSinks, ds)
}
tsr.dataSinkMu.Unlock()

for _, ds := range dataSinks {
if err := ds.TrySend(data, deadline); err != nil {
logutil.BgLogger().Warn("[top-sql] failed to send data to datasink", zap.Error(err))
}
}
}
156 changes: 137 additions & 19 deletions util/topsql/reporter/reporter_test.go
@@ -64,32 +64,53 @@ func mockPlanBinaryDecoderFunc(plan string) (string, error) {
return plan, nil
}

func setupRemoteTopSQLReporter(maxStatementsNum, interval int, addr string) *RemoteTopSQLReporter {
type mockDataSink struct {
ch chan *ReportData
}

func newMockDataSink(ch chan *ReportData) DataSink {
return &mockDataSink{ch: ch}
}

var _ DataSink = &mockDataSink{}

func (ds *mockDataSink) TrySend(data *ReportData, _ time.Time) error {
ds.ch <- data
return nil
}

func (ds *mockDataSink) OnReporterClosing() {
}

func setupRemoteTopSQLReporter(maxStatementsNum, interval int, addr string) (*RemoteTopSQLReporter, *SingleTargetDataSink) {
variable.TopSQLVariable.MaxStatementCount.Store(int64(maxStatementsNum))
variable.TopSQLVariable.MaxCollect.Store(10000)
variable.TopSQLVariable.ReportIntervalSeconds.Store(int64(interval))
config.UpdateGlobal(func(conf *config.Config) {
conf.TopSQL.ReceiverAddress = addr
})

rc := NewSingleTargetDataSink()
ts := NewRemoteTopSQLReporter(rc, mockPlanBinaryDecoderFunc)
return ts
ts := NewRemoteTopSQLReporter(mockPlanBinaryDecoderFunc)
ds := NewSingleTargetDataSink(ts)
return ts, ds
}

func initializeCache(maxStatementsNum, interval int, addr string) *RemoteTopSQLReporter {
ts := setupRemoteTopSQLReporter(maxStatementsNum, interval, addr)
func initializeCache(maxStatementsNum, interval int, addr string) (*RemoteTopSQLReporter, *SingleTargetDataSink) {
ts, ds := setupRemoteTopSQLReporter(maxStatementsNum, interval, addr)
populateCache(ts, 0, maxStatementsNum, 1)
return ts
return ts, ds
}

func TestCollectAndSendBatch(t *testing.T) {
agentServer, err := mock.StartMockAgentServer()
require.NoError(t, err)
defer agentServer.Stop()

tsr := setupRemoteTopSQLReporter(maxSQLNum, 1, agentServer.Address())
defer tsr.Close()
tsr, ds := setupRemoteTopSQLReporter(maxSQLNum, 1, agentServer.Address())
defer func() {
ds.Close()
tsr.Close()
}()
populateCache(tsr, 0, maxSQLNum, 1)

agentServer.WaitCollectCnt(1, time.Second*5)
@@ -127,8 +148,11 @@ func TestCollectAndEvicted(t *testing.T) {
require.NoError(t, err)
defer agentServer.Stop()

tsr := setupRemoteTopSQLReporter(maxSQLNum, 1, agentServer.Address())
defer tsr.Close()
tsr, ds := setupRemoteTopSQLReporter(maxSQLNum, 1, agentServer.Address())
defer func() {
ds.Close()
tsr.Close()
}()
populateCache(tsr, 0, maxSQLNum*2, 2)

agentServer.WaitCollectCnt(1, time.Second*10)
@@ -192,8 +216,11 @@ func TestCollectAndTopN(t *testing.T) {
require.NoError(t, err)
defer agentServer.Stop()

tsr := setupRemoteTopSQLReporter(2, 1, agentServer.Address())
defer tsr.Close()
tsr, ds := setupRemoteTopSQLReporter(2, 1, agentServer.Address())
defer func() {
ds.Close()
tsr.Close()
}()

records := []tracecpu.SQLCPUTimeRecord{
newSQLCPUTimeRecord(tsr, 1, 1),
@@ -257,8 +284,11 @@ func TestCollectAndTopN(t *testing.T) {
}

func TestCollectCapacity(t *testing.T) {
tsr := setupRemoteTopSQLReporter(maxSQLNum, 60, "")
defer tsr.Close()
tsr, ds := setupRemoteTopSQLReporter(maxSQLNum, 60, "")
defer func() {
ds.Close()
tsr.Close()
}()

registerSQL := func(n int) {
for i := 0; i < n; i++ {
@@ -398,8 +428,11 @@ func TestCollectInternal(t *testing.T) {
require.NoError(t, err)
defer agentServer.Stop()

tsr := setupRemoteTopSQLReporter(3000, 1, agentServer.Address())
defer tsr.Close()
tsr, ds := setupRemoteTopSQLReporter(3000, 1, agentServer.Address())
defer func() {
ds.Close()
tsr.Close()
}()

records := []tracecpu.SQLCPUTimeRecord{
newSQLCPUTimeRecord(tsr, 1, 1),
@@ -428,15 +461,100 @@ func TestCollectInternal(t *testing.T) {
}
}

func TestMultipleDataSinks(t *testing.T) {
variable.TopSQLVariable.ReportIntervalSeconds.Store(1)

tsr := NewRemoteTopSQLReporter(mockPlanBinaryDecoderFunc)
defer tsr.Close()

var chs []chan *ReportData
for i := 0; i < 7; i++ {
chs = append(chs, make(chan *ReportData, 1))
}
var dss []DataSink
for _, ch := range chs {
dss = append(dss, newMockDataSink(ch))
}
for _, ds := range dss {
require.NoError(t, tsr.Register(ds))
}

records := []tracecpu.SQLCPUTimeRecord{
newSQLCPUTimeRecord(tsr, 1, 2),
}
tsr.Collect(3, records)

for _, ch := range chs {
d := <-ch
require.NotNil(t, d)
require.Equal(t, []tipb.CPUTimeRecord{{
SqlDigest: []byte("sqlDigest1"),
PlanDigest: []byte("planDigest1"),
RecordListTimestampSec: []uint64{3},
RecordListCpuTimeMs: []uint32{2},
}}, d.CPUTimeRecords)

require.Equal(t, []tipb.SQLMeta{{
SqlDigest: []byte("sqlDigest1"),
NormalizedSql: "sqlNormalized1",
}}, d.SQLMetas)

require.Equal(t, []tipb.PlanMeta{{
PlanDigest: []byte("planDigest1"),
NormalizedPlan: "planNormalized1",
}}, d.PlanMetas)
}

// deregister half of dataSinks
for i := 0; i < 7; i += 2 {
tsr.Deregister(dss[i])
}

records = []tracecpu.SQLCPUTimeRecord{
newSQLCPUTimeRecord(tsr, 4, 5),
}
tsr.Collect(6, records)

for i := 1; i < 7; i += 2 {
d := <-chs[i]
require.NotNil(t, d)
require.Equal(t, []tipb.CPUTimeRecord{{
SqlDigest: []byte("sqlDigest4"),
PlanDigest: []byte("planDigest4"),
RecordListTimestampSec: []uint64{6},
RecordListCpuTimeMs: []uint32{5},
}}, d.CPUTimeRecords)

require.Equal(t, []tipb.SQLMeta{{
SqlDigest: []byte("sqlDigest4"),
NormalizedSql: "sqlNormalized4",
IsInternalSql: true,
}}, d.SQLMetas)

require.Equal(t, []tipb.PlanMeta{{
PlanDigest: []byte("planDigest4"),
NormalizedPlan: "planNormalized4",
}}, d.PlanMetas)
}

for i := 0; i < 7; i += 2 {
select {
case <-chs[i]:
require.Fail(t, "unexpected to receive messages")
default:
}
}
}

func BenchmarkTopSQL_CollectAndIncrementFrequency(b *testing.B) {
tsr := initializeCache(maxSQLNum, 120, ":23333")
tsr, _ := initializeCache(maxSQLNum, 120, ":23333")
for i := 0; i < b.N; i++ {
populateCache(tsr, 0, maxSQLNum, uint64(i))
}
}

func BenchmarkTopSQL_CollectAndEvict(b *testing.B) {
tsr := initializeCache(maxSQLNum, 120, ":23333")
tsr, _ := initializeCache(maxSQLNum, 120, ":23333")
begin := 0
end := maxSQLNum
for i := 0; i < b.N; i++ {
