diff --git a/cdc/model/kv.go b/cdc/model/kv.go
index 0c5fe65d5c7..1b0977b59bb 100644
--- a/cdc/model/kv.go
+++ b/cdc/model/kv.go
@@ -16,6 +16,7 @@
 package model
 
 import (
+	"errors"
 	"fmt"
 
 	"github.com/pingcap/tiflow/pkg/regionspan"
@@ -110,3 +111,20 @@ func (v *RawKVEntry) String() string {
 func (v *RawKVEntry) ApproximateDataSize() int64 {
 	return int64(len(v.Key) + len(v.Value) + len(v.OldValue))
 }
+
+// ShouldSplitKVEntry checks whether the raw kv entry should be split.
+type ShouldSplitKVEntry func(raw *RawKVEntry) bool
+
+// SplitUpdateKVEntry splits the raw kv entry into a delete entry and an insert entry.
+func SplitUpdateKVEntry(raw *RawKVEntry) (*RawKVEntry, *RawKVEntry, error) {
+	if raw == nil {
+		return nil, nil, errors.New("nil event cannot be split")
+	}
+	deleteKVEntry := *raw
+	deleteKVEntry.Value = nil
+
+	insertKVEntry := *raw
+	insertKVEntry.OldValue = nil
+
+	return &deleteKVEntry, &insertKVEntry, nil
+}
diff --git a/cdc/processor/processor.go b/cdc/processor/processor.go
index 5aba460e0a3..865f2d72485 100644
--- a/cdc/processor/processor.go
+++ b/cdc/processor/processor.go
@@ -219,11 +219,11 @@ func (p *processor) AddTable(
 	}
 
 	if p.pullBasedSinking {
-		p.sinkManager.AddTable(tableID, startTs, p.changefeed.Info.TargetTs)
+		table := p.sinkManager.AddTable(tableID, startTs, p.changefeed.Info.TargetTs)
 		if p.redoDMLMgr.Enabled() {
 			p.redoDMLMgr.AddTable(tableID, startTs)
 		}
-		p.sourceManager.AddTable(ctx.(cdcContext.Context), tableID, p.getTableName(ctx, tableID), startTs)
+		p.sourceManager.AddTable(ctx.(cdcContext.Context), tableID, p.getTableName(ctx, tableID), startTs, table.GetReplicaTs)
 	} else {
 		table, err := p.createTablePipeline(
 			ctx.(cdcContext.Context), tableID, &model.TableReplicaInfo{StartTs: startTs})
@@ -232,7 +232,6 @@ func (p *processor) AddTable(
 		}
 		p.tables[tableID] = table
 	}
-
 	return true, nil
 }
 
@@ -570,18 +569,6 @@ func isProcessorIgnorableError(err error) bool {
 	return false
 }
 
-// needPullerSafeModeAtStart returns true if the scheme is mysql compatible.
-// pullerSafeMode means to split all update kv entries whose commitTS
-// is older then the start time of this changefeed.
-func needPullerSafeModeAtStart(sinkURIStr string) (bool, error) {
-	sinkURI, err := url.Parse(sinkURIStr)
-	if err != nil {
-		return false, cerror.WrapError(cerror.ErrSinkURIInvalid, err)
-	}
-	scheme := sink.GetScheme(sinkURI)
-	return sink.IsMySQLCompatibleScheme(scheme), nil
-}
-
 // Tick implements the `orchestrator.State` interface
 // the `state` parameter is sent by the etcd worker, the `state` must be a snapshot of KVs in etcd
 // The main logic of processor is in this function, including the calculation of many kinds of ts,
@@ -757,6 +744,16 @@ func (p *processor) createTaskPosition() (skipThisTick bool) {
 	return true
 }
 
+// isMysqlCompatibleBackend returns true if the sinkURIStr is mysql compatible.
+func isMysqlCompatibleBackend(sinkURIStr string) (bool, error) {
+	sinkURI, err := url.Parse(sinkURIStr)
+	if err != nil {
+		return false, cerror.WrapError(cerror.ErrSinkURIInvalid, err)
+	}
+	scheme := sink.GetScheme(sinkURI)
+	return sink.IsMySQLCompatibleScheme(scheme), nil
+}
+
 // lazyInitImpl create Filter, SchemaStorage, Mounter instances at the first tick.
 func (p *processor) lazyInitImpl(ctx cdcContext.Context) error {
 	if p.initialized {
@@ -847,16 +844,16 @@ func (p *processor) lazyInitImpl(ctx cdcContext.Context) error {
 			zap.Duration("duration", time.Since(start)))
 		return errors.Trace(err)
 	}
 
-	pullerSafeModeAtStart, err := needPullerSafeModeAtStart(p.changefeed.Info.SinkURI)
+	isMysqlBackend, err := isMysqlCompatibleBackend(p.changefeed.Info.SinkURI)
 	if err != nil {
 		return errors.Trace(err)
 	}
 	p.sourceManager = sourcemanager.New(p.changefeedID, p.upstream, p.mg,
-		sortEngine, p.errCh, p.changefeed.Info.Config.BDRMode, pullerSafeModeAtStart)
+		sortEngine, p.errCh, p.changefeed.Info.Config.BDRMode, isMysqlBackend)
 	p.sinkManager, err = sinkmanager.New(stdCtx, p.changefeedID,
 		p.changefeed.Info, p.upstream, p.schemaStorage, p.redoDMLMgr, p.sourceManager,
-		p.errCh, p.warnCh, p.metricsTableSinkTotalRows, p.metricsTableSinkFlushLagDuration)
+		p.errCh, p.warnCh, isMysqlBackend, p.metricsTableSinkTotalRows, p.metricsTableSinkFlushLagDuration)
 	if err != nil {
 		log.Info("Processor creates sink manager fail",
 			zap.String("namespace", p.changefeedID.Namespace),
diff --git a/cdc/processor/sinkmanager/manager.go b/cdc/processor/sinkmanager/manager.go
index 5801343903a..85d8099e23e 100644
--- a/cdc/processor/sinkmanager/manager.go
+++ b/cdc/processor/sinkmanager/manager.go
@@ -128,6 +128,9 @@ type SinkManager struct {
 	// wg is used to wait for all workers to exit.
 	wg sync.WaitGroup
 
+	// isMysqlBackend indicates whether the backend is MySQL compatible.
+	isMysqlBackend bool
+
 	// Metric for table sink.
 	metricsTableSinkTotalRows prometheus.Counter
 
@@ -145,6 +148,7 @@ func New(
 	sourceManager *sourcemanager.SourceManager,
 	errChan chan error,
 	warnChan chan error,
+	isMysqlBackend bool,
 	metricsTableSinkTotalRows prometheus.Counter,
 	metricsTableSinkFlushLagDuration prometheus.Observer,
 ) (*SinkManager, error) {
@@ -160,6 +164,7 @@ func New(
 		sinkTaskChan:        make(chan *sinkTask),
 		sinkWorkerAvailable: make(chan struct{}, 1),
 		sinkRetry:           retry.NewInfiniteErrorRetry(),
+		isMysqlBackend:      isMysqlBackend,
 
 		metricsTableSinkTotalRows: metricsTableSinkTotalRows,
 
@@ -312,6 +317,11 @@ func (m *SinkManager) run(ctx context.Context, warnings ...chan<- error) (err er
 		if cerror.IsDupEntryError(err) {
 			return errors.Trace(err)
 		}
+
+		if m.isMysqlBackend {
+			// For the MySQL backend, we should restart the sink and let the owner handle the error.
+			return errors.Trace(err)
+		}
 	}
 
 	// If the error is retryable, we should retry to re-establish the internal resources.
@@ -848,7 +858,7 @@ func (m *SinkManager) UpdateBarrierTs(globalBarrierTs model.Ts, tableBarrier map
 }
 
 // AddTable adds a table(TableSink) to the sink manager.
-func (m *SinkManager) AddTable(tableID model.TableID, startTs model.Ts, targetTs model.Ts) {
+func (m *SinkManager) AddTable(tableID model.TableID, startTs model.Ts, targetTs model.Ts) *tableSinkWrapper {
 	sinkWrapper := newTableSinkWrapper(
 		m.changefeedID,
 		tableID,
@@ -876,7 +886,6 @@ func (m *SinkManager) AddTable(tableID model.TableID, startTs model.Ts, targetTs
 			zap.String("namespace", m.changefeedID.Namespace),
 			zap.String("changefeed", m.changefeedID.ID),
 			zap.Int64("tableID", tableID))
-		return
 	}
 	m.sinkMemQuota.AddTable(tableID)
 	m.redoMemQuota.AddTable(tableID)
@@ -886,6 +895,7 @@ func (m *SinkManager) AddTable(tableID model.TableID, startTs model.Ts, targetTs
 		zap.Int64("tableID", tableID),
 		zap.Uint64("startTs", startTs),
 		zap.Uint64("version", sinkWrapper.version))
+	return sinkWrapper
 }
 
 // StartTable sets the table(TableSink) state to replicating.
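The hunks in cdc/model/kv.go above move update-splitting into the model package. Below is a minimal, self-contained sketch of that split logic for readers who want to see it in isolation; the rawKVEntry stand-in type, helper names, and sample values are illustrative assumptions, not tiflow's real definitions.

// split_update_sketch.go
//
// Mirrors the logic of model.SplitUpdateKVEntry: one update entry becomes a
// delete entry (old value only) followed by an insert entry (new value only).
package main

import (
	"errors"
	"fmt"
)

// rawKVEntry is a simplified stand-in for model.RawKVEntry.
type rawKVEntry struct {
	Key      []byte
	Value    []byte // new value; nil for a delete
	OldValue []byte // previous value; non-nil for an update
	CRTs     uint64 // commit ts of the entry
}

func (r *rawKVEntry) isUpdate() bool { return r.Value != nil && r.OldValue != nil }

func splitUpdateKVEntry(raw *rawKVEntry) (*rawKVEntry, *rawKVEntry, error) {
	if raw == nil {
		return nil, nil, errors.New("nil event cannot be split")
	}
	deleteEntry := *raw
	deleteEntry.Value = nil // keep only OldValue: a delete of the old row

	insertEntry := *raw
	insertEntry.OldValue = nil // keep only Value: an insert of the new row

	return &deleteEntry, &insertEntry, nil
}

func main() {
	update := &rawKVEntry{Key: []byte("k1"), Value: []byte("v2"), OldValue: []byte("v1"), CRTs: 42}
	del, ins, err := splitUpdateKVEntry(update)
	if err != nil {
		panic(err)
	}
	fmt.Printf("update(isUpdate=%v) -> delete(old=%q) + insert(new=%q)\n",
		update.isUpdate(), del.OldValue, ins.Value)
}

Splitting an old update into a delete followed by an insert lets a MySQL-compatible sink replay the event safely even if some version of the row may already exist downstream, which is why the diff ties the split decision to the replicate ts.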
diff --git a/cdc/processor/sinkmanager/manager_test.go b/cdc/processor/sinkmanager/manager_test.go
index 12818eeeec8..b6d98f68da4 100644
--- a/cdc/processor/sinkmanager/manager_test.go
+++ b/cdc/processor/sinkmanager/manager_test.go
@@ -63,6 +63,7 @@ func createManagerWithMemEngine(
 		&entry.MockSchemaStorage{Resolved: math.MaxUint64},
 		nil, sm,
 		errChan, errChan,
+		false,
 		prometheus.NewCounter(prometheus.CounterOpts{}),
 		prometheus.NewHistogram(prometheus.HistogramOpts{}))
 	require.NoError(t, err)
@@ -166,7 +167,7 @@ func TestAddTable(t *testing.T) {
 	require.Equal(t, 0, manager.sinkProgressHeap.len(), "Not started table shout not in progress heap")
 	err := manager.StartTable(tableID, 1)
 	require.NoError(t, err)
-	require.Equal(t, uint64(0x7ffffffffffbffff), tableSink.(*tableSinkWrapper).replicateTs)
+	require.Equal(t, uint64(0x7ffffffffffbffff), tableSink.(*tableSinkWrapper).replicateTs.Load())
 	progress := manager.sinkProgressHeap.pop()
 	require.Equal(t, tableID, progress.tableID)
diff --git a/cdc/processor/sinkmanager/redo_log_worker.go b/cdc/processor/sinkmanager/redo_log_worker.go
index 2e7b566cd69..a8e4d642e85 100644
--- a/cdc/processor/sinkmanager/redo_log_worker.go
+++ b/cdc/processor/sinkmanager/redo_log_worker.go
@@ -276,8 +276,8 @@ func (w *redoWorker) handleTask(ctx context.Context, task *redoTask) (finalErr e
 		var x []*model.RowChangedEvent
 		var size uint64
 		if e.Row != nil {
-			// For all rows, we add table replicate ts, so mysql sink can determine safe-mode.
-			e.Row.ReplicatingTs = task.tableSink.replicateTs
+			// For all events, we add table replicate ts, so mysql sink can determine safe-mode.
+			e.Row.ReplicatingTs = task.tableSink.replicateTs.Load()
 			x, size = handleRowChangedEvents(w.changefeedID, task.tableID, e)
 			usedMemSize += size
 			rows = append(rows, x...)
diff --git a/cdc/processor/sinkmanager/table_sink_worker.go b/cdc/processor/sinkmanager/table_sink_worker.go
index 4a55b92983f..b11e4347d73 100644
--- a/cdc/processor/sinkmanager/table_sink_worker.go
+++ b/cdc/processor/sinkmanager/table_sink_worker.go
@@ -363,7 +363,7 @@ func (w *sinkWorker) handleTask(ctx context.Context, task *sinkTask) (finalErr e
 			// NOTICE: The event can be filtered by the event filter.
 			if e.Row != nil {
 				// For all rows, we add table replicate ts, so mysql sink can determine safe-mode.
-				e.Row.ReplicatingTs = task.tableSink.replicateTs
+				e.Row.ReplicatingTs = task.tableSink.GetReplicaTs()
 				x, size := handleRowChangedEvents(w.changefeedID, task.tableID, e)
 				events = append(events, x...)
 				allEventSize += size
diff --git a/cdc/processor/sinkmanager/table_sink_wrapper.go b/cdc/processor/sinkmanager/table_sink_wrapper.go
index 6d253413cc7..839b661eec7 100644
--- a/cdc/processor/sinkmanager/table_sink_wrapper.go
+++ b/cdc/processor/sinkmanager/table_sink_wrapper.go
@@ -15,6 +15,7 @@
 package sinkmanager
 
 import (
 	"context"
+	"math"
 	"sort"
 	"sync"
 	"sync/atomic"
@@ -76,7 +77,7 @@ type tableSinkWrapper struct {
 	receivedSorterResolvedTs atomic.Uint64
 
 	// replicateTs is the ts that the table sink has started to replicate.
-	replicateTs    model.Ts
+	replicateTs    atomic.Uint64
 	genReplicateTs func(ctx context.Context) (model.Ts, error)
 
 	// lastCleanTime indicates the last time the table has been cleaned.
@@ -89,6 +90,11 @@ type tableSinkWrapper struct {
 	rangeEventCountsMu sync.Mutex
 }
 
+// GetReplicaTs returns the replicate ts of the table sink.
+func (t *tableSinkWrapper) GetReplicaTs() model.Ts {
+	return t.replicateTs.Load()
+}
+
 type rangeEventCount struct {
 	// firstPos and lastPos are used to merge many rangeEventCount into one.
 	firstPos engine.Position
@@ -131,31 +137,34 @@ func newTableSinkWrapper(
 	res.receivedSorterResolvedTs.Store(startTs)
 	res.barrierTs.Store(startTs)
+	res.replicateTs.Store(math.MaxUint64)
 	return res
 }
 
 func (t *tableSinkWrapper) start(ctx context.Context, startTs model.Ts) (err error) {
-	if t.replicateTs != 0 {
+	if t.replicateTs.Load() != math.MaxUint64 {
 		log.Panic("The table sink has already started",
 			zap.String("namespace", t.changefeed.Namespace),
 			zap.String("changefeed", t.changefeed.ID),
 			zap.Int64("tableID", t.tableID),
 			zap.Uint64("startTs", startTs),
-			zap.Uint64("oldReplicateTs", t.replicateTs),
+			zap.Uint64("oldReplicateTs", t.replicateTs.Load()),
 		)
 	}
 
 	// FIXME(qupeng): it can be re-fetched later instead of fails.
-	if t.replicateTs, err = t.genReplicateTs(ctx); err != nil {
+	ts, err := t.genReplicateTs(ctx)
+	if err != nil {
 		return errors.Trace(err)
 	}
+	t.replicateTs.Store(ts)
 
 	log.Info("Sink is started",
 		zap.String("namespace", t.changefeed.Namespace),
 		zap.String("changefeed", t.changefeed.ID),
 		zap.Int64("tableID", t.tableID),
 		zap.Uint64("startTs", startTs),
-		zap.Uint64("replicateTs", t.replicateTs),
+		zap.Uint64("replicateTs", ts),
 	)
 
 	// This start ts maybe greater than the initial start ts of the table sink.
@@ -378,14 +387,16 @@ func (t *tableSinkWrapper) checkTableSinkHealth() (err error) {
 // committed at downstream but we don't know. So we need to update `replicateTs`
 // of the table so that we can re-send those events later.
 func (t *tableSinkWrapper) restart(ctx context.Context) (err error) {
-	if t.replicateTs, err = t.genReplicateTs(ctx); err != nil {
+	ts, err := t.genReplicateTs(ctx)
+	if err != nil {
 		return errors.Trace(err)
 	}
+	t.replicateTs.Store(ts)
 	log.Info("Sink is restarted",
 		zap.String("namespace", t.changefeed.Namespace),
 		zap.String("changefeed", t.changefeed.ID),
 		zap.Int64("tableID", t.tableID),
-		zap.Uint64("replicateTs", t.replicateTs))
+		zap.Uint64("replicateTs", ts))
 	return nil
 }
diff --git a/cdc/processor/sourcemanager/manager.go b/cdc/processor/sourcemanager/manager.go
index 9064da82cb6..2ec2b205ba3 100644
--- a/cdc/processor/sourcemanager/manager.go
+++ b/cdc/processor/sourcemanager/manager.go
@@ -14,11 +14,9 @@
 package sourcemanager
 
 import (
-	"context"
 	"sync"
 	"time"
 
-	"github.com/pingcap/errors"
 	"github.com/pingcap/log"
 	"github.com/pingcap/tiflow/cdc/entry"
 	"github.com/pingcap/tiflow/cdc/model"
@@ -28,10 +26,7 @@ import (
 	"github.com/pingcap/tiflow/cdc/puller"
 	cdccontext "github.com/pingcap/tiflow/pkg/context"
 	cerrors "github.com/pingcap/tiflow/pkg/errors"
-	"github.com/pingcap/tiflow/pkg/retry"
 	"github.com/pingcap/tiflow/pkg/upstream"
-	"github.com/tikv/client-go/v2/oracle"
-	pd "github.com/tikv/pd/client"
 	"go.uber.org/zap"
 )
 
@@ -56,7 +51,6 @@ type SourceManager struct {
 
 	bdrMode         bool
 	safeModeAtStart bool
-	startTs         model.Ts
 }
 
 // New creates a new source manager.
@@ -69,13 +63,6 @@ func New(
 	bdrMode bool,
 	safeModeAtStart bool,
 ) *SourceManager {
-	startTs, err := getCurrentTs(context.Background(), up.PDClient)
-	if err != nil {
-		log.Panic("Cannot get current ts when creating source manager",
-			zap.String("namespace", changefeedID.Namespace),
-			zap.String("changefeed", changefeedID.ID))
-		return nil
-	}
 	return &SourceManager{
 		changefeedID: changefeedID,
 		up:           up,
@@ -84,35 +71,21 @@ func New(
 		errChan:         errChan,
 		bdrMode:         bdrMode,
 		safeModeAtStart: safeModeAtStart,
-		startTs:         startTs,
 	}
 }
 
-func isOldUpdateKVEntry(raw *model.RawKVEntry, thresholdTs model.Ts) bool {
-	return raw != nil && raw.IsUpdate() && raw.CRTs < thresholdTs
-}
-
-func splitUpdateKVEntry(raw *model.RawKVEntry) (*model.RawKVEntry, *model.RawKVEntry, error) {
-	if raw == nil {
-		return nil, nil, errors.New("nil event cannot be split")
-	}
-	deleteKVEntry := *raw
-	deleteKVEntry.Value = nil
-
-	insertKVEntry := *raw
-	insertKVEntry.OldValue = nil
-
-	return &deleteKVEntry, &insertKVEntry, nil
+func isOldUpdateKVEntry(raw *model.RawKVEntry, getReplicaTs func() model.Ts) bool {
+	return raw != nil && raw.IsUpdate() && raw.CRTs < getReplicaTs()
 }
 
 // AddTable adds a table to the source manager. Start puller and register table to the engine.
-func (m *SourceManager) AddTable(ctx cdccontext.Context, tableID model.TableID, tableName string, startTs model.Ts) {
+func (m *SourceManager) AddTable(ctx cdccontext.Context, tableID model.TableID, tableName string, startTs model.Ts, getReplicaTs func() model.Ts) {
 	// Add table to the engine first, so that the engine can receive the events from the puller.
 	m.engine.AddTable(tableID)
 	shouldSplitKVEntry := func(raw *model.RawKVEntry) bool {
-		return m.safeModeAtStart && isOldUpdateKVEntry(raw, m.startTs)
+		return m.safeModeAtStart && isOldUpdateKVEntry(raw, getReplicaTs)
 	}
-	p := pullerwrapper.NewPullerWrapper(m.changefeedID, tableID, tableName, startTs, m.bdrMode, shouldSplitKVEntry, splitUpdateKVEntry)
+	p := pullerwrapper.NewPullerWrapper(m.changefeedID, tableID, tableName, startTs, m.bdrMode, shouldSplitKVEntry)
 	p.Start(ctx, m.up, m.engine, m.errChan)
 	m.pullers.Store(tableID, p)
 }
@@ -185,23 +158,3 @@ func (m *SourceManager) Close() error {
 		zap.Duration("cost", time.Since(start)))
 	return nil
 }
-
-func getCurrentTs(ctx context.Context, pdClient pd.Client) (model.Ts, error) {
-	backoffBaseDelayInMs := int64(100)
-	totalRetryDuration := 10 * time.Second
-	var replicateTs model.Ts
-	err := retry.Do(ctx, func() error {
-		phy, logic, err := pdClient.GetTS(ctx)
-		if err != nil {
-			return errors.Trace(err)
-		}
-		replicateTs = oracle.ComposeTS(phy, logic)
-		return nil
-	}, retry.WithBackoffBaseDelay(backoffBaseDelayInMs),
-		retry.WithTotalRetryDuratoin(totalRetryDuration),
-		retry.WithIsRetryableErr(cerrors.IsRetryableError))
-	if err != nil {
-		return model.Ts(0), errors.Trace(err)
-	}
-	return replicateTs, nil
-}
diff --git a/cdc/processor/sourcemanager/puller/puller_wrapper.go b/cdc/processor/sourcemanager/puller/puller_wrapper.go
index 1fb7a2ac934..5dd94bb212a 100644
--- a/cdc/processor/sourcemanager/puller/puller_wrapper.go
+++ b/cdc/processor/sourcemanager/puller/puller_wrapper.go
@@ -32,12 +32,6 @@ import (
 	"go.uber.org/zap"
 )
 
-// ShouldSplitKVEntry checks whether the raw kv entry should be splitted.
-type ShouldSplitKVEntry func(raw *model.RawKVEntry) bool
-
-// SplitUpdateKVEntry splits the raw kv entry into a delete entry and an insert entry.
-type SplitUpdateKVEntry func(raw *model.RawKVEntry) (*model.RawKVEntry, *model.RawKVEntry, error)
-
 // Wrapper is a wrapper of puller used by source manager.
 type Wrapper struct {
 	changefeed model.ChangeFeedID
@@ -51,8 +45,7 @@ type Wrapper struct {
 	wg      sync.WaitGroup
 	bdrMode bool
 
-	shouldSplitKVEntry ShouldSplitKVEntry
-	splitUpdateKVEntry SplitUpdateKVEntry
+	shouldSplitKVEntry model.ShouldSplitKVEntry
 }
 
 // NewPullerWrapper creates a new puller wrapper.
@@ -62,8 +55,7 @@ func NewPullerWrapper(
 	tableName string,
 	startTs model.Ts,
 	bdrMode bool,
-	shouldSplitKVEntry ShouldSplitKVEntry,
-	splitUpdateKVEntry SplitUpdateKVEntry,
+	shouldSplitKVEntry model.ShouldSplitKVEntry,
 ) *Wrapper {
 	return &Wrapper{
 		changefeed: changefeed,
@@ -72,7 +64,6 @@ func NewPullerWrapper(
 		startTs:            startTs,
 		bdrMode:            bdrMode,
 		shouldSplitKVEntry: shouldSplitKVEntry,
-		splitUpdateKVEntry: splitUpdateKVEntry,
 	}
 }
 
@@ -146,7 +137,7 @@ func (n *Wrapper) Start(
 				continue
 			}
 			if n.shouldSplitKVEntry(rawKV) {
-				deleteKVEntry, insertKVEntry, err := n.splitUpdateKVEntry(rawKV)
+				deleteKVEntry, insertKVEntry, err := model.SplitUpdateKVEntry(rawKV)
 				if err != nil {
 					log.Panic("failed to split update kv entry", zap.Error(err))
 					return
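Taken together, these changes replace the source manager's up-front PD timestamp fetch with a getReplicaTs callback backed by the table sink's atomic replicateTs, which is seeded with math.MaxUint64 until the sink actually starts. The following is a minimal sketch of that sentinel behavior under local names; it is not tiflow's actual code, only the idea of a sentinel plus an atomic publish to the puller goroutine.

// replicate_ts_sketch.go
package main

import (
	"fmt"
	"math"
	"sync/atomic"
)

func main() {
	var replicateTs atomic.Uint64

	// Before the table sink has started there is no real replicate ts yet.
	// Seeding with MaxUint64 makes every update's commit ts look older than
	// the replicate ts, so the puller splits it conservatively, instead of
	// relying on a timestamp fetched from PD when the source manager was built.
	replicateTs.Store(math.MaxUint64)

	getReplicaTs := func() uint64 { return replicateTs.Load() }
	isOldUpdate := func(crts uint64) bool { return crts < getReplicaTs() }

	fmt.Println("before sink start:", isOldUpdate(100)) // true: split the update

	// When the sink starts (or restarts after an error) it fetches the real
	// replicate ts and publishes it with one atomic store; the puller goroutine
	// reads it through the getReplicaTs callback without extra locking.
	replicateTs.Store(90)
	fmt.Println("after sink start: ", isOldUpdate(100)) // false: no split needed
}

Because the sentinel is the largest possible ts, any update observed before the sink has started is treated as "old" and split, erring on the side of safe mode rather than risking a missed split.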