From f25075dcc9635326f2603aaf8e42162918ded5ee Mon Sep 17 00:00:00 2001 From: Ti Chi Robot Date: Tue, 11 Jun 2024 20:35:59 +0800 Subject: [PATCH 1/2] puller(ticdc): fix wrong update splitting behavior after table scheduling (#11269) (#11283) close pingcap/tiflow#11219 --- cdc/model/kv.go | 18 +++++ cdc/processor/processor.go | 32 +++++---- cdc/processor/sinkmanager/manager.go | 15 ++++- cdc/processor/sinkmanager/manager_test.go | 4 +- .../sinkmanager/manager_test_helper.go | 4 +- cdc/processor/sinkmanager/redo_log_worker.go | 2 +- .../sinkmanager/table_sink_worker.go | 2 +- .../sinkmanager/table_sink_wrapper.go | 25 +++++-- cdc/processor/sourcemanager/manager.go | 65 ++++--------------- cdc/puller/ddl_puller.go | 3 +- cdc/puller/ddl_puller_test.go | 4 +- cdc/puller/multiplexing_puller.go | 16 +++-- cdc/puller/multiplexing_puller_test.go | 7 +- 13 files changed, 103 insertions(+), 94 deletions(-) diff --git a/cdc/model/kv.go b/cdc/model/kv.go index 201c4b4f494..56aceac8077 100644 --- a/cdc/model/kv.go +++ b/cdc/model/kv.go @@ -16,6 +16,7 @@ package model import ( + "errors" "fmt" "github.com/pingcap/tiflow/cdc/processor/tablepb" @@ -110,3 +111,20 @@ func (v *RawKVEntry) String() string { func (v *RawKVEntry) ApproximateDataSize() int64 { return int64(len(v.Key) + len(v.Value) + len(v.OldValue)) } + +// ShouldSplitKVEntry checks whether the raw kv entry should be splitted. +type ShouldSplitKVEntry func(raw *RawKVEntry) bool + +// SplitUpdateKVEntry splits the raw kv entry into a delete entry and an insert entry. +func SplitUpdateKVEntry(raw *RawKVEntry) (*RawKVEntry, *RawKVEntry, error) { + if raw == nil { + return nil, nil, errors.New("nil event cannot be split") + } + deleteKVEntry := *raw + deleteKVEntry.Value = nil + + insertKVEntry := *raw + insertKVEntry.OldValue = nil + + return &deleteKVEntry, &insertKVEntry, nil +} diff --git a/cdc/processor/processor.go b/cdc/processor/processor.go index 382d229d5f0..76bdbfb502f 100644 --- a/cdc/processor/processor.go +++ b/cdc/processor/processor.go @@ -223,13 +223,13 @@ func (p *processor) AddTableSpan( zap.Bool("isPrepare", isPrepare)) } - p.sinkManager.r.AddTable( + table := p.sinkManager.r.AddTable( span, startTs, p.latestInfo.TargetTs) if p.redo.r.Enabled() { p.redo.r.AddTable(span, startTs) } - p.sourceManager.r.AddTable(span, p.getTableName(ctx, span.TableID), startTs) + p.sourceManager.r.AddTable(span, p.getTableName(ctx, span.TableID), startTs, table.GetReplicaTs) return true, nil } @@ -488,18 +488,6 @@ func isProcessorIgnorableError(err error) bool { return false } -// needPullerSafeModeAtStart returns true if the scheme is mysql compatible. -// pullerSafeMode means to split all update kv entries whose commitTS -// is older then the start time of this changefeed. -func needPullerSafeModeAtStart(sinkURIStr string) (bool, error) { - sinkURI, err := url.Parse(sinkURIStr) - if err != nil { - return false, cerror.WrapError(cerror.ErrSinkURIInvalid, err) - } - scheme := sink.GetScheme(sinkURI) - return sink.IsMySQLCompatibleScheme(scheme), nil -} - // Tick implements the `orchestrator.State` interface // the `info` parameter is sent by metadata store, the `info` must be the latest value snapshot. // the `status` parameter is sent by metadata store, the `status` must be the latest value snapshot. @@ -600,6 +588,16 @@ func (p *processor) tick(ctx context.Context) (error, error) { return nil, warning } +// isMysqlCompatibleBackend returns true if the sinkURIStr is mysql compatible. 
+func isMysqlCompatibleBackend(sinkURIStr string) (bool, error) { + sinkURI, err := url.Parse(sinkURIStr) + if err != nil { + return false, cerror.WrapError(cerror.ErrSinkURIInvalid, err) + } + scheme := sink.GetScheme(sinkURI) + return sink.IsMySQLCompatibleScheme(scheme), nil +} + // lazyInitImpl create Filter, SchemaStorage, Mounter instances at the first tick. func (p *processor) lazyInitImpl(etcdCtx context.Context) (err error) { if p.initialized.Load() { @@ -659,7 +657,7 @@ func (p *processor) lazyInitImpl(etcdCtx context.Context) (err error) { return errors.Trace(err) } - pullerSafeModeAtStart, err := needPullerSafeModeAtStart(p.latestInfo.SinkURI) + isMysqlBackend, err := isMysqlCompatibleBackend(p.latestInfo.SinkURI) if err != nil { return errors.Trace(err) } @@ -667,14 +665,14 @@ func (p *processor) lazyInitImpl(etcdCtx context.Context) (err error) { p.changefeedID, p.upstream, p.mg.r, sortEngine, util.GetOrZero(cfConfig.BDRMode), util.GetOrZero(cfConfig.EnableTableMonitor), - pullerSafeModeAtStart) + isMysqlBackend) p.sourceManager.name = "SourceManager" p.sourceManager.changefeedID = p.changefeedID p.sourceManager.spawn(prcCtx) p.sinkManager.r = sinkmanager.New( p.changefeedID, p.latestInfo.SinkURI, cfConfig, p.upstream, - p.ddlHandler.r.schemaStorage, p.redo.r, p.sourceManager.r) + p.ddlHandler.r.schemaStorage, p.redo.r, p.sourceManager.r, isMysqlBackend) p.sinkManager.name = "SinkManager" p.sinkManager.changefeedID = p.changefeedID p.sinkManager.spawn(prcCtx) diff --git a/cdc/processor/sinkmanager/manager.go b/cdc/processor/sinkmanager/manager.go index b1ac8e4a6bd..a0804e62e49 100644 --- a/cdc/processor/sinkmanager/manager.go +++ b/cdc/processor/sinkmanager/manager.go @@ -128,6 +128,9 @@ type SinkManager struct { // wg is used to wait for all workers to exit. wg sync.WaitGroup + // isMysqlBackend indicates whether the backend is MySQL compatible. + isMysqlBackend bool + // Metric for table sink. metricsTableSinkTotalRows prometheus.Counter @@ -143,6 +146,7 @@ func New( schemaStorage entry.SchemaStorage, redoDMLMgr redo.DMLManager, sourceManager *sourcemanager.SourceManager, + isMysqlBackend bool, ) *SinkManager { m := &SinkManager{ changefeedID: changefeedID, @@ -156,7 +160,7 @@ func New( sinkTaskChan: make(chan *sinkTask), sinkWorkerAvailable: make(chan struct{}, 1), sinkRetry: retry.NewInfiniteErrorRetry(), - + isMysqlBackend: isMysqlBackend, metricsTableSinkTotalRows: tablesinkmetrics.TotalRowsCountCounter. WithLabelValues(changefeedID.Namespace, changefeedID.ID), @@ -298,6 +302,11 @@ func (m *SinkManager) Run(ctx context.Context, warnings ...chan<- error) (err er if cerror.IsDupEntryError(err) { return errors.Trace(err) } + + if m.isMysqlBackend { + // For MySQL backend, we should restart sink. Let owner to handle the error. + return errors.Trace(err) + } } // If the error is retryable, we should retry to re-establish the internal resources. @@ -832,7 +841,7 @@ func (m *SinkManager) UpdateBarrierTs(globalBarrierTs model.Ts, tableBarrier map } // AddTable adds a table(TableSink) to the sink manager. 
-func (m *SinkManager) AddTable(span tablepb.Span, startTs model.Ts, targetTs model.Ts) { +func (m *SinkManager) AddTable(span tablepb.Span, startTs model.Ts, targetTs model.Ts) *tableSinkWrapper { sinkWrapper := newTableSinkWrapper( m.changefeedID, span, @@ -860,7 +869,6 @@ func (m *SinkManager) AddTable(span tablepb.Span, startTs model.Ts, targetTs mod zap.String("namespace", m.changefeedID.Namespace), zap.String("changefeed", m.changefeedID.ID), zap.Stringer("span", &span)) - return } m.sinkMemQuota.AddTable(span) m.redoMemQuota.AddTable(span) @@ -870,6 +878,7 @@ func (m *SinkManager) AddTable(span tablepb.Span, startTs model.Ts, targetTs mod zap.Stringer("span", &span), zap.Uint64("startTs", startTs), zap.Uint64("version", sinkWrapper.version)) + return sinkWrapper } // StartTable sets the table(TableSink) state to replicating. diff --git a/cdc/processor/sinkmanager/manager_test.go b/cdc/processor/sinkmanager/manager_test.go index 362ab722606..0532dcb3b92 100644 --- a/cdc/processor/sinkmanager/manager_test.go +++ b/cdc/processor/sinkmanager/manager_test.go @@ -135,7 +135,7 @@ func TestAddTable(t *testing.T) { require.Equal(t, 0, manager.sinkProgressHeap.len(), "Not started table shout not in progress heap") err := manager.StartTable(span, 1) require.NoError(t, err) - require.Equal(t, uint64(0x7ffffffffffbffff), tableSink.(*tableSinkWrapper).replicateTs) + require.Equal(t, uint64(0x7ffffffffffbffff), tableSink.(*tableSinkWrapper).replicateTs.Load()) progress := manager.sinkProgressHeap.pop() require.Equal(t, span, progress.span) @@ -359,7 +359,7 @@ func TestSinkManagerRunWithErrors(t *testing.T) { span := spanz.TableIDToComparableSpan(1) - source.AddTable(span, "test", 100) + source.AddTable(span, "test", 100, func() model.Ts { return 0 }) manager.AddTable(span, 100, math.MaxUint64) manager.StartTable(span, 100) source.Add(span, model.NewResolvedPolymorphicEvent(0, 101)) diff --git a/cdc/processor/sinkmanager/manager_test_helper.go b/cdc/processor/sinkmanager/manager_test_helper.go index 8f7fd65a3b4..3e1e483fc0f 100644 --- a/cdc/processor/sinkmanager/manager_test_helper.go +++ b/cdc/processor/sinkmanager/manager_test_helper.go @@ -71,7 +71,7 @@ func CreateManagerWithMemEngine( sourceManager.WaitForReady(ctx) sinkManager := New(changefeedID, changefeedInfo.SinkURI, - changefeedInfo.Config, up, schemaStorage, nil, sourceManager) + changefeedInfo.Config, up, schemaStorage, nil, sourceManager, false) go func() { handleError(sinkManager.Run(ctx)) }() sinkManager.WaitForReady(ctx) @@ -92,6 +92,6 @@ func NewManagerWithMemEngine( schemaStorage := &entry.MockSchemaStorage{Resolved: math.MaxUint64} sourceManager := sourcemanager.NewForTest(changefeedID, up, mg, sortEngine, false) sinkManager := New(changefeedID, changefeedInfo.SinkURI, - changefeedInfo.Config, up, schemaStorage, redoMgr, sourceManager) + changefeedInfo.Config, up, schemaStorage, redoMgr, sourceManager, false) return sinkManager, sourceManager, sortEngine } diff --git a/cdc/processor/sinkmanager/redo_log_worker.go b/cdc/processor/sinkmanager/redo_log_worker.go index 7ce4457e6f3..cb970afd02e 100644 --- a/cdc/processor/sinkmanager/redo_log_worker.go +++ b/cdc/processor/sinkmanager/redo_log_worker.go @@ -127,7 +127,7 @@ func (w *redoWorker) handleTask(ctx context.Context, task *redoTask) (finalErr e // NOTICE: The event can be filtered by the event filter. if e.Row != nil { // For all events, we add table replicate ts, so mysql sink can determine safe-mode. 
- e.Row.ReplicatingTs = task.tableSink.replicateTs + e.Row.ReplicatingTs = task.tableSink.replicateTs.Load() x, size = handleRowChangedEvents(w.changefeedID, task.span, e) advancer.appendEvents(x, size) } diff --git a/cdc/processor/sinkmanager/table_sink_worker.go b/cdc/processor/sinkmanager/table_sink_worker.go index cff16a168bd..d1bd8b60df6 100644 --- a/cdc/processor/sinkmanager/table_sink_worker.go +++ b/cdc/processor/sinkmanager/table_sink_worker.go @@ -204,7 +204,7 @@ func (w *sinkWorker) handleTask(ctx context.Context, task *sinkTask) (finalErr e // NOTICE: The event can be filtered by the event filter. if e.Row != nil { // For all rows, we add table replicate ts, so mysql sink can determine safe-mode. - e.Row.ReplicatingTs = task.tableSink.replicateTs + e.Row.ReplicatingTs = task.tableSink.GetReplicaTs() x, size := handleRowChangedEvents(w.changefeedID, task.span, e) advancer.appendEvents(x, size) } diff --git a/cdc/processor/sinkmanager/table_sink_wrapper.go b/cdc/processor/sinkmanager/table_sink_wrapper.go index f354372371b..dddba30ea2a 100644 --- a/cdc/processor/sinkmanager/table_sink_wrapper.go +++ b/cdc/processor/sinkmanager/table_sink_wrapper.go @@ -15,6 +15,7 @@ package sinkmanager import ( "context" + "math" "sort" "sync" "sync/atomic" @@ -77,7 +78,7 @@ type tableSinkWrapper struct { receivedSorterResolvedTs atomic.Uint64 // replicateTs is the ts that the table sink has started to replicate. - replicateTs model.Ts + replicateTs atomic.Uint64 genReplicateTs func(ctx context.Context) (model.Ts, error) // lastCleanTime indicates the last time the table has been cleaned. @@ -90,6 +91,11 @@ type tableSinkWrapper struct { rangeEventCountsMu sync.Mutex } +// GetReplicaTs returns the replicate ts of the table sink. +func (t *tableSinkWrapper) GetReplicaTs() model.Ts { + return t.replicateTs.Load() +} + type rangeEventCount struct { // firstPos and lastPos are used to merge many rangeEventCount into one. firstPos sorter.Position @@ -132,31 +138,34 @@ func newTableSinkWrapper( res.receivedSorterResolvedTs.Store(startTs) res.barrierTs.Store(startTs) + res.replicateTs.Store(math.MaxUint64) return res } func (t *tableSinkWrapper) start(ctx context.Context, startTs model.Ts) (err error) { - if t.replicateTs != 0 { + if t.replicateTs.Load() != math.MaxUint64 { log.Panic("The table sink has already started", zap.String("namespace", t.changefeed.Namespace), zap.String("changefeed", t.changefeed.ID), zap.Stringer("span", &t.span), zap.Uint64("startTs", startTs), - zap.Uint64("oldReplicateTs", t.replicateTs), + zap.Uint64("oldReplicateTs", t.replicateTs.Load()), ) } // FIXME(qupeng): it can be re-fetched later instead of fails. - if t.replicateTs, err = t.genReplicateTs(ctx); err != nil { + ts, err := t.genReplicateTs(ctx) + if err != nil { return errors.Trace(err) } + t.replicateTs.Store(ts) log.Info("Sink is started", zap.String("namespace", t.changefeed.Namespace), zap.String("changefeed", t.changefeed.ID), zap.Stringer("span", &t.span), zap.Uint64("startTs", startTs), - zap.Uint64("replicateTs", t.replicateTs), + zap.Uint64("replicateTs", ts), ) // This start ts maybe greater than the initial start ts of the table sink. @@ -379,14 +388,16 @@ func (t *tableSinkWrapper) checkTableSinkHealth() (err error) { // committed at downstream but we don't know. So we need to update `replicateTs` // of the table so that we can re-send those events later. 
func (t *tableSinkWrapper) restart(ctx context.Context) (err error) { - if t.replicateTs, err = t.genReplicateTs(ctx); err != nil { + ts, err := t.genReplicateTs(ctx) + if err != nil { return errors.Trace(err) } + t.replicateTs.Store(ts) log.Info("Sink is restarted", zap.String("namespace", t.changefeed.Namespace), zap.String("changefeed", t.changefeed.ID), zap.Stringer("span", &t.span), - zap.Uint64("replicateTs", t.replicateTs)) + zap.Uint64("replicateTs", ts)) return nil } diff --git a/cdc/processor/sourcemanager/manager.go b/cdc/processor/sourcemanager/manager.go index b252dc44437..3afb21f26bb 100644 --- a/cdc/processor/sourcemanager/manager.go +++ b/cdc/processor/sourcemanager/manager.go @@ -17,7 +17,6 @@ import ( "context" "time" - "github.com/pingcap/errors" "github.com/pingcap/log" "github.com/pingcap/tiflow/cdc/entry" "github.com/pingcap/tiflow/cdc/kv" @@ -28,13 +27,9 @@ import ( "github.com/pingcap/tiflow/cdc/processor/tablepb" "github.com/pingcap/tiflow/cdc/puller" "github.com/pingcap/tiflow/pkg/config" - cerrors "github.com/pingcap/tiflow/pkg/errors" - "github.com/pingcap/tiflow/pkg/retry" "github.com/pingcap/tiflow/pkg/txnutil" "github.com/pingcap/tiflow/pkg/upstream" - "github.com/tikv/client-go/v2/oracle" "github.com/tikv/client-go/v2/tikv" - pd "github.com/tikv/pd/client" "go.uber.org/zap" ) @@ -55,7 +50,8 @@ type SourceManager struct { engine sorter.SortEngine // Used to indicate whether the changefeed is in BDR mode. bdrMode bool - startTs model.Ts + + safeModeAtStart bool enableTableMonitor bool puller *puller.MultiplexingPuller @@ -92,21 +88,8 @@ func NewForTest( } } -func isOldUpdateKVEntry(raw *model.RawKVEntry, thresholdTs model.Ts) bool { - return raw != nil && raw.IsUpdate() && raw.CRTs < thresholdTs -} - -func splitUpdateKVEntry(raw *model.RawKVEntry) (*model.RawKVEntry, *model.RawKVEntry, error) { - if raw == nil { - return nil, nil, errors.New("nil event cannot be split") - } - deleteKVEntry := *raw - deleteKVEntry.Value = nil - - insertKVEntry := *raw - insertKVEntry.OldValue = nil - - return &deleteKVEntry, &insertKVEntry, nil +func isOldUpdateKVEntry(raw *model.RawKVEntry, getReplicaTs func() model.Ts) bool { + return raw != nil && raw.IsUpdate() && raw.CRTs < getReplicaTs() } func newSourceManager( @@ -126,6 +109,7 @@ func newSourceManager( engine: engine, bdrMode: bdrMode, enableTableMonitor: enableTableMonitor, + safeModeAtStart: safeModeAtStart, } serverConfig := config.GetGlobalServerConfig() @@ -138,15 +122,15 @@ func newSourceManager( // consume add raw kv entry to the engine. // It will be called by the puller when new raw kv entry is received. - consume := func(ctx context.Context, raw *model.RawKVEntry, spans []tablepb.Span) error { + consume := func(ctx context.Context, raw *model.RawKVEntry, spans []tablepb.Span, shouldSplitKVEntry model.ShouldSplitKVEntry) error { if len(spans) > 1 { log.Panic("DML puller subscribes multiple spans", zap.String("namespace", mgr.changefeedID.Namespace), zap.String("changefeed", mgr.changefeedID.ID)) } if raw != nil { - if safeModeAtStart && isOldUpdateKVEntry(raw, mgr.startTs) { - deleteKVEntry, insertKVEntry, err := splitUpdateKVEntry(raw) + if shouldSplitKVEntry(raw) { + deleteKVEntry, insertKVEntry, err := model.SplitUpdateKVEntry(raw) if err != nil { return err } @@ -175,13 +159,17 @@ func newSourceManager( } // AddTable adds a table to the source manager. Start puller and register table to the engine. 
-func (m *SourceManager) AddTable(span tablepb.Span, tableName string, startTs model.Ts) { +func (m *SourceManager) AddTable(span tablepb.Span, tableName string, startTs model.Ts, getReplicaTs func() model.Ts) { // Add table to the engine first, so that the engine can receive the events from the puller. m.engine.AddTable(span, startTs) + shouldSplitKVEntry := func(raw *model.RawKVEntry) bool { + return m.safeModeAtStart && isOldUpdateKVEntry(raw, getReplicaTs) + } + // Only nil in unit tests. if m.puller != nil { - m.puller.Subscribe([]tablepb.Span{span}, startTs, tableName) + m.puller.Subscribe([]tablepb.Span{span}, startTs, tableName, shouldSplitKVEntry) } } @@ -227,11 +215,6 @@ func (m *SourceManager) Run(ctx context.Context, _ ...chan<- error) error { if m.puller == nil { return nil } - startTs, err := getCurrentTs(ctx, m.up.PDClient) - if err != nil { - return err - } - m.startTs = startTs return m.puller.Run(ctx) } @@ -273,23 +256,3 @@ func (m *SourceManager) Close() { func (m *SourceManager) Add(span tablepb.Span, events ...*model.PolymorphicEvent) { m.engine.Add(span, events...) } - -func getCurrentTs(ctx context.Context, pdClient pd.Client) (model.Ts, error) { - backoffBaseDelayInMs := int64(100) - totalRetryDuration := 10 * time.Second - var replicateTs model.Ts - err := retry.Do(ctx, func() error { - phy, logic, err := pdClient.GetTS(ctx) - if err != nil { - return errors.Trace(err) - } - replicateTs = oracle.ComposeTS(phy, logic) - return nil - }, retry.WithBackoffBaseDelay(backoffBaseDelayInMs), - retry.WithTotalRetryDuratoin(totalRetryDuration), - retry.WithIsRetryableErr(cerrors.IsRetryableError)) - if err != nil { - return model.Ts(0), errors.Trace(err) - } - return replicateTs, nil -} diff --git a/cdc/puller/ddl_puller.go b/cdc/puller/ddl_puller.go index e50ad06bf72..b738b027b97 100644 --- a/cdc/puller/ddl_puller.go +++ b/cdc/puller/ddl_puller.go @@ -121,7 +121,7 @@ func NewDDLJobPuller( slots, hasher := 1, func(tablepb.Span, int) int { return 0 } ddlJobPuller.mp = NewMultiplexingPuller(changefeed, client, up.PDClock, ddlJobPuller.Input, slots, hasher, 1) - ddlJobPuller.mp.Subscribe(ddlSpans, checkpointTs, memorysorter.DDLPullerTableName) + ddlJobPuller.mp.Subscribe(ddlSpans, checkpointTs, memorysorter.DDLPullerTableName, func(_ *model.RawKVEntry) bool { return false }) return ddlJobPuller } @@ -174,6 +174,7 @@ func (p *ddlJobPullerImpl) Input( ctx context.Context, rawDDL *model.RawKVEntry, _ []tablepb.Span, + _ model.ShouldSplitKVEntry, ) error { p.sorter.AddEntry(ctx, model.NewPolymorphicEvent(rawDDL)) return nil diff --git a/cdc/puller/ddl_puller_test.go b/cdc/puller/ddl_puller_test.go index 2b08ddd3da8..d34422d0f71 100644 --- a/cdc/puller/ddl_puller_test.go +++ b/cdc/puller/ddl_puller_test.go @@ -66,12 +66,12 @@ func tsToRawKVEntry(_ *testing.T, ts model.Ts) *model.RawKVEntry { func inputDDL(t *testing.T, puller *ddlJobPullerImpl, job *timodel.Job) { rawJob := jonToRawKVEntry(t, job) - puller.Input(context.Background(), rawJob, []tablepb.Span{}) + puller.Input(context.Background(), rawJob, []tablepb.Span{}, func(_ *model.RawKVEntry) bool { return false }) } func inputTs(t *testing.T, puller *ddlJobPullerImpl, ts model.Ts) { rawTs := tsToRawKVEntry(t, ts) - puller.Input(context.Background(), rawTs, []tablepb.Span{}) + puller.Input(context.Background(), rawTs, []tablepb.Span{}, func(_ *model.RawKVEntry) bool { return false }) } func waitResolvedTs(t *testing.T, p DDLJobPuller, targetTs model.Ts) { diff --git a/cdc/puller/multiplexing_puller.go 
b/cdc/puller/multiplexing_puller.go index f9fc25d64b8..792be0bab40 100644 --- a/cdc/puller/multiplexing_puller.go +++ b/cdc/puller/multiplexing_puller.go @@ -134,7 +134,7 @@ type MultiplexingPuller struct { changefeed model.ChangeFeedID client *kv.SharedClient pdClock pdutil.Clock - consume func(context.Context, *model.RawKVEntry, []tablepb.Span) error + consume func(context.Context, *model.RawKVEntry, []tablepb.Span, model.ShouldSplitKVEntry) error // inputChannelIndexer is used to determine which input channel to use for a given span. inputChannelIndexer func(span tablepb.Span, workerCount int) int @@ -172,7 +172,7 @@ func NewMultiplexingPuller( changefeed model.ChangeFeedID, client *kv.SharedClient, pdClock pdutil.Clock, - consume func(context.Context, *model.RawKVEntry, []tablepb.Span) error, + consume func(context.Context, *model.RawKVEntry, []tablepb.Span, model.ShouldSplitKVEntry) error, workerCount int, inputChannelIndexer func(tablepb.Span, int) int, resolvedTsAdvancerCount int, @@ -197,16 +197,22 @@ func NewMultiplexingPuller( } // Subscribe some spans. They will share one same resolved timestamp progress. -func (p *MultiplexingPuller) Subscribe(spans []tablepb.Span, startTs model.Ts, tableName string) { +func (p *MultiplexingPuller) Subscribe( + spans []tablepb.Span, + startTs model.Ts, + tableName string, + shouldSplitKVEntry model.ShouldSplitKVEntry, +) { p.subscriptions.Lock() defer p.subscriptions.Unlock() - p.subscribe(spans, startTs, tableName) + p.subscribe(spans, startTs, tableName, shouldSplitKVEntry) } func (p *MultiplexingPuller) subscribe( spans []tablepb.Span, startTs model.Ts, tableName string, + shouldSplitKVEntry model.ShouldSplitKVEntry, ) { for _, span := range spans { // Base on the current design, a MultiplexingPuller is only used for one changefeed. 
@@ -240,7 +246,7 @@ func (p *MultiplexingPuller) subscribe( progress.consume.RLock() defer progress.consume.RUnlock() if !progress.consume.removed { - return p.consume(ctx, raw, spans) + return p.consume(ctx, raw, spans, shouldSplitKVEntry) } return nil } diff --git a/cdc/puller/multiplexing_puller_test.go b/cdc/puller/multiplexing_puller_test.go index d3d495e94eb..6563c0f350f 100644 --- a/cdc/puller/multiplexing_puller_test.go +++ b/cdc/puller/multiplexing_puller_test.go @@ -31,7 +31,7 @@ import ( func newMultiplexingPullerForTest(outputCh chan<- *model.RawKVEntry) *MultiplexingPuller { cfg := &config.ServerConfig{Debug: &config.DebugConfig{Puller: &config.PullerConfig{LogRegionDetails: false}}} client := kv.NewSharedClient(model.ChangeFeedID{}, cfg, false, nil, nil, nil, nil, nil) - consume := func(ctx context.Context, e *model.RawKVEntry, _ []tablepb.Span) error { + consume := func(ctx context.Context, e *model.RawKVEntry, _ []tablepb.Span, _ model.ShouldSplitKVEntry) error { select { case <-ctx.Done(): return ctx.Err() @@ -87,7 +87,10 @@ func TestMultiplexingPullerResolvedForward(t *testing.T) { spans := []tablepb.Span{spanz.ToSpan([]byte("t_a"), []byte("t_e"))} spans[0].TableID = 1 - puller.subscribe(spans, 996, "test") + shouldSplitKVEntry := func(raw *model.RawKVEntry) bool { + return false + } + puller.subscribe(spans, 996, "test", shouldSplitKVEntry) subID := puller.subscriptions.n.GetV(spans[0]).subID for _, event := range events { puller.inputChs[0] <- kv.MultiplexingEvent{RegionFeedEvent: event, SubscriptionID: subID} From b1eda2dcb7dad9b46f5faf9590a620ae15f1af1c Mon Sep 17 00:00:00 2001 From: lidezhu <47731263+lidezhu@users.noreply.github.com> Date: Thu, 13 Jun 2024 11:07:43 +0800 Subject: [PATCH 2/2] Update cdc/processor/sinkmanager/manager.go Co-authored-by: CharlesCheung <61726649+CharlesCheung96@users.noreply.github.com> --- cdc/processor/sinkmanager/manager.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cdc/processor/sinkmanager/manager.go b/cdc/processor/sinkmanager/manager.go index a0804e62e49..d7b986a86f5 100644 --- a/cdc/processor/sinkmanager/manager.go +++ b/cdc/processor/sinkmanager/manager.go @@ -304,7 +304,7 @@ func (m *SinkManager) Run(ctx context.Context, warnings ...chan<- error) (err er } if m.isMysqlBackend { - // For MySQL backend, we should restart sink. Let owner to handle the error. + // For MySQL backend, we should restart changefeed. Let owner to handle the error. return errors.Trace(err) } }
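
For readers following this patch, below is a minimal, self-contained sketch of the update-splitting flow it wires together: the sink publishes its replicate ts (now an atomic exposed via GetReplicaTs), the source manager builds a ShouldSplitKVEntry predicate from it, and qualifying update entries are split into a delete plus an insert via SplitUpdateKVEntry. The types and names here (rawKVEntry, shouldSplit, splitUpdateKVEntry) are simplified local stand-ins for illustration, not the real cdc/model or sourcemanager APIs, and the MySQL-compatibility gate (safeModeAtStart / isMysqlBackend) is collapsed into a comment.

// A minimal, self-contained sketch of the splitting flow introduced by this
// patch. All types below are simplified stand-ins, not the real tiflow APIs.
package main

import (
	"errors"
	"fmt"
	"sync/atomic"
)

// rawKVEntry mirrors only the fields of model.RawKVEntry that matter here.
type rawKVEntry struct {
	Key, Value, OldValue []byte
	CRTs                 uint64 // commit ts of the entry
}

// isUpdate reports whether the entry carries both a new and an old value.
func (r *rawKVEntry) isUpdate() bool { return len(r.Value) > 0 && len(r.OldValue) > 0 }

// splitUpdateKVEntry turns one update entry into a delete entry plus an
// insert entry, the same shape as model.SplitUpdateKVEntry in the patch.
func splitUpdateKVEntry(raw *rawKVEntry) (*rawKVEntry, *rawKVEntry, error) {
	if raw == nil {
		return nil, nil, errors.New("nil event cannot be split")
	}
	deleteEntry := *raw
	deleteEntry.Value = nil

	insertEntry := *raw
	insertEntry.OldValue = nil
	return &deleteEntry, &insertEntry, nil
}

func main() {
	// The table sink stores its replicate ts in an atomic once it starts,
	// which is why the patch exposes GetReplicaTs instead of a plain field.
	var replicateTs atomic.Uint64
	replicateTs.Store(400)

	// The predicate handed to the puller: split only update entries that
	// committed before the sink's replicate ts. In the patch this is
	// additionally gated on the backend being MySQL compatible
	// (safeModeAtStart), omitted here for brevity.
	shouldSplit := func(raw *rawKVEntry) bool {
		return raw != nil && raw.isUpdate() && raw.CRTs < replicateTs.Load()
	}

	raw := &rawKVEntry{Key: []byte("k"), Value: []byte("new"), OldValue: []byte("old"), CRTs: 350}
	if shouldSplit(raw) {
		del, ins, err := splitUpdateKVEntry(raw)
		if err != nil {
			panic(err)
		}
		// The delete keeps only the old value; the insert keeps only the new one.
		fmt.Printf("delete: value=%q oldValue=%q\n", del.Value, del.OldValue)
		fmt.Printf("insert: value=%q oldValue=%q\n", ins.Value, ins.OldValue)
	}
}

Because the predicate is now evaluated per entry against the sink's current replicate ts rather than against a start ts captured once at changefeed startup, a table that is rescheduled onto another processor gets a fresh replicate ts and its old updates are split correctly, which is the behavior this patch fixes.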