Skip to content

Commit

Permalink
puller(ticdc): fix wrong update splitting behavior after table schedu…
Browse files Browse the repository at this point in the history
…ling (#11269) (#11283)

close #11219
  • Loading branch information
ti-chi-bot authored and lidezhu committed Jun 11, 2024
1 parent 3887861 commit f25075d
Show file tree
Hide file tree
Showing 13 changed files with 103 additions and 94 deletions.
18 changes: 18 additions & 0 deletions cdc/model/kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package model

import (
"errors"
"fmt"

"github.com/pingcap/tiflow/cdc/processor/tablepb"
Expand Down Expand Up @@ -110,3 +111,20 @@ func (v *RawKVEntry) String() string {
func (v *RawKVEntry) ApproximateDataSize() int64 {
return int64(len(v.Key) + len(v.Value) + len(v.OldValue))
}

// ShouldSplitKVEntry checks whether the raw kv entry should be splitted.
type ShouldSplitKVEntry func(raw *RawKVEntry) bool

// SplitUpdateKVEntry splits the raw kv entry into a delete entry and an insert entry.
func SplitUpdateKVEntry(raw *RawKVEntry) (*RawKVEntry, *RawKVEntry, error) {
if raw == nil {
return nil, nil, errors.New("nil event cannot be split")
}
deleteKVEntry := *raw
deleteKVEntry.Value = nil

insertKVEntry := *raw
insertKVEntry.OldValue = nil

return &deleteKVEntry, &insertKVEntry, nil

Check warning on line 129 in cdc/model/kv.go

View check run for this annotation

Codecov / codecov/patch

cdc/model/kv.go#L119-L129

Added lines #L119 - L129 were not covered by tests
}
32 changes: 15 additions & 17 deletions cdc/processor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -223,13 +223,13 @@ func (p *processor) AddTableSpan(
zap.Bool("isPrepare", isPrepare))
}

p.sinkManager.r.AddTable(
table := p.sinkManager.r.AddTable(
span, startTs, p.latestInfo.TargetTs)
if p.redo.r.Enabled() {
p.redo.r.AddTable(span, startTs)
}
p.sourceManager.r.AddTable(span, p.getTableName(ctx, span.TableID), startTs)

p.sourceManager.r.AddTable(span, p.getTableName(ctx, span.TableID), startTs, table.GetReplicaTs)
return true, nil
}

Expand Down Expand Up @@ -488,18 +488,6 @@ func isProcessorIgnorableError(err error) bool {
return false
}

// needPullerSafeModeAtStart returns true if the scheme is mysql compatible.
// pullerSafeMode means to split all update kv entries whose commitTS
// is older then the start time of this changefeed.
func needPullerSafeModeAtStart(sinkURIStr string) (bool, error) {
sinkURI, err := url.Parse(sinkURIStr)
if err != nil {
return false, cerror.WrapError(cerror.ErrSinkURIInvalid, err)
}
scheme := sink.GetScheme(sinkURI)
return sink.IsMySQLCompatibleScheme(scheme), nil
}

// Tick implements the `orchestrator.State` interface
// the `info` parameter is sent by metadata store, the `info` must be the latest value snapshot.
// the `status` parameter is sent by metadata store, the `status` must be the latest value snapshot.
Expand Down Expand Up @@ -600,6 +588,16 @@ func (p *processor) tick(ctx context.Context) (error, error) {
return nil, warning
}

// isMysqlCompatibleBackend returns true if the sinkURIStr is mysql compatible.
func isMysqlCompatibleBackend(sinkURIStr string) (bool, error) {
sinkURI, err := url.Parse(sinkURIStr)
if err != nil {
return false, cerror.WrapError(cerror.ErrSinkURIInvalid, err)
}
scheme := sink.GetScheme(sinkURI)
return sink.IsMySQLCompatibleScheme(scheme), nil

Check warning on line 598 in cdc/processor/processor.go

View check run for this annotation

Codecov / codecov/patch

cdc/processor/processor.go#L592-L598

Added lines #L592 - L598 were not covered by tests
}

// lazyInitImpl create Filter, SchemaStorage, Mounter instances at the first tick.
func (p *processor) lazyInitImpl(etcdCtx context.Context) (err error) {
if p.initialized.Load() {
Expand Down Expand Up @@ -659,22 +657,22 @@ func (p *processor) lazyInitImpl(etcdCtx context.Context) (err error) {
return errors.Trace(err)
}

pullerSafeModeAtStart, err := needPullerSafeModeAtStart(p.latestInfo.SinkURI)
isMysqlBackend, err := isMysqlCompatibleBackend(p.latestInfo.SinkURI)

Check warning on line 660 in cdc/processor/processor.go

View check run for this annotation

Codecov / codecov/patch

cdc/processor/processor.go#L660

Added line #L660 was not covered by tests
if err != nil {
return errors.Trace(err)
}
p.sourceManager.r = sourcemanager.New(
p.changefeedID, p.upstream, p.mg.r,
sortEngine, util.GetOrZero(cfConfig.BDRMode),
util.GetOrZero(cfConfig.EnableTableMonitor),
pullerSafeModeAtStart)
isMysqlBackend)

Check warning on line 668 in cdc/processor/processor.go

View check run for this annotation

Codecov / codecov/patch

cdc/processor/processor.go#L668

Added line #L668 was not covered by tests
p.sourceManager.name = "SourceManager"
p.sourceManager.changefeedID = p.changefeedID
p.sourceManager.spawn(prcCtx)

p.sinkManager.r = sinkmanager.New(
p.changefeedID, p.latestInfo.SinkURI, cfConfig, p.upstream,
p.ddlHandler.r.schemaStorage, p.redo.r, p.sourceManager.r)
p.ddlHandler.r.schemaStorage, p.redo.r, p.sourceManager.r, isMysqlBackend)

Check warning on line 675 in cdc/processor/processor.go

View check run for this annotation

Codecov / codecov/patch

cdc/processor/processor.go#L675

Added line #L675 was not covered by tests
p.sinkManager.name = "SinkManager"
p.sinkManager.changefeedID = p.changefeedID
p.sinkManager.spawn(prcCtx)
Expand Down
15 changes: 12 additions & 3 deletions cdc/processor/sinkmanager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,9 @@ type SinkManager struct {
// wg is used to wait for all workers to exit.
wg sync.WaitGroup

// isMysqlBackend indicates whether the backend is MySQL compatible.
isMysqlBackend bool

// Metric for table sink.
metricsTableSinkTotalRows prometheus.Counter

Expand All @@ -143,6 +146,7 @@ func New(
schemaStorage entry.SchemaStorage,
redoDMLMgr redo.DMLManager,
sourceManager *sourcemanager.SourceManager,
isMysqlBackend bool,
) *SinkManager {
m := &SinkManager{
changefeedID: changefeedID,
Expand All @@ -156,7 +160,7 @@ func New(
sinkTaskChan: make(chan *sinkTask),
sinkWorkerAvailable: make(chan struct{}, 1),
sinkRetry: retry.NewInfiniteErrorRetry(),

isMysqlBackend: isMysqlBackend,
metricsTableSinkTotalRows: tablesinkmetrics.TotalRowsCountCounter.
WithLabelValues(changefeedID.Namespace, changefeedID.ID),

Expand Down Expand Up @@ -298,6 +302,11 @@ func (m *SinkManager) Run(ctx context.Context, warnings ...chan<- error) (err er
if cerror.IsDupEntryError(err) {
return errors.Trace(err)
}

if m.isMysqlBackend {
// For MySQL backend, we should restart sink. Let owner to handle the error.
return errors.Trace(err)
}

Check warning on line 309 in cdc/processor/sinkmanager/manager.go

View check run for this annotation

Codecov / codecov/patch

cdc/processor/sinkmanager/manager.go#L306-L309

Added lines #L306 - L309 were not covered by tests
}

// If the error is retryable, we should retry to re-establish the internal resources.
Expand Down Expand Up @@ -832,7 +841,7 @@ func (m *SinkManager) UpdateBarrierTs(globalBarrierTs model.Ts, tableBarrier map
}

// AddTable adds a table(TableSink) to the sink manager.
func (m *SinkManager) AddTable(span tablepb.Span, startTs model.Ts, targetTs model.Ts) {
func (m *SinkManager) AddTable(span tablepb.Span, startTs model.Ts, targetTs model.Ts) *tableSinkWrapper {
sinkWrapper := newTableSinkWrapper(
m.changefeedID,
span,
Expand Down Expand Up @@ -860,7 +869,6 @@ func (m *SinkManager) AddTable(span tablepb.Span, startTs model.Ts, targetTs mod
zap.String("namespace", m.changefeedID.Namespace),
zap.String("changefeed", m.changefeedID.ID),
zap.Stringer("span", &span))
return
}
m.sinkMemQuota.AddTable(span)
m.redoMemQuota.AddTable(span)
Expand All @@ -870,6 +878,7 @@ func (m *SinkManager) AddTable(span tablepb.Span, startTs model.Ts, targetTs mod
zap.Stringer("span", &span),
zap.Uint64("startTs", startTs),
zap.Uint64("version", sinkWrapper.version))
return sinkWrapper
}

// StartTable sets the table(TableSink) state to replicating.
Expand Down
4 changes: 2 additions & 2 deletions cdc/processor/sinkmanager/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ func TestAddTable(t *testing.T) {
require.Equal(t, 0, manager.sinkProgressHeap.len(), "Not started table shout not in progress heap")
err := manager.StartTable(span, 1)
require.NoError(t, err)
require.Equal(t, uint64(0x7ffffffffffbffff), tableSink.(*tableSinkWrapper).replicateTs)
require.Equal(t, uint64(0x7ffffffffffbffff), tableSink.(*tableSinkWrapper).replicateTs.Load())

progress := manager.sinkProgressHeap.pop()
require.Equal(t, span, progress.span)
Expand Down Expand Up @@ -359,7 +359,7 @@ func TestSinkManagerRunWithErrors(t *testing.T) {

span := spanz.TableIDToComparableSpan(1)

source.AddTable(span, "test", 100)
source.AddTable(span, "test", 100, func() model.Ts { return 0 })
manager.AddTable(span, 100, math.MaxUint64)
manager.StartTable(span, 100)
source.Add(span, model.NewResolvedPolymorphicEvent(0, 101))
Expand Down
4 changes: 2 additions & 2 deletions cdc/processor/sinkmanager/manager_test_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ func CreateManagerWithMemEngine(
sourceManager.WaitForReady(ctx)

sinkManager := New(changefeedID, changefeedInfo.SinkURI,
changefeedInfo.Config, up, schemaStorage, nil, sourceManager)
changefeedInfo.Config, up, schemaStorage, nil, sourceManager, false)
go func() { handleError(sinkManager.Run(ctx)) }()
sinkManager.WaitForReady(ctx)

Expand All @@ -92,6 +92,6 @@ func NewManagerWithMemEngine(
schemaStorage := &entry.MockSchemaStorage{Resolved: math.MaxUint64}
sourceManager := sourcemanager.NewForTest(changefeedID, up, mg, sortEngine, false)
sinkManager := New(changefeedID, changefeedInfo.SinkURI,
changefeedInfo.Config, up, schemaStorage, redoMgr, sourceManager)
changefeedInfo.Config, up, schemaStorage, redoMgr, sourceManager, false)

Check warning on line 95 in cdc/processor/sinkmanager/manager_test_helper.go

View check run for this annotation

Codecov / codecov/patch

cdc/processor/sinkmanager/manager_test_helper.go#L95

Added line #L95 was not covered by tests
return sinkManager, sourceManager, sortEngine
}
2 changes: 1 addition & 1 deletion cdc/processor/sinkmanager/redo_log_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ func (w *redoWorker) handleTask(ctx context.Context, task *redoTask) (finalErr e
// NOTICE: The event can be filtered by the event filter.
if e.Row != nil {
// For all events, we add table replicate ts, so mysql sink can determine safe-mode.
e.Row.ReplicatingTs = task.tableSink.replicateTs
e.Row.ReplicatingTs = task.tableSink.replicateTs.Load()
x, size = handleRowChangedEvents(w.changefeedID, task.span, e)
advancer.appendEvents(x, size)
}
Expand Down
2 changes: 1 addition & 1 deletion cdc/processor/sinkmanager/table_sink_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,7 @@ func (w *sinkWorker) handleTask(ctx context.Context, task *sinkTask) (finalErr e
// NOTICE: The event can be filtered by the event filter.
if e.Row != nil {
// For all rows, we add table replicate ts, so mysql sink can determine safe-mode.
e.Row.ReplicatingTs = task.tableSink.replicateTs
e.Row.ReplicatingTs = task.tableSink.GetReplicaTs()
x, size := handleRowChangedEvents(w.changefeedID, task.span, e)
advancer.appendEvents(x, size)
}
Expand Down
25 changes: 18 additions & 7 deletions cdc/processor/sinkmanager/table_sink_wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ package sinkmanager

import (
"context"
"math"
"sort"
"sync"
"sync/atomic"
Expand Down Expand Up @@ -77,7 +78,7 @@ type tableSinkWrapper struct {
receivedSorterResolvedTs atomic.Uint64

// replicateTs is the ts that the table sink has started to replicate.
replicateTs model.Ts
replicateTs atomic.Uint64
genReplicateTs func(ctx context.Context) (model.Ts, error)

// lastCleanTime indicates the last time the table has been cleaned.
Expand All @@ -90,6 +91,11 @@ type tableSinkWrapper struct {
rangeEventCountsMu sync.Mutex
}

// GetReplicaTs returns the replicate ts of the table sink.
func (t *tableSinkWrapper) GetReplicaTs() model.Ts {
return t.replicateTs.Load()
}

type rangeEventCount struct {
// firstPos and lastPos are used to merge many rangeEventCount into one.
firstPos sorter.Position
Expand Down Expand Up @@ -132,31 +138,34 @@ func newTableSinkWrapper(

res.receivedSorterResolvedTs.Store(startTs)
res.barrierTs.Store(startTs)
res.replicateTs.Store(math.MaxUint64)
return res
}

func (t *tableSinkWrapper) start(ctx context.Context, startTs model.Ts) (err error) {
if t.replicateTs != 0 {
if t.replicateTs.Load() != math.MaxUint64 {
log.Panic("The table sink has already started",
zap.String("namespace", t.changefeed.Namespace),
zap.String("changefeed", t.changefeed.ID),
zap.Stringer("span", &t.span),
zap.Uint64("startTs", startTs),
zap.Uint64("oldReplicateTs", t.replicateTs),
zap.Uint64("oldReplicateTs", t.replicateTs.Load()),

Check warning on line 152 in cdc/processor/sinkmanager/table_sink_wrapper.go

View check run for this annotation

Codecov / codecov/patch

cdc/processor/sinkmanager/table_sink_wrapper.go#L152

Added line #L152 was not covered by tests
)
}

// FIXME(qupeng): it can be re-fetched later instead of fails.
if t.replicateTs, err = t.genReplicateTs(ctx); err != nil {
ts, err := t.genReplicateTs(ctx)
if err != nil {
return errors.Trace(err)
}
t.replicateTs.Store(ts)

log.Info("Sink is started",
zap.String("namespace", t.changefeed.Namespace),
zap.String("changefeed", t.changefeed.ID),
zap.Stringer("span", &t.span),
zap.Uint64("startTs", startTs),
zap.Uint64("replicateTs", t.replicateTs),
zap.Uint64("replicateTs", ts),
)

// This start ts maybe greater than the initial start ts of the table sink.
Expand Down Expand Up @@ -379,14 +388,16 @@ func (t *tableSinkWrapper) checkTableSinkHealth() (err error) {
// committed at downstream but we don't know. So we need to update `replicateTs`
// of the table so that we can re-send those events later.
func (t *tableSinkWrapper) restart(ctx context.Context) (err error) {
if t.replicateTs, err = t.genReplicateTs(ctx); err != nil {
ts, err := t.genReplicateTs(ctx)
if err != nil {
return errors.Trace(err)
}
t.replicateTs.Store(ts)
log.Info("Sink is restarted",
zap.String("namespace", t.changefeed.Namespace),
zap.String("changefeed", t.changefeed.ID),
zap.Stringer("span", &t.span),
zap.Uint64("replicateTs", t.replicateTs))
zap.Uint64("replicateTs", ts))
return nil
}

Expand Down
Loading

0 comments on commit f25075d

Please sign in to comment.