puller(ticdc): fix wrong update splitting behavior after table scheduling (#11269) #11282

Merged
18 changes: 18 additions & 0 deletions cdc/model/kv.go
@@ -16,6 +16,7 @@
package model

import (
"errors"
"fmt"

"github.com/pingcap/tiflow/pkg/regionspan"
@@ -110,3 +111,20 @@ func (v *RawKVEntry) String() string {
func (v *RawKVEntry) ApproximateDataSize() int64 {
return int64(len(v.Key) + len(v.Value) + len(v.OldValue))
}

// ShouldSplitKVEntry checks whether the raw kv entry should be split.
type ShouldSplitKVEntry func(raw *RawKVEntry) bool

// SplitUpdateKVEntry splits the raw kv entry into a delete entry and an insert entry.
func SplitUpdateKVEntry(raw *RawKVEntry) (*RawKVEntry, *RawKVEntry, error) {
if raw == nil {
return nil, nil, errors.New("nil event cannot be split")
}
deleteKVEntry := *raw
deleteKVEntry.Value = nil

insertKVEntry := *raw
insertKVEntry.OldValue = nil

return &deleteKVEntry, &insertKVEntry, nil
}
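
SplitUpdateKVEntry shallow-copies the raw entry twice: the delete image keeps only OldValue (Value is cleared) and the insert image keeps only Value (OldValue is cleared), so an update can be replayed downstream as delete-then-insert. The puller is expected to pair it with a ShouldSplitKVEntry predicate. The standalone sketch below mirrors that wiring with a simplified entry type; the shouldSplit rule (split updates committed before the table's replicate ts) and the helper names are illustrative assumptions, not code from this PR.

// Standalone sketch (illustrative, not part of this PR): pairing a
// ShouldSplitKVEntry-style predicate with SplitUpdateKVEntry-style splitting.
package main

import (
	"errors"
	"fmt"
)

// rawKVEntry is a simplified stand-in for model.RawKVEntry.
type rawKVEntry struct {
	Key      []byte
	Value    []byte
	OldValue []byte
	CRTs     uint64 // commit ts of the entry
}

// isUpdate reports whether the entry carries both a new and an old value.
func (v *rawKVEntry) isUpdate() bool {
	return len(v.Value) > 0 && len(v.OldValue) > 0
}

// splitUpdate mirrors model.SplitUpdateKVEntry: the delete image keeps only
// the old value, the insert image keeps only the new value.
func splitUpdate(raw *rawKVEntry) (*rawKVEntry, *rawKVEntry, error) {
	if raw == nil {
		return nil, nil, errors.New("nil event cannot be split")
	}
	deleteEntry := *raw
	deleteEntry.Value = nil
	insertEntry := *raw
	insertEntry.OldValue = nil
	return &deleteEntry, &insertEntry, nil
}

func main() {
	// Assumed predicate: split updates committed before the table's replicate ts.
	replicateTs := uint64(400)
	shouldSplit := func(raw *rawKVEntry) bool {
		return raw != nil && raw.isUpdate() && raw.CRTs < replicateTs
	}

	raw := &rawKVEntry{Key: []byte("k1"), Value: []byte("new"), OldValue: []byte("old"), CRTs: 300}
	if shouldSplit(raw) {
		del, ins, err := splitUpdate(raw)
		if err != nil {
			panic(err)
		}
		fmt.Printf("delete image old value: %q, insert image value: %q\n", del.OldValue, ins.Value)
	}
}

Because the copies are shallow, the two images share the underlying Key bytes; that matches the original helper, which only clears the Value and OldValue references.
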
33 changes: 15 additions & 18 deletions cdc/processor/processor.go
@@ -219,11 +219,11 @@ func (p *processor) AddTable(
}

if p.pullBasedSinking {
p.sinkManager.AddTable(tableID, startTs, p.changefeed.Info.TargetTs)
table := p.sinkManager.AddTable(tableID, startTs, p.changefeed.Info.TargetTs)
if p.redoDMLMgr.Enabled() {
p.redoDMLMgr.AddTable(tableID, startTs)
}
p.sourceManager.AddTable(ctx.(cdcContext.Context), tableID, p.getTableName(ctx, tableID), startTs)
p.sourceManager.AddTable(ctx.(cdcContext.Context), tableID, p.getTableName(ctx, tableID), startTs, table.GetReplicaTs)
} else {
table, err := p.createTablePipeline(
ctx.(cdcContext.Context), tableID, &model.TableReplicaInfo{StartTs: startTs})
@@ -232,7 +232,6 @@
}
p.tables[tableID] = table
}

return true, nil
}

@@ -570,18 +569,6 @@ func isProcessorIgnorableError(err error) bool {
return false
}

// needPullerSafeModeAtStart returns true if the scheme is mysql compatible.
// pullerSafeMode means to split all update kv entries whose commitTS
// is older than the start time of this changefeed.
func needPullerSafeModeAtStart(sinkURIStr string) (bool, error) {
sinkURI, err := url.Parse(sinkURIStr)
if err != nil {
return false, cerror.WrapError(cerror.ErrSinkURIInvalid, err)
}
scheme := sink.GetScheme(sinkURI)
return sink.IsMySQLCompatibleScheme(scheme), nil
}

// Tick implements the `orchestrator.State` interface
// the `state` parameter is sent by the etcd worker, the `state` must be a snapshot of KVs in etcd
// The main logic of processor is in this function, including the calculation of many kinds of ts,
@@ -757,6 +744,16 @@ func (p *processor) createTaskPosition() (skipThisTick bool) {
return true
}

// isMysqlCompatibleBackend returns true if the sinkURIStr is mysql compatible.
func isMysqlCompatibleBackend(sinkURIStr string) (bool, error) {
sinkURI, err := url.Parse(sinkURIStr)
if err != nil {
return false, cerror.WrapError(cerror.ErrSinkURIInvalid, err)
}
scheme := sink.GetScheme(sinkURI)
return sink.IsMySQLCompatibleScheme(scheme), nil
}

// lazyInitImpl create Filter, SchemaStorage, Mounter instances at the first tick.
func (p *processor) lazyInitImpl(ctx cdcContext.Context) error {
if p.initialized {
@@ -847,16 +844,16 @@ func (p *processor) lazyInitImpl(ctx cdcContext.Context) error {
zap.Duration("duration", time.Since(start)))
return errors.Trace(err)
}
pullerSafeModeAtStart, err := needPullerSafeModeAtStart(p.changefeed.Info.SinkURI)
isMysqlBackend, err := isMysqlCompatibleBackend(p.changefeed.Info.SinkURI)
if err != nil {
return errors.Trace(err)
}
p.sourceManager = sourcemanager.New(p.changefeedID, p.upstream, p.mg,
sortEngine, p.errCh, p.changefeed.Info.Config.BDRMode, pullerSafeModeAtStart)
sortEngine, p.errCh, p.changefeed.Info.Config.BDRMode, isMysqlBackend)
p.sinkManager, err = sinkmanager.New(stdCtx, p.changefeedID,
p.changefeed.Info, p.upstream, p.schemaStorage,
p.redoDMLMgr, p.sourceManager,
p.errCh, p.warnCh, p.metricsTableSinkTotalRows, p.metricsTableSinkFlushLagDuration)
p.errCh, p.warnCh, isMysqlBackend, p.metricsTableSinkTotalRows, p.metricsTableSinkFlushLagDuration)
if err != nil {
log.Info("Processor creates sink manager fail",
zap.String("namespace", p.changefeedID.Namespace),
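
The key processor-side change is that sourceManager.AddTable now receives table.GetReplicaTs, so the puller can ask for the table sink's current replicate ts every time it evaluates an update, instead of relying on the removed needPullerSafeModeAtStart flag that was computed once at changefeed start and therefore went stale after a table was re-scheduled. The renamed isMysqlCompatibleBackend helper keeps only the scheme check. A plausible shape for the predicate the source manager could build from that callback is sketched below; the sourcemanager internals are not part of this diff, so the helper name and the exact comparison are assumptions.

// Assumed helper (not from this diff): turning the getReplicaTs callback that
// processor.AddTable now passes down into a per-table split predicate.
package sourcemanager // placement is illustrative only

import "github.com/pingcap/tiflow/cdc/model"

func makeShouldSplit(isMysqlBackend bool, getReplicaTs func() model.Ts) model.ShouldSplitKVEntry {
	return func(raw *model.RawKVEntry) bool {
		if !isMysqlBackend || raw == nil {
			return false
		}
		// Only updates (both value and old value present) that were committed
		// no later than the table's current replicate ts need to be split.
		isUpdate := raw.OpType == model.OpTypePut && len(raw.Value) > 0 && len(raw.OldValue) > 0
		return isUpdate && raw.CRTs <= getReplicaTs()
	}
}

Because GetReplicaTs is re-read on every call, a restarted table sink is picked up immediately, which is exactly what the old start-time flag could not provide.
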
14 changes: 12 additions & 2 deletions cdc/processor/sinkmanager/manager.go
@@ -128,6 +128,9 @@ type SinkManager struct {
// wg is used to wait for all workers to exit.
wg sync.WaitGroup

// isMysqlBackend indicates whether the backend is MySQL compatible.
isMysqlBackend bool

// Metric for table sink.
metricsTableSinkTotalRows prometheus.Counter

@@ -145,6 +148,7 @@ func New(
sourceManager *sourcemanager.SourceManager,
errChan chan error,
warnChan chan error,
isMysqlBackend bool,
metricsTableSinkTotalRows prometheus.Counter,
metricsTableSinkFlushLagDuration prometheus.Observer,
) (*SinkManager, error) {
@@ -160,6 +164,7 @@
sinkTaskChan: make(chan *sinkTask),
sinkWorkerAvailable: make(chan struct{}, 1),
sinkRetry: retry.NewInfiniteErrorRetry(),
isMysqlBackend: isMysqlBackend,

metricsTableSinkTotalRows: metricsTableSinkTotalRows,

@@ -312,6 +317,11 @@ func (m *SinkManager) run(ctx context.Context, warnings ...chan<- error) (err er
if cerror.IsDupEntryError(err) {
return errors.Trace(err)
}

if m.isMysqlBackend {
// For MySQL backend, we should restart the sink and let the owner handle the error.
return errors.Trace(err)
}
}

// If the error is retryable, we should retry to re-establish the internal resources.
@@ -848,7 +858,7 @@ func (m *SinkManager) UpdateBarrierTs(globalBarrierTs model.Ts, tableBarrier map
}

// AddTable adds a table(TableSink) to the sink manager.
func (m *SinkManager) AddTable(tableID model.TableID, startTs model.Ts, targetTs model.Ts) {
func (m *SinkManager) AddTable(tableID model.TableID, startTs model.Ts, targetTs model.Ts) *tableSinkWrapper {
sinkWrapper := newTableSinkWrapper(
m.changefeedID,
tableID,
@@ -876,7 +886,6 @@ func (m *SinkManager) AddTable(tableID model.TableID, startTs model.Ts, targetTs
zap.String("namespace", m.changefeedID.Namespace),
zap.String("changefeed", m.changefeedID.ID),
zap.Int64("tableID", tableID))
return
}
m.sinkMemQuota.AddTable(tableID)
m.redoMemQuota.AddTable(tableID)
@@ -886,6 +895,7 @@
zap.Int64("tableID", tableID),
zap.Uint64("startTs", startTs),
zap.Uint64("version", sinkWrapper.version))
return sinkWrapper
}

// StartTable sets the table(TableSink) state to replicating.
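
Two things change in the sink manager: AddTable now returns the *tableSinkWrapper so the processor can hand its GetReplicaTs accessor to the source manager, and run() no longer retries internally when the backend is MySQL compatible; instead it returns the error so the owner can handle it. The helper below merely restates that branching as a standalone decision function for readability; the names and the enum are illustrative, not part of this PR.

// Illustrative restatement (not from this PR) of the error handling added to
// SinkManager.run: duplicate-entry errors and any error on a MySQL-compatible
// backend are surfaced to the owner; other retryable errors rebuild the sink
// workers in place.
package sinkmanagerexample

type errAction int

const (
	surfaceToOwner errAction = iota // return the error; the owner handles it
	retryInPlace                    // re-establish internal resources and retry
)

func classifySinkError(isDupEntry, isMysqlBackend bool) errAction {
	if isDupEntry {
		return surfaceToOwner
	}
	if isMysqlBackend {
		// For MySQL-compatible backends the sink should be restarted rather
		// than retried inside the processor.
		return surfaceToOwner
	}
	return retryInPlace
}
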
3 changes: 2 additions & 1 deletion cdc/processor/sinkmanager/manager_test.go
@@ -63,6 +63,7 @@ func createManagerWithMemEngine(
&entry.MockSchemaStorage{Resolved: math.MaxUint64},
nil, sm,
errChan, errChan,
false,
prometheus.NewCounter(prometheus.CounterOpts{}),
prometheus.NewHistogram(prometheus.HistogramOpts{}))
require.NoError(t, err)
@@ -166,7 +167,7 @@ func TestAddTable(t *testing.T) {
require.Equal(t, 0, manager.sinkProgressHeap.len(), "Not started table should not be in progress heap")
err := manager.StartTable(tableID, 1)
require.NoError(t, err)
require.Equal(t, uint64(0x7ffffffffffbffff), tableSink.(*tableSinkWrapper).replicateTs)
require.Equal(t, uint64(0x7ffffffffffbffff), tableSink.(*tableSinkWrapper).replicateTs.Load())

progress := manager.sinkProgressHeap.pop()
require.Equal(t, tableID, progress.tableID)
4 changes: 2 additions & 2 deletions cdc/processor/sinkmanager/redo_log_worker.go
@@ -276,8 +276,8 @@ func (w *redoWorker) handleTask(ctx context.Context, task *redoTask) (finalErr e
var x []*model.RowChangedEvent
var size uint64
if e.Row != nil {
// For all rows, we add table replicate ts, so mysql sink can determine safe-mode.
e.Row.ReplicatingTs = task.tableSink.replicateTs
// For all events, we add table replicate ts, so mysql sink can determine safe-mode.
e.Row.ReplicatingTs = task.tableSink.replicateTs.Load()
x, size = handleRowChangedEvents(w.changefeedID, task.tableID, e)
usedMemSize += size
rows = append(rows, x...)
2 changes: 1 addition & 1 deletion cdc/processor/sinkmanager/table_sink_worker.go
@@ -363,7 +363,7 @@ func (w *sinkWorker) handleTask(ctx context.Context, task *sinkTask) (finalErr e
// NOTICE: The event can be filtered by the event filter.
if e.Row != nil {
// For all rows, we add table replicate ts, so mysql sink can determine safe-mode.
e.Row.ReplicatingTs = task.tableSink.replicateTs
e.Row.ReplicatingTs = task.tableSink.GetReplicaTs()
x, size := handleRowChangedEvents(w.changefeedID, task.tableID, e)
events = append(events, x...)
allEventSize += size
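
Both the redo worker and the sink worker now read the replicate ts through the atomic accessor (replicateTs.Load() / GetReplicaTs()) and stamp it onto every row as ReplicatingTs. The MySQL sink uses that stamp to decide whether a row must be written in safe mode, because a row committed before the sink (re)started replicating may already exist downstream. The exact check lives in the sink and is not part of this diff; a minimal sketch of the assumed comparison:

// Assumed sketch (the real check is in the MySQL sink, not in this diff):
// rows whose commit ts precedes the table's replicate ts are written
// idempotently (safe mode); anything newer can use plain DML.
package mysqlsinkexample

import "github.com/pingcap/tiflow/cdc/model"

func needsSafeMode(row *model.RowChangedEvent) bool {
	return row.CommitTs < row.ReplicatingTs
}
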
25 changes: 18 additions & 7 deletions cdc/processor/sinkmanager/table_sink_wrapper.go
@@ -15,6 +15,7 @@ package sinkmanager

import (
"context"
"math"
"sort"
"sync"
"sync/atomic"
@@ -76,7 +77,7 @@ type tableSinkWrapper struct {
receivedSorterResolvedTs atomic.Uint64

// replicateTs is the ts that the table sink has started to replicate.
replicateTs model.Ts
replicateTs atomic.Uint64
genReplicateTs func(ctx context.Context) (model.Ts, error)

// lastCleanTime indicates the last time the table has been cleaned.
@@ -89,6 +90,11 @@ type tableSinkWrapper struct {
rangeEventCountsMu sync.Mutex
}

// GetReplicaTs returns the replicate ts of the table sink.
func (t *tableSinkWrapper) GetReplicaTs() model.Ts {
return t.replicateTs.Load()
}

type rangeEventCount struct {
// firstPos and lastPos are used to merge many rangeEventCount into one.
firstPos engine.Position
@@ -131,31 +137,34 @@ func newTableSinkWrapper(

res.receivedSorterResolvedTs.Store(startTs)
res.barrierTs.Store(startTs)
res.replicateTs.Store(math.MaxUint64)
return res
}

func (t *tableSinkWrapper) start(ctx context.Context, startTs model.Ts) (err error) {
if t.replicateTs != 0 {
if t.replicateTs.Load() != math.MaxUint64 {
log.Panic("The table sink has already started",
zap.String("namespace", t.changefeed.Namespace),
zap.String("changefeed", t.changefeed.ID),
zap.Int64("tableID", t.tableID),
zap.Uint64("startTs", startTs),
zap.Uint64("oldReplicateTs", t.replicateTs),
zap.Uint64("oldReplicateTs", t.replicateTs.Load()),
)
}

// FIXME(qupeng): it can be re-fetched later instead of failing.
if t.replicateTs, err = t.genReplicateTs(ctx); err != nil {
ts, err := t.genReplicateTs(ctx)
if err != nil {
return errors.Trace(err)
}
t.replicateTs.Store(ts)

log.Info("Sink is started",
zap.String("namespace", t.changefeed.Namespace),
zap.String("changefeed", t.changefeed.ID),
zap.Int64("tableID", t.tableID),
zap.Uint64("startTs", startTs),
zap.Uint64("replicateTs", t.replicateTs),
zap.Uint64("replicateTs", ts),
)

// This start ts maybe greater than the initial start ts of the table sink.
@@ -378,14 +387,16 @@ func (t *tableSinkWrapper) checkTableSinkHealth() (err error) {
// committed at downstream but we don't know. So we need to update `replicateTs`
// of the table so that we can re-send those events later.
func (t *tableSinkWrapper) restart(ctx context.Context) (err error) {
if t.replicateTs, err = t.genReplicateTs(ctx); err != nil {
ts, err := t.genReplicateTs(ctx)
if err != nil {
return errors.Trace(err)
}
t.replicateTs.Store(ts)
log.Info("Sink is restarted",
zap.String("namespace", t.changefeed.Namespace),
zap.String("changefeed", t.changefeed.ID),
zap.Int64("tableID", t.tableID),
zap.Uint64("replicateTs", t.replicateTs))
zap.Uint64("replicateTs", ts))
return nil
}

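
replicateTs becomes an atomic.Uint64 because it is now read concurrently by the sink and redo workers (through GetReplicaTs) while start() and restart() may store a fresh value after the table is scheduled to this processor again; math.MaxUint64 replaces zero as the "not started yet" sentinel. A plausible side effect, not stated in the diff, is that any row stamped before start() completes carries the maximal ts and therefore lands on the safe side of the comparison sketched earlier. The standalone sketch below isolates that pattern; it is an illustration, not code from the wrapper.

// Standalone illustration of the atomic replicateTs pattern used above.
package main

import (
	"fmt"
	"math"
	"sync"
	"sync/atomic"
)

type wrapper struct {
	replicateTs atomic.Uint64 // math.MaxUint64 means "sink not started yet"
}

func newWrapper() *wrapper {
	w := &wrapper{}
	w.replicateTs.Store(math.MaxUint64)
	return w
}

func (w *wrapper) start(ts uint64) {
	if w.replicateTs.Load() != math.MaxUint64 {
		panic("the table sink has already started")
	}
	w.replicateTs.Store(ts)
}

// GetReplicaTs can be called from any worker goroutine without extra locking.
func (w *wrapper) GetReplicaTs() uint64 { return w.replicateTs.Load() }

func main() {
	w := newWrapper()
	w.start(100)

	var wg sync.WaitGroup
	for i := 0; i < 4; i++ { // concurrent readers, like the sink and redo workers
		wg.Add(1)
		go func() {
			defer wg.Done()
			_ = w.GetReplicaTs()
		}()
	}
	w.replicateTs.Store(200) // a restart storing a newer ts is also safe
	wg.Wait()
	fmt.Println("replicateTs:", w.GetReplicaTs())
}
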