diff --git a/cdc/sinkv2/eventsink/txn/worker.go b/cdc/sinkv2/eventsink/txn/worker.go index b85cafa5c4f..c387cec835f 100644 --- a/cdc/sinkv2/eventsink/txn/worker.go +++ b/cdc/sinkv2/eventsink/txn/worker.go @@ -20,7 +20,6 @@ import ( "github.com/pingcap/log" "github.com/pingcap/tiflow/cdc/contextutil" - "github.com/pingcap/tiflow/cdc/model" "github.com/pingcap/tiflow/cdc/sinkv2/metrics/txn" "github.com/pingcap/tiflow/cdc/sinkv2/tablesink/state" "github.com/pingcap/tiflow/pkg/causality" @@ -47,8 +46,6 @@ type worker struct { flushInterval time.Duration hasPending bool postTxnExecutedCallbacks []func() - - lastSlowConflictDetectLog map[model.TableID]time.Time } func newWorker(ctx context.Context, ID int, backend backend, workerCount int) *worker { @@ -71,8 +68,6 @@ func newWorker(ctx context.Context, ID int, backend backend, workerCount int) *w flushInterval: backend.MaxFlushInterval(), hasPending: false, postTxnExecutedCallbacks: make([]func(), 0, 1024), - - lastSlowConflictDetectLog: make(map[model.TableID]time.Time), } } @@ -90,9 +85,6 @@ func (w *worker) run(txnCh <-chan causality.TxnWithNotifier[*txnEvent]) error { zap.String("changefeedID", w.changefeed), zap.Int("workerID", w.ID)) - cleanSlowLogHistory := time.NewTicker(time.Hour) - defer cleanSlowLogHistory.Stop() - start := time.Now() for { select { @@ -101,15 +93,6 @@ func (w *worker) run(txnCh <-chan causality.TxnWithNotifier[*txnEvent]) error { zap.String("changefeedID", w.changefeed), zap.Int("workerID", w.ID)) return nil - case <-cleanSlowLogHistory.C: - lastSlowConflictDetectLog := w.lastSlowConflictDetectLog - w.lastSlowConflictDetectLog = make(map[model.TableID]time.Time) - now := time.Now() - for tableID, lastLog := range lastSlowConflictDetectLog { - if now.Sub(lastLog) <= time.Minute { - w.lastSlowConflictDetectLog[tableID] = lastLog - } - } case txn := <-txnCh: // we get the data from txnCh.out until no more data here or reach the state that can be flushed. // If no more data in txnCh.out, and also not reach the state that can be flushed, @@ -166,24 +149,8 @@ func (w *worker) onEvent(txn *txnEvent, postTxnExecuted func()) bool { return false } - conflictDetectTime := txn.conflictResolved.Sub(txn.start).Seconds() - w.metricConflictDetectDuration.Observe(conflictDetectTime) + w.metricConflictDetectDuration.Observe(txn.conflictResolved.Sub(txn.start).Seconds()) w.metricQueueDuration.Observe(time.Since(txn.start).Seconds()) - - // Log tables which conflict detect time larger than 1 minute. - if conflictDetectTime > float64(60) { - now := time.Now() - // Log slow conflict detect tables every minute. - if lastLog, ok := w.lastSlowConflictDetectLog[txn.Event.TableInfo.ID]; !ok || now.Sub(lastLog) > time.Minute { - log.Warn("Transaction dmlSink finds a slow transaction in conflict detector", - zap.String("changefeedID", w.changefeed), - zap.Int("workerID", w.ID), - zap.Int64("TableID", txn.Event.TableInfo.ID), - zap.Float64("seconds", conflictDetectTime)) - w.lastSlowConflictDetectLog[txn.Event.Table.TableID] = now - } - } - w.metricTxnWorkerHandledRows.Add(float64(len(txn.Event.Rows))) w.postTxnExecutedCallbacks = append(w.postTxnExecutedCallbacks, postTxnExecuted) return w.backend.OnTxnEvent(txn.TxnCallbackableEvent)