Skip to content

Commit

Permalink
This is an automated cherry-pick of pingcap#10466
Browse files Browse the repository at this point in the history
Signed-off-by: ti-chi-bot <ti-community-prow-bot@tidb.io>
  • Loading branch information
hongyunyan authored and ti-chi-bot committed Jun 4, 2024
1 parent 6277d9a commit bda46d7
Show file tree
Hide file tree
Showing 4 changed files with 90 additions and 37 deletions.
5 changes: 5 additions & 0 deletions cdc/sinkv2/eventsink/txn/txn_sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,9 +111,14 @@ func newSink(ctx context.Context, backends []backend,

g, ctx1 := errgroup.WithContext(ctx)
for i, backend := range backends {
<<<<<<< HEAD:cdc/sinkv2/eventsink/txn/txn_sink.go
w := newWorker(ctx1, i, backend, len(backends))
txnCh := sink.alive.conflictDetector.GetOutChByCacheID(int64(i))
g.Go(func() error { return w.runLoop(txnCh) })
=======
w := newWorker(ctx1, changefeedID, i, backend, len(backends))
g.Go(func() error { return w.run() })
>>>>>>> 50d96a6508 (mysql (ticdc): Improve the performance of the mysql sink by refining the transaction event batching logic (#10466)):cdc/sink/dmlsink/txn/txn_dml_sink.go
sink.workers = append(sink.workers, w)
}

Expand Down
100 changes: 67 additions & 33 deletions cdc/sinkv2/eventsink/txn/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ type worker struct {
metricConflictDetectDuration prometheus.Observer
metricQueueDuration prometheus.Observer
metricTxnWorkerFlushDuration prometheus.Observer
metricTxnWorkerBusyRatio prometheus.Counter
metricTxnWorkerTotalDuration prometheus.Observer
metricTxnWorkerHandledRows prometheus.Counter

// Fields only used in the background loop.
Expand All @@ -61,8 +61,8 @@ func newWorker(ctx context.Context, ID int, backend backend, workerCount int) *w

metricConflictDetectDuration: txn.ConflictDetectDuration.WithLabelValues(changefeedID.Namespace, changefeedID.ID),
metricQueueDuration: txn.QueueDuration.WithLabelValues(changefeedID.Namespace, changefeedID.ID),
metricTxnWorkerFlushDuration: txn.WorkerFlushDuration.WithLabelValues(changefeedID.Namespace, changefeedID.ID),
metricTxnWorkerBusyRatio: txn.WorkerBusyRatio.WithLabelValues(changefeedID.Namespace, changefeedID.ID),
metricTxnWorkerFlushDuration: txn.WorkerFlushDuration.WithLabelValues(changefeedID.Namespace, changefeedID.ID, wid),
metricTxnWorkerTotalDuration: txn.WorkerTotalDuration.WithLabelValues(changefeedID.Namespace, changefeedID.ID, wid),
metricTxnWorkerHandledRows: txn.WorkerHandledRows.WithLabelValues(changefeedID.Namespace, changefeedID.ID, wid),

flushInterval: backend.MaxFlushInterval(),
Expand All @@ -71,8 +71,26 @@ func newWorker(ctx context.Context, ID int, backend backend, workerCount int) *w
}
}

<<<<<<< HEAD:cdc/sinkv2/eventsink/txn/worker.go
// Run a loop.
func (w *worker) runLoop(txnCh <-chan causality.TxnWithNotifier[*txnEvent]) error {
=======
// Add adds a txnEvent to the worker.
// The worker will call postTxnExecuted() after the txn executed.
// The postTxnExecuted will remove the txn related Node in the conflict detector's
// dependency graph and resolve related dependencies for these transacitons
// which depend on this executed txn.
func (w *worker) Add(txn *txnEvent, postTxnExecuted func()) {
w.txnCh.In() <- txnWithNotifier{txn, postTxnExecuted}
}

func (w *worker) close() {
w.txnCh.CloseAndDrain()
}

// Continuously get events from txnCh and call backend flush based on conditions.
func (w *worker) run() error {
>>>>>>> 50d96a6508 (mysql (ticdc): Improve the performance of the mysql sink by refining the transaction event batching logic (#10466)):cdc/sink/dmlsink/txn/worker.go
defer func() {
if err := w.backend.Close(); err != nil {
log.Info("Transaction dmlSink backend close fail",
Expand All @@ -85,43 +103,58 @@ func (w *worker) runLoop(txnCh <-chan causality.TxnWithNotifier[*txnEvent]) erro
zap.String("changefeedID", w.changefeed),
zap.Int("workerID", w.ID))

ticker := time.NewTicker(w.flushInterval)
defer ticker.Stop()

needFlush := false
var flushTimeSlice, totalTimeSlice time.Duration
overseerTicker := time.NewTicker(time.Second)
defer overseerTicker.Stop()
startToWork := time.Now()
start := time.Now()
for {
select {
case <-w.ctx.Done():
log.Info("Transaction dmlSink worker exits as canceled",
zap.String("changefeedID", w.changefeed),
zap.Int("workerID", w.ID))
return nil
<<<<<<< HEAD:cdc/sinkv2/eventsink/txn/worker.go
case txn := <-txnCh:
if txn.TxnEvent != nil {
needFlush = w.onEvent(txn.TxnEvent, txn.PostTxnExecuted)
=======
case txn := <-w.txnCh.Out():
// we get the data from txnCh.out until no more data here or reach the state that can be flushed.
// If no more data in txnCh.out, and also not reach the state that can be flushed,
// we will wait for 10ms and then do flush to avoid too much flush with small amount of txns.
if txn.txnEvent != nil {
needFlush := w.onEvent(txn)
if !needFlush {
delay := time.NewTimer(w.flushInterval)
for !needFlush {
select {
case txn := <-w.txnCh.Out():
needFlush = w.onEvent(txn)
case <-delay.C:
needFlush = true
}
}
// Release resources promptly
if !delay.Stop() {
select {
case <-delay.C:
default:
}
}
}
// needFlush must be true here, so we can do flush.
if err := w.doFlush(); err != nil {
log.Error("Transaction dmlSink worker exits unexpectly",
zap.String("changefeedID", w.changefeed),
zap.Int("workerID", w.ID),
zap.Error(err))
return err
}
// we record total time to calcuate the worker busy ratio.
// so we record the total time after flushing, to unified statistics on
// flush time and total time
w.metricTxnWorkerTotalDuration.Observe(time.Since(start).Seconds())
start = time.Now()
>>>>>>> 50d96a6508 (mysql (ticdc): Improve the performance of the mysql sink by refining the transaction event batching logic (#10466)):cdc/sink/dmlsink/txn/worker.go
}
case <-ticker.C:
needFlush = true
case now := <-overseerTicker.C:
totalTimeSlice = now.Sub(startToWork)
busyRatio := int(flushTimeSlice.Seconds() / totalTimeSlice.Seconds() * 1000)
w.metricTxnWorkerBusyRatio.Add(float64(busyRatio) / float64(w.workerCount))
startToWork = now
flushTimeSlice = 0
}
if needFlush {
if err := w.doFlush(&flushTimeSlice); err != nil {
log.Error("Transaction dmlSink worker exits unexpectly",
zap.String("changefeedID", w.changefeed),
zap.Int("workerID", w.ID),
zap.Error(err))
return err
}
needFlush = false
}
}
}
Expand All @@ -148,16 +181,17 @@ func (w *worker) onEvent(txn *txnEvent, postTxnExecuted func()) bool {
}

// doFlush flushes the backend.
<<<<<<< HEAD:cdc/sinkv2/eventsink/txn/worker.go
// It returns true only if it can no longer be flushed.
func (w *worker) doFlush(flushTimeSlice *time.Duration) error {
=======
func (w *worker) doFlush() error {
>>>>>>> 50d96a6508 (mysql (ticdc): Improve the performance of the mysql sink by refining the transaction event batching logic (#10466)):cdc/sink/dmlsink/txn/worker.go
if w.hasPending {
start := time.Now()
defer func() {
elapsed := time.Since(start)
*flushTimeSlice += elapsed
w.metricTxnWorkerFlushDuration.Observe(elapsed.Seconds())
w.metricTxnWorkerFlushDuration.Observe(time.Since(start).Seconds())
}()

if err := w.backend.Flush(w.ctx); err != nil {
log.Warn("Transaction sink backend flush fail",
zap.String("changefeedID", w.changefeed),
Expand Down
16 changes: 12 additions & 4 deletions cdc/sinkv2/metrics/txn/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,15 +44,23 @@ var (
Name: "txn_worker_flush_duration",
Help: "Flush duration (s) for txn worker.",
Buckets: prometheus.ExponentialBuckets(0.001, 2, 20), // 1ms~524s
}, []string{"namespace", "changefeed"})
}, []string{"namespace", "changefeed", "id"})

WorkerBusyRatio = prometheus.NewCounterVec(
prometheus.CounterOpts{
WorkerTotalDuration = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: "ticdc",
<<<<<<< HEAD:cdc/sinkv2/metrics/txn/metrics.go
Subsystem: "sinkv2",
Name: "txn_worker_busy_ratio",
Help: "Busy ratio (X ms in 1s) for all workers.",
}, []string{"namespace", "changefeed"})
=======
Subsystem: "sink",
Name: "txn_worker_total_duration",
Help: "total duration (s) for txn worker.",
Buckets: prometheus.ExponentialBuckets(0.0001, 2, 20), // 1ms~524s
}, []string{"namespace", "changefeed", "id"})
>>>>>>> 50d96a6508 (mysql (ticdc): Improve the performance of the mysql sink by refining the transaction event batching logic (#10466)):cdc/sink/metrics/txn/metrics.go

WorkerHandledRows = prometheus.NewCounterVec(
prometheus.CounterOpts{
Expand Down Expand Up @@ -86,7 +94,7 @@ func InitMetrics(registry *prometheus.Registry) {
registry.MustRegister(ConflictDetectDuration)
registry.MustRegister(QueueDuration)
registry.MustRegister(WorkerFlushDuration)
registry.MustRegister(WorkerBusyRatio)
registry.MustRegister(WorkerTotalDuration)
registry.MustRegister(WorkerHandledRows)
registry.MustRegister(SinkDMLBatchCommit)
registry.MustRegister(SinkDMLBatchCallback)
Expand Down
6 changes: 6 additions & 0 deletions metrics/grafana/ticdc.json
Original file line number Diff line number Diff line change
Expand Up @@ -7481,9 +7481,15 @@
"targets": [
{
"exemplar": true,
<<<<<<< HEAD
"expr": "sum(rate(ticdc_sinkv2_txn_worker_busy_ratio{k8s_cluster=\"$k8s_cluster\",tidb_cluster=\"$tidb_cluster\",changefeed=~\"$changefeed\",instance=~\"$ticdc_instance\"}[1m])/10) by (changefeed,instance)",
"interval": "",
"legendFormat": "{{changefeed}}-{{instance}}",
=======
"expr": "sum(rate(ticdc_sink_txn_worker_flush_duration_sum{k8s_cluster=\"$k8s_cluster\",tidb_cluster=\"$tidb_cluster\",namespace=~\"$namespace\",changefeed=~\"$changefeed\",instance=~\"$ticdc_instance\"}[1m])) by (namespace,changefeed,instance,id) /sum(rate(ticdc_sink_txn_worker_total_duration_sum{k8s_cluster=\"$k8s_cluster\",tidb_cluster=\"$tidb_cluster\",namespace=~\"$namespace\",changefeed=~\"$changefeed\",instance=~\"$ticdc_instance\"}[1m])) by (namespace,changefeed,instance,id) *100",
"interval": "",
"legendFormat": "{{namespace}}-{{changefeed}}-{{instance}}-worker-{{id}}",
>>>>>>> 50d96a6508 (mysql (ticdc): Improve the performance of the mysql sink by refining the transaction event batching logic (#10466))
"queryType": "randomWalk",
"refId": "A"
}
Expand Down

0 comments on commit bda46d7

Please sign in to comment.