Skip to content

Commit

Permalink
mysql-sink: add more performance related metrics (#702)
Browse files Browse the repository at this point in the history
  • Loading branch information
amyangfei authored Jul 1, 2020
1 parent 6c70ab0 commit bd0ce53
Show file tree
Hide file tree
Showing 4 changed files with 282 additions and 41 deletions.
17 changes: 17 additions & 0 deletions cdc/sink/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,11 +41,28 @@ var (
Name: "execution_error",
Help: "total count of execution errors",
}, []string{"capture", "changefeed"})
conflictDetectDurationHis = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: "ticdc",
Subsystem: "sink",
Name: "conflict_detect_duration",
Help: "Bucketed histogram of conflict detect time (s) for single DML statement",
Buckets: prometheus.ExponentialBuckets(0.00005, 2, 21),
}, []string{"capture", "changefeed"})
bucketSizeCounter = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: "ticdc",
Subsystem: "sink",
Name: "bucket_size",
Help: "size of the DML bucket",
}, []string{"capture", "changefeed", "bucket"})
)

// InitMetrics registers all metrics in this file
func InitMetrics(registry *prometheus.Registry) {
registry.MustRegister(execBatchHistogram)
registry.MustRegister(execTxnHistogram)
registry.MustRegister(executionErrorCounter)
registry.MustRegister(conflictDetectDurationHis)
registry.MustRegister(bucketSizeCounter)
}
101 changes: 61 additions & 40 deletions cdc/sink/mysql.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ import (
"github.com/pingcap/ticdc/pkg/util"
tddl "github.com/pingcap/tidb/ddl"
"github.com/pingcap/tidb/infoschema"
"github.com/prometheus/client_golang/prometheus"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
)
Expand All @@ -69,6 +70,10 @@ type mysqlSink struct {
unresolvedTxns map[model.TableName][]*model.Txn

statistics *Statistics

// metrics used by mysql sink only
metricConflictDetectDurationHis prometheus.Observer
metricBucketSizeCounters []prometheus.Counter
}

func (s *mysqlSink) EmitRowChangedEvents(ctx context.Context, rows ...*model.RowChangedEvent) error {
Expand Down Expand Up @@ -345,7 +350,9 @@ func newMySQLSink(ctx context.Context, sinkURI *url.URL, dsn *dmysql.Config, fil
if err != nil {
return nil, errors.Trace(err)
}
params.workerCount = c
if c > 0 {
params.workerCount = c
}
}
s = sinkURI.Query().Get("max-txn-row")
if s != "" {
Expand Down Expand Up @@ -405,12 +412,22 @@ func newMySQLSink(ctx context.Context, sinkURI *url.URL, dsn *dmysql.Config, fil
db.SetMaxIdleConns(params.workerCount)
db.SetMaxOpenConns(params.workerCount)

metricConflictDetectDurationHis := conflictDetectDurationHis.WithLabelValues(
params.captureAddr, params.changefeedID)
metricBucketSizeCounters := make([]prometheus.Counter, params.workerCount)
for i := 0; i < params.workerCount; i++ {
metricBucketSizeCounters[i] = bucketSizeCounter.WithLabelValues(
params.captureAddr, params.changefeedID, strconv.Itoa(i))
}

sink := &mysqlSink{
db: db,
unresolvedTxns: make(map[model.TableName][]*model.Txn),
params: params,
filter: filter,
statistics: NewStatistics("mysql", opts),
db: db,
unresolvedTxns: make(map[model.TableName][]*model.Txn),
params: params,
filter: filter,
statistics: NewStatistics("mysql", opts),
metricConflictDetectDurationHis: metricConflictDetectDurationHis,
metricBucketSizeCounters: metricBucketSizeCounters,
}

if val, ok := opts[mark.OptCyclicConfig]; ok {
Expand All @@ -431,21 +448,12 @@ func newMySQLSink(ctx context.Context, sinkURI *url.URL, dsn *dmysql.Config, fil
}

func (s *mysqlSink) concurrentExec(ctx context.Context, txnsGroup map[model.TableName][]*model.Txn) error {
return concurrentExec(ctx, txnsGroup, s.params.workerCount, s.params.maxTxnRow, s.execDMLs)
}

func concurrentExec(
ctx context.Context, txnsGroup map[model.TableName][]*model.Txn, nWorkers, maxTxnRow int,
execDMLs func(context.Context, []*model.RowChangedEvent, uint64, int) error,
) error {
if nWorkers == 0 {
nWorkers = defaultParams.workerCount
}
nWorkers := s.params.workerCount
workers := make([]*mysqlSinkWorker, nWorkers)
errg, ctx := errgroup.WithContext(ctx)
for i := 0; i < nWorkers; i++ {
i := i
workers[i] = newMySQLSinkWorker(maxTxnRow, i, execDMLs)
workers[i] = newMySQLSinkWorker(s.params.maxTxnRow, i, s.metricBucketSizeCounters[i], s.execDMLs)
errg.Go(func() error {
return workers[i].run(ctx)
})
Expand All @@ -457,21 +465,26 @@ func concurrentExec(
causality.add(txn.Keys, idx)
workers[idx].appendTxn(ctx, txn)
}
resolveConflict := func(txn *model.Txn) {
if conflict, idx := causality.detectConflict(txn.Keys); conflict {
if idx >= 0 {
sendFn(txn, idx)
return
}
for _, w := range workers {
w.waitAllTxnsExecuted()
}
causality.reset()
}
sendFn(txn, rowsChIdx)
rowsChIdx++
rowsChIdx = rowsChIdx % nWorkers
}
for _, txns := range txnsGroup {
for _, txn := range txns {
if conflict, idx := causality.detectConflict(txn.Keys); conflict {
if idx >= 0 {
sendFn(txn, idx)
continue
}
for _, w := range workers {
w.waitAllTxnsExecuted()
}
causality.reset()
}
sendFn(txn, rowsChIdx)
rowsChIdx++
rowsChIdx = rowsChIdx % nWorkers
startTime := time.Now()
resolveConflict(txn)
s.metricConflictDetectDurationHis.Observe(time.Since(startTime).Seconds())
}
}
for _, w := range workers {
Expand All @@ -481,19 +494,26 @@ func concurrentExec(
}

type mysqlSinkWorker struct {
txnCh chan *model.Txn
txnWg sync.WaitGroup
maxTxnRow int
bucket int
execDMLs func(context.Context, []*model.RowChangedEvent, uint64, int) error
txnCh chan *model.Txn
txnWg sync.WaitGroup
maxTxnRow int
bucket int
execDMLs func(context.Context, []*model.RowChangedEvent, uint64, int) error
metricBucketSize prometheus.Counter
}

func newMySQLSinkWorker(maxTxnRow int, bucket int, execDMLs func(context.Context, []*model.RowChangedEvent, uint64, int) error) *mysqlSinkWorker {
func newMySQLSinkWorker(
maxTxnRow int,
bucket int,
metricBucketSize prometheus.Counter,
execDMLs func(context.Context, []*model.RowChangedEvent, uint64, int) error,
) *mysqlSinkWorker {
return &mysqlSinkWorker{
txnCh: make(chan *model.Txn, 1024),
maxTxnRow: maxTxnRow,
bucket: bucket,
execDMLs: execDMLs,
txnCh: make(chan *model.Txn, 1024),
maxTxnRow: maxTxnRow,
bucket: bucket,
metricBucketSize: metricBucketSize,
execDMLs: execDMLs,
}
}

Expand Down Expand Up @@ -546,6 +566,7 @@ func (w *mysqlSinkWorker) run(ctx context.Context) (err error) {
}
toExecRows = toExecRows[:0]
w.txnWg.Add(-1 * txnNum)
w.metricBucketSize.Add(float64(txnNum))
txnNum = 0
lastExecTime = time.Now()
return nil
Expand Down
1 change: 1 addition & 0 deletions cdc/sink/mysql_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -438,6 +438,7 @@ func (s MySQLSinkSuite) TestMysqlSinkWorker(c *check.C) {
var outputRows [][]*model.RowChangedEvent
var outputReplicaIDs []uint64
w := newMySQLSinkWorker(tc.maxTxnRow, 1,
bucketSizeCounter.WithLabelValues("capture", "changefeed", "1"),
func(ctx context.Context, events []*model.RowChangedEvent, replicaID uint64, bucket int) error {
outputRows = append(outputRows, events)
outputReplicaIDs = append(outputReplicaIDs, replicaID)
Expand Down
Loading

0 comments on commit bd0ce53

Please sign in to comment.