puller(ticdc): fix wrong update splitting behavior after table scheduling #11296

Merged 2 commits on Jun 13, 2024. The diff below shows changes from 1 commit.
18 changes: 18 additions & 0 deletions cdc/model/kv.go
@@ -16,6 +16,7 @@
package model

import (
"errors"
"fmt"

"github.com/pingcap/tiflow/cdc/processor/tablepb"
@@ -110,3 +111,20 @@
func (v *RawKVEntry) ApproximateDataSize() int64 {
return int64(len(v.Key) + len(v.Value) + len(v.OldValue))
}

// ShouldSplitKVEntry checks whether the raw kv entry should be split.
type ShouldSplitKVEntry func(raw *RawKVEntry) bool

// SplitUpdateKVEntry splits the raw kv entry into a delete entry and an insert entry.
func SplitUpdateKVEntry(raw *RawKVEntry) (*RawKVEntry, *RawKVEntry, error) {
if raw == nil {
return nil, nil, errors.New("nil event cannot be split")
}
deleteKVEntry := *raw
deleteKVEntry.Value = nil

insertKVEntry := *raw
insertKVEntry.OldValue = nil

return &deleteKVEntry, &insertKVEntry, nil
}
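Editor's note on intended use: the two helpers above are consumed on the puller side, which builds a ShouldSplitKVEntry predicate from the table's current replicate ts and splits qualifying update entries before they reach the sort engine. The sketch below is hypothetical glue, not code from this PR; makeShouldSplit, emitRawKV, getReplicaTs, and output are illustrative names, and the exact comparison against the replicate ts is an assumption.

// Illustrative only; assumes: import "github.com/pingcap/tiflow/cdc/model"

// makeShouldSplit builds a predicate in the spirit of this PR: split only "old"
// updates, i.e. update entries whose commit ts is not newer than the ts from
// which the table sink is replicating.
func makeShouldSplit(getReplicaTs func() model.Ts) model.ShouldSplitKVEntry {
	return func(raw *model.RawKVEntry) bool {
		if raw == nil || raw.OpType != model.OpTypePut {
			return false
		}
		isUpdate := len(raw.Value) > 0 && len(raw.OldValue) > 0
		return isUpdate && raw.CRTs <= getReplicaTs()
	}
}

// emitRawKV applies the split decision before forwarding an entry to a
// downstream consumer such as the sort engine (hypothetical helper).
func emitRawKV(raw *model.RawKVEntry, shouldSplit model.ShouldSplitKVEntry, output func(*model.RawKVEntry)) error {
	if raw != nil && shouldSplit(raw) {
		deleteEntry, insertEntry, err := model.SplitUpdateKVEntry(raw)
		if err != nil {
			return err
		}
		output(deleteEntry) // emit the delete first so a changed handle key cannot collide
		output(insertEntry)
		return nil
	}
	output(raw)
	return nil
}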
32 changes: 15 additions & 17 deletions cdc/processor/processor.go
@@ -223,13 +223,13 @@
zap.Bool("isPrepare", isPrepare))
}

p.sinkManager.r.AddTable(
table := p.sinkManager.r.AddTable(
span, startTs, p.latestInfo.TargetTs)
if p.redo.r.Enabled() {
p.redo.r.AddTable(span, startTs)
}
p.sourceManager.r.AddTable(span, p.getTableName(ctx, span.TableID), startTs)

p.sourceManager.r.AddTable(span, p.getTableName(ctx, span.TableID), startTs, table.GetReplicaTs)
return true, nil
}

@@ -488,18 +488,6 @@
return false
}

// needPullerSafeModeAtStart returns true if the scheme is mysql compatible.
// pullerSafeMode means to split all update kv entries whose commitTS
// is older then the start time of this changefeed.
func needPullerSafeModeAtStart(sinkURIStr string) (bool, error) {
sinkURI, err := url.Parse(sinkURIStr)
if err != nil {
return false, cerror.WrapError(cerror.ErrSinkURIInvalid, err)
}
scheme := sink.GetScheme(sinkURI)
return sink.IsMySQLCompatibleScheme(scheme), nil
}

// Tick implements the `orchestrator.State` interface
// the `info` parameter is sent by metadata store, the `info` must be the latest value snapshot.
// the `status` parameter is sent by metadata store, the `status` must be the latest value snapshot.
@@ -600,6 +588,16 @@
return nil, warning
}

// isMysqlCompatibleBackend returns true if the sinkURIStr is mysql compatible.
func isMysqlCompatibleBackend(sinkURIStr string) (bool, error) {
sinkURI, err := url.Parse(sinkURIStr)
if err != nil {
return false, cerror.WrapError(cerror.ErrSinkURIInvalid, err)
}
scheme := sink.GetScheme(sinkURI)
return sink.IsMySQLCompatibleScheme(scheme), nil
}

// lazyInitImpl create Filter, SchemaStorage, Mounter instances at the first tick.
func (p *processor) lazyInitImpl(etcdCtx context.Context) (err error) {
if p.initialized.Load() {
@@ -659,22 +657,22 @@
return errors.Trace(err)
}

pullerSafeModeAtStart, err := needPullerSafeModeAtStart(p.latestInfo.SinkURI)
isMysqlBackend, err := isMysqlCompatibleBackend(p.latestInfo.SinkURI)
if err != nil {
return errors.Trace(err)
}
p.sourceManager.r = sourcemanager.New(
p.changefeedID, p.upstream, p.mg.r,
sortEngine, util.GetOrZero(cfConfig.BDRMode),
util.GetOrZero(cfConfig.EnableTableMonitor),
pullerSafeModeAtStart)
isMysqlBackend)
p.sourceManager.name = "SourceManager"
p.sourceManager.changefeedID = p.changefeedID
p.sourceManager.spawn(prcCtx)

p.sinkManager.r = sinkmanager.New(
p.changefeedID, p.latestInfo.SinkURI, cfConfig, p.upstream,
p.ddlHandler.r.schemaStorage, p.redo.r, p.sourceManager.r)
p.ddlHandler.r.schemaStorage, p.redo.r, p.sourceManager.r, isMysqlBackend)
p.sinkManager.name = "SinkManager"
p.sinkManager.changefeedID = p.changefeedID
p.sinkManager.spawn(prcCtx)
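Two editorial notes on the processor changes above. First, AddTable on the sink manager now returns the table sink wrapper so the processor can hand the bound method table.GetReplicaTs to the source manager; passing a getter rather than a snapshot lets the puller observe a replicate ts that is refreshed when the sink restarts. Second, the renamed isMysqlCompatibleBackend helper only inspects the sink URI scheme, and its result now drives both managers. A hedged usage example; the URIs are made up and the concrete set of compatible schemes is whatever sink.IsMySQLCompatibleScheme accepts.

// Hypothetical calls, for illustration only.
isMysql, err := isMysqlCompatibleBackend("mysql://root@127.0.0.1:3306/")
if err != nil {
	// an unparsable sink URI is reported as ErrSinkURIInvalid
	return errors.Trace(err)
}
// isMysql == true: the source manager splits old update entries at the puller,
// and the sink manager hands unrecoverable errors back to the owner.

isMQ, _ := isMysqlCompatibleBackend("kafka://127.0.0.1:9092/cdc-topic")
// isMQ == false: message-queue style sinks keep the previous behavior.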
15 changes: 12 additions & 3 deletions cdc/processor/sinkmanager/manager.go
@@ -128,6 +128,9 @@
// wg is used to wait for all workers to exit.
wg sync.WaitGroup

// isMysqlBackend indicates whether the backend is MySQL compatible.
isMysqlBackend bool

// Metric for table sink.
metricsTableSinkTotalRows prometheus.Counter

@@ -143,6 +146,7 @@
schemaStorage entry.SchemaStorage,
redoDMLMgr redo.DMLManager,
sourceManager *sourcemanager.SourceManager,
isMysqlBackend bool,
) *SinkManager {
m := &SinkManager{
changefeedID: changefeedID,
Expand All @@ -156,7 +160,7 @@
sinkTaskChan: make(chan *sinkTask),
sinkWorkerAvailable: make(chan struct{}, 1),
sinkRetry: retry.NewInfiniteErrorRetry(),

isMysqlBackend: isMysqlBackend,
metricsTableSinkTotalRows: tablesinkmetrics.TotalRowsCountCounter.
WithLabelValues(changefeedID.Namespace, changefeedID.ID),

@@ -298,6 +302,11 @@
if cerror.IsDupEntryError(err) {
return errors.Trace(err)
}

if m.isMysqlBackend {
// For MySQL backend, we should restart the sink and let the owner handle the error.
return errors.Trace(err)
}
}

// If the error is retryable, we should retry to re-establish the internal resources.
@@ -832,7 +841,7 @@
}

// AddTable adds a table(TableSink) to the sink manager.
func (m *SinkManager) AddTable(span tablepb.Span, startTs model.Ts, targetTs model.Ts) {
func (m *SinkManager) AddTable(span tablepb.Span, startTs model.Ts, targetTs model.Ts) *tableSinkWrapper {
sinkWrapper := newTableSinkWrapper(
m.changefeedID,
span,
@@ -860,7 +869,6 @@
zap.String("namespace", m.changefeedID.Namespace),
zap.String("changefeed", m.changefeedID.ID),
zap.Stringer("span", &span))
return
}
m.sinkMemQuota.AddTable(span)
m.redoMemQuota.AddTable(span)
@@ -870,6 +878,7 @@
zap.Stringer("span", &span),
zap.Uint64("startTs", startTs),
zap.Uint64("version", sinkWrapper.version))
return sinkWrapper
}

// StartTable sets the table(TableSink) state to replicating.
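Editor's sketch of the error-handling decision added above, condensed into one predicate for readability; shouldBubbleUp is an illustrative name and does not exist in the codebase.

// Which sink errors are handed back to the owner instead of being retried by
// rebuilding the sink workers in place (illustrative condensation of the logic above).
func shouldBubbleUp(err error, isMysqlBackend bool) bool {
	if cerror.IsDupEntryError(err) {
		// Duplicate-entry errors are never retried locally.
		return true
	}
	// For MySQL-compatible backends the sink is not rebuilt in place; per the
	// comment in the diff, the error is surfaced and the owner handles the restart.
	return isMysqlBackend
}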
4 changes: 2 additions & 2 deletions cdc/processor/sinkmanager/manager_test.go
@@ -135,7 +135,7 @@ func TestAddTable(t *testing.T) {
require.Equal(t, 0, manager.sinkProgressHeap.len(), "Not started table shout not in progress heap")
err := manager.StartTable(span, 1)
require.NoError(t, err)
require.Equal(t, uint64(0x7ffffffffffbffff), tableSink.(*tableSinkWrapper).replicateTs)
require.Equal(t, uint64(0x7ffffffffffbffff), tableSink.(*tableSinkWrapper).replicateTs.Load())

progress := manager.sinkProgressHeap.pop()
require.Equal(t, span, progress.span)
@@ -359,7 +359,7 @@ func TestSinkManagerRunWithErrors(t *testing.T) {

span := spanz.TableIDToComparableSpan(1)

source.AddTable(span, "test", 100)
source.AddTable(span, "test", 100, func() model.Ts { return 0 })
manager.AddTable(span, 100, math.MaxUint64)
manager.StartTable(span, 100)
source.Add(span, model.NewResolvedPolymorphicEvent(0, 101))
4 changes: 2 additions & 2 deletions cdc/processor/sinkmanager/manager_test_helper.go
@@ -71,7 +71,7 @@
sourceManager.WaitForReady(ctx)

sinkManager := New(changefeedID, changefeedInfo.SinkURI,
changefeedInfo.Config, up, schemaStorage, nil, sourceManager)
changefeedInfo.Config, up, schemaStorage, nil, sourceManager, false)
go func() { handleError(sinkManager.Run(ctx)) }()
sinkManager.WaitForReady(ctx)

@@ -92,6 +92,6 @@
schemaStorage := &entry.MockSchemaStorage{Resolved: math.MaxUint64}
sourceManager := sourcemanager.NewForTest(changefeedID, up, mg, sortEngine, false)
sinkManager := New(changefeedID, changefeedInfo.SinkURI,
changefeedInfo.Config, up, schemaStorage, redoMgr, sourceManager)
changefeedInfo.Config, up, schemaStorage, redoMgr, sourceManager, false)
return sinkManager, sourceManager, sortEngine
}
2 changes: 1 addition & 1 deletion cdc/processor/sinkmanager/redo_log_worker.go
@@ -127,7 +127,7 @@ func (w *redoWorker) handleTask(ctx context.Context, task *redoTask) (finalErr e
// NOTICE: The event can be filtered by the event filter.
if e.Row != nil {
// For all events, we add table replicate ts, so mysql sink can determine safe-mode.
e.Row.ReplicatingTs = task.tableSink.replicateTs
e.Row.ReplicatingTs = task.tableSink.replicateTs.Load()
x, size = handleRowChangedEvents(w.changefeedID, task.span, e)
advancer.appendEvents(x, size)
}
2 changes: 1 addition & 1 deletion cdc/processor/sinkmanager/table_sink_worker.go
@@ -204,7 +204,7 @@ func (w *sinkWorker) handleTask(ctx context.Context, task *sinkTask) (finalErr e
// NOTICE: The event can be filtered by the event filter.
if e.Row != nil {
// For all rows, we add table replicate ts, so mysql sink can determine safe-mode.
e.Row.ReplicatingTs = task.tableSink.replicateTs
e.Row.ReplicatingTs = task.tableSink.GetReplicaTs()
x, size := handleRowChangedEvents(w.changefeedID, task.span, e)
advancer.appendEvents(x, size)
}
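For context (editor's addition): the only change in the two worker files above is where the row's ReplicatingTs comes from, now read through the atomic accessor. Downstream, a MySQL-compatible sink compares a row's commit ts with this replicating ts to decide whether to write in safe mode, since rows at or below the replicate ts may already exist downstream. A rough sketch of that decision; the exact boundary is an assumption.

// Rough sketch of how ReplicatingTs is consumed by a MySQL-compatible sink
// (assumed comparison; rows replayed from before the replicate ts must be
// written idempotently, e.g. as REPLACE or DELETE+INSERT).
func useSafeMode(commitTs, replicatingTs uint64) bool {
	return commitTs <= replicatingTs
}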
25 changes: 18 additions & 7 deletions cdc/processor/sinkmanager/table_sink_wrapper.go
@@ -15,6 +15,7 @@

import (
"context"
"math"
"sort"
"sync"
"sync/atomic"
@@ -77,7 +78,7 @@
receivedSorterResolvedTs atomic.Uint64

// replicateTs is the ts that the table sink has started to replicate.
replicateTs model.Ts
replicateTs atomic.Uint64
genReplicateTs func(ctx context.Context) (model.Ts, error)

// lastCleanTime indicates the last time the table has been cleaned.
@@ -90,6 +91,11 @@
rangeEventCountsMu sync.Mutex
}

// GetReplicaTs returns the replicate ts of the table sink.
func (t *tableSinkWrapper) GetReplicaTs() model.Ts {
return t.replicateTs.Load()
}

type rangeEventCount struct {
// firstPos and lastPos are used to merge many rangeEventCount into one.
firstPos sorter.Position
@@ -132,31 +138,34 @@

res.receivedSorterResolvedTs.Store(startTs)
res.barrierTs.Store(startTs)
res.replicateTs.Store(math.MaxUint64)
return res
}

func (t *tableSinkWrapper) start(ctx context.Context, startTs model.Ts) (err error) {
if t.replicateTs != 0 {
if t.replicateTs.Load() != math.MaxUint64 {
log.Panic("The table sink has already started",
zap.String("namespace", t.changefeed.Namespace),
zap.String("changefeed", t.changefeed.ID),
zap.Stringer("span", &t.span),
zap.Uint64("startTs", startTs),
zap.Uint64("oldReplicateTs", t.replicateTs),
zap.Uint64("oldReplicateTs", t.replicateTs.Load()),
)
}

// FIXME(qupeng): it can be re-fetched later instead of fails.
if t.replicateTs, err = t.genReplicateTs(ctx); err != nil {
ts, err := t.genReplicateTs(ctx)
if err != nil {
return errors.Trace(err)
}
t.replicateTs.Store(ts)

log.Info("Sink is started",
zap.String("namespace", t.changefeed.Namespace),
zap.String("changefeed", t.changefeed.ID),
zap.Stringer("span", &t.span),
zap.Uint64("startTs", startTs),
zap.Uint64("replicateTs", t.replicateTs),
zap.Uint64("replicateTs", ts),
)

// This start ts maybe greater than the initial start ts of the table sink.
@@ -379,14 +388,16 @@
// committed at downstream but we don't know. So we need to update `replicateTs`
// of the table so that we can re-send those events later.
func (t *tableSinkWrapper) restart(ctx context.Context) (err error) {
if t.replicateTs, err = t.genReplicateTs(ctx); err != nil {
ts, err := t.genReplicateTs(ctx)
if err != nil {
return errors.Trace(err)
}
t.replicateTs.Store(ts)
log.Info("Sink is restarted",
zap.String("namespace", t.changefeed.Namespace),
zap.String("changefeed", t.changefeed.ID),
zap.Stringer("span", &t.span),
zap.Uint64("replicateTs", t.replicateTs))
zap.Uint64("replicateTs", ts))
return nil
}

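The core change in table_sink_wrapper.go is that replicateTs becomes an atomic.Uint64 with math.MaxUint64 as the "not started yet" sentinel: start and restart store a fresh ts, while sink workers, redo workers, and now the puller (through GetReplicaTs) read it concurrently. A maximal sentinel also means any entry compared against a not-yet-started table looks old, which presumably errs on the safe side of splitting. A minimal, self-contained sketch of the same pattern, independent of TiCDC types; replicaTsHolder is an illustrative name.

package main

import (
	"fmt"
	"math"
	"sync/atomic"
)

// replicaTsHolder mirrors the pattern used by tableSinkWrapper: an atomic ts
// with math.MaxUint64 meaning "not started yet", safe for concurrent readers.
type replicaTsHolder struct {
	ts atomic.Uint64
}

func newReplicaTsHolder() *replicaTsHolder {
	h := &replicaTsHolder{}
	h.ts.Store(math.MaxUint64) // sentinel: replication has not started
	return h
}

// start records the replicate ts exactly once. As in the real wrapper, start is
// called by a single owner goroutine; only reads happen concurrently.
func (h *replicaTsHolder) start(ts uint64) error {
	if h.ts.Load() != math.MaxUint64 {
		return fmt.Errorf("already started, replicateTs=%d", h.ts.Load())
	}
	h.ts.Store(ts)
	return nil
}

// get can be called from other goroutines (sink worker, redo worker, or the
// puller through a closure like GetReplicaTs).
func (h *replicaTsHolder) get() uint64 {
	return h.ts.Load()
}

func main() {
	h := newReplicaTsHolder()
	fmt.Println(h.get() == math.MaxUint64) // true: every entry would look "old enough"
	if err := h.start(42); err != nil {
		panic(err)
	}
	fmt.Println(h.get()) // 42
}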