From 434508b718c3e2d5d963554030d35c1ec2413b3e Mon Sep 17 00:00:00 2001 From: CharlesCheung Date: Wed, 18 May 2022 11:40:03 +0800 Subject: [PATCH] add batch field to resolved ts --- cdc/entry/mounter.go | 2 +- cdc/model/mounter.go | 47 ++++++++++++++++++++-- cdc/processor/pipeline/cyclic_mark.go | 2 +- cdc/processor/pipeline/cyclic_mark_test.go | 4 +- cdc/processor/pipeline/sink.go | 42 ++++++++++--------- cdc/processor/pipeline/sink_test.go | 24 ++++++----- cdc/processor/pipeline/table.go | 2 +- cdc/processor/pipeline/table_actor.go | 2 +- cdc/processor/pipeline/table_actor_test.go | 35 +++++++++------- cdc/sink/black_hole.go | 10 +++-- cdc/sink/mq/mq.go | 24 ++++++----- cdc/sink/mq/mq_flush_worker.go | 10 ++--- cdc/sink/mq/mq_flush_worker_test.go | 14 +++---- cdc/sink/mq/mq_test.go | 16 +++++--- cdc/sink/mysql/mysql.go | 36 +++++++++++------ cdc/sink/mysql/mysql_test.go | 16 ++++---- cdc/sink/mysql/simple_mysql_tester.go | 8 ++-- cdc/sink/mysql/txn_cache.go | 4 +- cdc/sink/mysql/txn_cache_test.go | 10 ++--- cdc/sink/sink.go | 12 ++++-- cdc/sink/table_sink.go | 21 ++++++---- cdc/sorter/leveldb/writer.go | 3 +- cdc/sorter/memory/entry_sorter.go | 2 +- cdc/sorter/memory/entry_sorter_test.go | 4 +- cdc/sorter/unified/heap_sorter.go | 4 +- cdc/sorter/unified/merger.go | 2 +- cdc/sorter/unified/unified_sorter.go | 2 +- cmd/kafka-consumer/main.go | 3 +- pkg/applier/redo.go | 4 +- 29 files changed, 225 insertions(+), 140 deletions(-)
diff --git a/cdc/entry/mounter.go b/cdc/entry/mounter.go index b50f18a72b7..aa48a7f9b9a 100644 --- a/cdc/entry/mounter.go +++ b/cdc/entry/mounter.go @@ -101,7 +101,7 @@ func NewMounter(schemaStorage SchemaStorage, // this method could block indefinitely if the DDL puller is lagging. func (m *mounterImpl) DecodeEvent(ctx context.Context, pEvent *model.PolymorphicEvent) error { m.metricTotalRows.Inc() - if pEvent.RawKV.OpType == model.OpTypeResolved { + if pEvent.IsResolved() { return nil } start := time.Now()
diff --git a/cdc/model/mounter.go b/cdc/model/mounter.go index 9998d6eb5d2..38f38bc2cc4 100644 --- a/cdc/model/mounter.go +++ b/cdc/model/mounter.go @@ -18,6 +18,8 @@ type PolymorphicEvent struct { StartTs uint64 // Commit or resolved TS CRTs uint64 + // Mode indicates whether the resolved event is in batch mode. + Mode ResolvedMode RawKV *RawKVEntry Row *RowChangedEvent @@ -44,7 +46,7 @@ func NewPolymorphicEvent(rawKV *RawKVEntry) *PolymorphicEvent { } } -// NewResolvedPolymorphicEvent creates a new PolymorphicEvent with the resolved ts +// NewResolvedPolymorphicEvent creates a new PolymorphicEvent with the resolved ts. func NewResolvedPolymorphicEvent(regionID uint64, resolvedTs uint64) *PolymorphicEvent { return &PolymorphicEvent{ CRTs: resolvedTs, @@ -58,13 +60,24 @@ func (e *PolymorphicEvent) RegionID() uint64 { return e.RawKV.RegionID } +// IsResolved returns true if the event is resolved. Note that this function can +// only be called when `RawKV != nil`. +func (e *PolymorphicEvent) IsResolved() bool { + return e.RawKV.OpType == OpTypeResolved +} + +// IsBatchResolved returns true if the event is a batch resolved event. +func (e *PolymorphicEvent) IsBatchResolved() bool { + return e.IsResolved() && e.Mode == BatchResolvedMode +} + // ComparePolymorphicEvents compares two events by CRTs, Resolved, StartTs, Delete/Put order. // It returns true if and only if i should precede j.
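// When CRTs are equal, resolved events sort after row events, so a resolved event is emitted only after every row at that commit ts.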
func ComparePolymorphicEvents(i, j *PolymorphicEvent) bool { if i.CRTs == j.CRTs { - if i.RawKV.OpType == OpTypeResolved { + if i.IsResolved() { return false - } else if j.RawKV.OpType == OpTypeResolved { + } else if j.IsResolved() { return true } @@ -80,3 +93,31 @@ func ComparePolymorphicEvents(i, j *PolymorphicEvent) bool { } return i.CRTs < j.CRTs } + +// ResolvedMode describes the batch type of a resolved event. +type ResolvedMode int + +const ( + // NormalResolvedMode means that all events whose commitTs is less than or equal to + // `resolved.Ts` are sent to Sink. + NormalResolvedMode ResolvedMode = iota + // BatchResolvedMode means that all events whose commitTs is less than + // `resolved.Ts` are sent to Sink. + BatchResolvedMode +) + +// ResolvedTs is the resolved timestamp of the sink module. +type ResolvedTs struct { + Ts uint64 + Mode ResolvedMode +} + +// NewResolvedTs creates a new ResolvedTs. +func NewResolvedTs(t uint64) ResolvedTs { + return ResolvedTs{Ts: t, Mode: NormalResolvedMode} +} + +// NewResolvedTsWithMode creates a ResolvedTs with the given mode. +func NewResolvedTsWithMode(t uint64, m ResolvedMode) ResolvedTs { + return ResolvedTs{Ts: t, Mode: m} +}
diff --git a/cdc/processor/pipeline/cyclic_mark.go b/cdc/processor/pipeline/cyclic_mark.go index 5df9e5fa9c8..010cf3b01b9 100644 --- a/cdc/processor/pipeline/cyclic_mark.go +++ b/cdc/processor/pipeline/cyclic_mark.go @@ -95,7 +95,7 @@ func (n *cyclicMarkNode) TryHandleDataMessage( case pmessage.MessageTypePolymorphicEvent: event := msg.PolymorphicEvent n.flush(ctx, event.CRTs) - if event.RawKV.OpType == model.OpTypeResolved { + if event.IsResolved() { ctx.SendToNextNode(msg) return true, nil }
diff --git a/cdc/processor/pipeline/cyclic_mark_test.go b/cdc/processor/pipeline/cyclic_mark_test.go index 84d1f4ad09e..c5552ab9f72 100644 --- a/cdc/processor/pipeline/cyclic_mark_test.go +++ b/cdc/processor/pipeline/cyclic_mark_test.go @@ -171,7 +171,7 @@ func TestCyclicMarkNode(t *testing.T) { go func() { defer wg.Done() for row := range outputCh { - if row.PolymorphicEvent.RawKV.OpType == model.OpTypeResolved { + if row.PolymorphicEvent.IsResolved() { continue } output = append(output, row.PolymorphicEvent.Row) @@ -213,7 +213,7 @@ func TestCyclicMarkNode(t *testing.T) { require.Nil(t, err) output := []*model.RowChangedEvent{} putToOutput := func(row *pmessage.Message) { - if row == nil || row.PolymorphicEvent.RawKV.OpType == model.OpTypeResolved { + if row == nil || row.PolymorphicEvent.IsResolved() { return } output = append(output, row.PolymorphicEvent.Row)
diff --git a/cdc/processor/pipeline/sink.go b/cdc/processor/pipeline/sink.go index e87bf4c9859..37ecd559762 100755 --- a/cdc/processor/pipeline/sink.go +++ b/cdc/processor/pipeline/sink.go @@ -71,7 +71,8 @@ type sinkNode struct { status TableStatus tableID model.TableID - resolvedTs model.Ts + // atomic operations for model.ResolvedTs + resolvedTs atomic.Value checkpointTs model.Ts targetTs model.Ts barrierTs model.Ts @@ -83,23 +84,24 @@ type sinkNode struct { } func newSinkNode(tableID model.TableID, sink sink.Sink, startTs model.Ts, targetTs model.Ts, flowController tableFlowController) *sinkNode { - return &sinkNode{ + sn := &sinkNode{ tableID: tableID, sink: sink, status: TableStatusInitializing, targetTs: targetTs, - resolvedTs: startTs, checkpointTs: startTs, barrierTs: startTs, flowController: flowController, } + sn.resolvedTs.Store(model.NewResolvedTs(startTs)) + return sn } -func (n *sinkNode) ResolvedTs() model.Ts { return atomic.LoadUint64(&n.resolvedTs) }
-func (n *sinkNode) CheckpointTs() model.Ts { return atomic.LoadUint64(&n.checkpointTs) } -func (n *sinkNode) BarrierTs() model.Ts { return atomic.LoadUint64(&n.barrierTs) } -func (n *sinkNode) Status() TableStatus { return n.status.Load() } +func (n *sinkNode) ResolvedTs() model.ResolvedTs { return n.resolvedTs.Load().(model.ResolvedTs) } +func (n *sinkNode) CheckpointTs() model.Ts { return atomic.LoadUint64(&n.checkpointTs) } +func (n *sinkNode) BarrierTs() model.Ts { return atomic.LoadUint64(&n.barrierTs) } +func (n *sinkNode) Status() TableStatus { return n.status.Load() } func (n *sinkNode) Init(ctx pipeline.NodeContext) error { n.replicaConfig = ctx.ChangefeedVars().Info.Config @@ -129,7 +131,7 @@ func (n *sinkNode) stop(ctx context.Context) (err error) { // flushSink emits all rows in rowBuffer to the backend sink and flushes // the backend sink. -func (n *sinkNode) flushSink(ctx context.Context, resolvedTs model.Ts) (err error) { +func (n *sinkNode) flushSink(ctx context.Context, resolved model.ResolvedTs) (err error) { defer func() { if err != nil { n.status.Store(TableStatusStopped) @@ -141,16 +143,16 @@ func (n *sinkNode) flushSink(ctx context.Context, resolvedTs model.Ts) (err erro }() currentBarrierTs := atomic.LoadUint64(&n.barrierTs) currentCheckpointTs := atomic.LoadUint64(&n.checkpointTs) - if resolvedTs > currentBarrierTs { - resolvedTs = currentBarrierTs + if resolved.Ts > currentBarrierTs { + resolved.Ts = currentBarrierTs } - if resolvedTs > n.targetTs { - resolvedTs = n.targetTs + if resolved.Ts > n.targetTs { + resolved.Ts = n.targetTs } - if resolvedTs <= currentCheckpointTs { + if resolved.Ts <= currentCheckpointTs { return nil } - checkpointTs, err := n.sink.FlushRowChangedEvents(ctx, n.tableID, resolvedTs) + checkpointTs, err := n.sink.FlushRowChangedEvents(ctx, n.tableID, resolved) if err != nil { return errors.Trace(err) } @@ -291,24 +293,26 @@ func (n *sinkNode) HandleMessage(ctx context.Context, msg pmessage.Message) (boo switch msg.Tp { case pmessage.MessageTypePolymorphicEvent: event := msg.PolymorphicEvent - if event.RawKV.OpType == model.OpTypeResolved { + if event.IsResolved() { if n.status.Load() == TableStatusInitializing { n.status.Store(TableStatusRunning) } failpoint.Inject("ProcessorSyncResolvedError", func() { failpoint.Return(false, errors.New("processor sync resolved injected error")) }) - if err := n.flushSink(ctx, msg.PolymorphicEvent.CRTs); err != nil { + + resolved := model.NewResolvedTsWithMode(event.CRTs, event.Mode) + if err := n.flushSink(ctx, resolved); err != nil { return false, errors.Trace(err) } - atomic.StoreUint64(&n.resolvedTs, msg.PolymorphicEvent.CRTs) + n.resolvedTs.Store(resolved) return true, nil } if err := n.emitRowToSink(ctx, event); err != nil { return false, errors.Trace(err) } case pmessage.MessageTypeTick: - if err := n.flushSink(ctx, atomic.LoadUint64(&n.resolvedTs)); err != nil { + if err := n.flushSink(ctx, n.ResolvedTs()); err != nil { return false, errors.Trace(err) } case pmessage.MessageTypeCommand: @@ -327,7 +331,7 @@ func (n *sinkNode) HandleMessage(ctx context.Context, msg pmessage.Message) (boo func (n *sinkNode) updateBarrierTs(ctx context.Context, ts model.Ts) error { atomic.StoreUint64(&n.barrierTs, ts) - if err := n.flushSink(ctx, atomic.LoadUint64(&n.resolvedTs)); err != nil { + if err := n.flushSink(ctx, n.ResolvedTs()); err != nil { return errors.Trace(err) } return nil diff --git a/cdc/processor/pipeline/sink_test.go b/cdc/processor/pipeline/sink_test.go index 87766150120..0c8f5ced58d 100644 --- 
a/cdc/processor/pipeline/sink_test.go +++ b/cdc/processor/pipeline/sink_test.go @@ -76,12 +76,14 @@ func (s *mockSink) EmitDDLEvent(ctx context.Context, ddl *model.DDLEvent) error panic("unreachable") } -func (s *mockSink) FlushRowChangedEvents(ctx context.Context, _ model.TableID, resolvedTs uint64) (uint64, error) { +func (s *mockSink) FlushRowChangedEvents( + ctx context.Context, _ model.TableID, resolved model.ResolvedTs, +) (uint64, error) { s.received = append(s.received, struct { resolvedTs model.Ts row *model.RowChangedEvent - }{resolvedTs: resolvedTs}) - return resolvedTs, nil + }{resolvedTs: resolved.Ts}) + return resolved.Ts, nil } func (s *mockSink) EmitCheckpointTs(_ context.Context, _ uint64, _ []model.TableName) error { @@ -420,7 +422,7 @@ func TestManyTs(t *testing.T) { {resolvedTs: 1}, }) sink.Reset() - require.Equal(t, uint64(2), node.ResolvedTs()) + require.Equal(t, model.NewResolvedTs(uint64(2)), node.ResolvedTs()) require.Equal(t, uint64(1), node.CheckpointTs()) require.Nil(t, node.Receive( @@ -433,7 +435,7 @@ func TestManyTs(t *testing.T) { {resolvedTs: 2}, }) sink.Reset() - require.Equal(t, uint64(2), node.ResolvedTs()) + require.Equal(t, model.NewResolvedTs(uint64(2)), node.ResolvedTs()) require.Equal(t, uint64(2), node.CheckpointTs()) } @@ -646,11 +648,13 @@ type flushSink struct { // fall back var fallBackResolvedTs = uint64(10) -func (s *flushSink) FlushRowChangedEvents(ctx context.Context, _ model.TableID, resolvedTs uint64) (uint64, error) { - if resolvedTs == fallBackResolvedTs { +func (s *flushSink) FlushRowChangedEvents( + ctx context.Context, _ model.TableID, resolved model.ResolvedTs, +) (uint64, error) { + if resolved.Ts == fallBackResolvedTs { return 0, nil } - return resolvedTs, nil + return resolved.Ts, nil } // TestFlushSinkReleaseFlowController tests sinkNode.flushSink method will always @@ -674,12 +678,12 @@ func TestFlushSinkReleaseFlowController(t *testing.T) { require.Nil(t, sNode.Init(pipeline.MockNodeContext4Test(ctx, pmessage.Message{}, nil))) sNode.barrierTs = 10 - err := sNode.flushSink(context.Background(), uint64(8)) + err := sNode.flushSink(context.Background(), model.NewResolvedTs(uint64(8))) require.Nil(t, err) require.Equal(t, uint64(8), sNode.checkpointTs) require.Equal(t, 1, flowController.releaseCounter) // resolvedTs will fall back in this call - err = sNode.flushSink(context.Background(), uint64(10)) + err = sNode.flushSink(context.Background(), model.NewResolvedTs(uint64(10))) require.Nil(t, err) require.Equal(t, uint64(8), sNode.checkpointTs) require.Equal(t, 2, flowController.releaseCounter) diff --git a/cdc/processor/pipeline/table.go b/cdc/processor/pipeline/table.go index 556c08ee1b3..6aad666d1df 100644 --- a/cdc/processor/pipeline/table.go +++ b/cdc/processor/pipeline/table.go @@ -91,7 +91,7 @@ func (t *tablePipelineImpl) ResolvedTs() model.Ts { // another replication barrier for consistent replication instead of reusing // the global resolved-ts. if redo.IsConsistentEnabled(t.replConfig.Consistent.Level) { - return t.sinkNode.ResolvedTs() + return t.sinkNode.ResolvedTs().Ts } return t.sorterNode.ResolvedTs() } diff --git a/cdc/processor/pipeline/table_actor.go b/cdc/processor/pipeline/table_actor.go index d6a1bb35f9f..6c28c9e6215 100644 --- a/cdc/processor/pipeline/table_actor.go +++ b/cdc/processor/pipeline/table_actor.go @@ -429,7 +429,7 @@ func (t *tableActor) ResolvedTs() model.Ts { // another replication barrier for consistent replication instead of reusing // the global resolved-ts. 
if redo.IsConsistentEnabled(t.replicaConfig.Consistent.Level) { - return t.sinkNode.ResolvedTs() + return t.sinkNode.ResolvedTs().Ts } return t.sortNode.ResolvedTs() }
diff --git a/cdc/processor/pipeline/table_actor_test.go b/cdc/processor/pipeline/table_actor_test.go index 92f25cef6a9..0f80a4e9b6f 100644 --- a/cdc/processor/pipeline/table_actor_test.go +++ b/cdc/processor/pipeline/table_actor_test.go @@ -91,7 +91,7 @@ func TestTableActorInterface(t *testing.T) { require.Equal(t, model.Ts(5), tbl.ResolvedTs()) tbl.replicaConfig.Consistent.Level = string(redo.ConsistentLevelEventual) - atomic.StoreUint64(&sink.resolvedTs, 6) + sink.resolvedTs.Store(model.NewResolvedTs(6)) require.Equal(t, model.Ts(6), tbl.ResolvedTs()) } @@ -184,15 +184,18 @@ func TestPollStoppedActor(t *testing.T) { func TestPollTickMessage(t *testing.T) { startTime := time.Now().Add(-sinkFlushInterval) + + sn := &sinkNode{ + status: TableStatusInitializing, + sink: &mockSink{}, + flowController: &mockFlowController{}, + checkpointTs: 10, + targetTs: 11, + } + sn.resolvedTs.Store(model.NewResolvedTs(10)) + tbl := tableActor{ - sinkNode: &sinkNode{ - status: TableStatusInitializing, - sink: &mockSink{}, - flowController: &mockFlowController{}, - resolvedTs: 10, - checkpointTs: 10, - targetTs: 11, - }, + sinkNode: sn, lastFlushSinkTime: time.Now().Add(-2 * sinkFlushInterval), cancel: func() {}, reportErr: func(err error) {}, @@ -235,13 +238,15 @@ func TestPollStopMessage(t *testing.T) { } func TestPollBarrierTsMessage(t *testing.T) { + sn := &sinkNode{ + targetTs: 10, + checkpointTs: 5, + barrierTs: 8, + } + sn.resolvedTs.Store(model.NewResolvedTs(5)) + tbl := tableActor{ - sinkNode: &sinkNode{ - targetTs: 10, - checkpointTs: 5, - resolvedTs: 5, - barrierTs: 8, - }, + sinkNode: sn, sortNode: &sorterNode{ barrierTs: 8, },
diff --git a/cdc/sink/black_hole.go b/cdc/sink/black_hole.go index 23054c304ef..e00394daee4 100644 --- a/cdc/sink/black_hole.go +++ b/cdc/sink/black_hole.go @@ -37,6 +37,8 @@ type blackHoleSink struct { lastAccumulated uint64 } +var _ Sink = (*blackHoleSink)(nil) + func (b *blackHoleSink) AddTable(tableID model.TableID) error { return nil } @@ -51,8 +53,10 @@ func (b *blackHoleSink) EmitRowChangedEvents(ctx context.Context, rows ...*model return nil } -func (b *blackHoleSink) FlushRowChangedEvents(ctx context.Context, _ model.TableID, resolvedTs uint64) (uint64, error) { - log.Debug("BlockHoleSink: FlushRowChangedEvents", zap.Uint64("resolvedTs", resolvedTs)) +func (b *blackHoleSink) FlushRowChangedEvents( + ctx context.Context, _ model.TableID, resolved model.ResolvedTs, +) (uint64, error) { + log.Debug("BlackHoleSink: FlushRowChangedEvents", zap.Uint64("resolvedTs", resolved.Ts)) err := b.statistics.RecordBatchExecution(func() (int, error) { // TODO: add some random replication latency accumulated := atomic.LoadUint64(&b.accumulated) batchSize := accumulated - b.lastAccumulated b.lastAccumulated = accumulated return int(batchSize), nil }) b.statistics.PrintStatus(ctx) - return resolvedTs, err + return resolved.Ts, err } func (b *blackHoleSink) EmitCheckpointTs(ctx context.Context, ts uint64, tables []model.TableName) error {
diff --git a/cdc/sink/mq/mq.go b/cdc/sink/mq/mq.go index 07b70952a55..5487631a821 100644 --- a/cdc/sink/mq/mq.go +++ b/cdc/sink/mq/mq.go @@ -42,8 +42,8 @@ import ( ) type resolvedTsEvent struct { - tableID model.TableID - resolvedTs model.Ts + tableID model.TableID + resolved model.ResolvedTs } const ( @@ -179,21 +179,23 @@ func (k *mqSink)
EmitRowChangedEvents(ctx context.Context, rows ...*model.RowCha // that the data before the resolvedTs has been // successfully written downstream. // FlushRowChangedEvents is thread-safe. -func (k *mqSink) FlushRowChangedEvents(ctx context.Context, tableID model.TableID, resolvedTs uint64) (uint64, error) { +func (k *mqSink) FlushRowChangedEvents( + ctx context.Context, tableID model.TableID, resolved model.ResolvedTs, +) (uint64, error) { var checkpointTs uint64 v, ok := k.tableCheckpointTsMap.Load(tableID) if ok { checkpointTs = v.(uint64) } - if resolvedTs <= checkpointTs { + if resolved.Ts <= checkpointTs { return checkpointTs, nil } select { case <-ctx.Done(): return 0, ctx.Err() case k.resolvedBuffer <- resolvedTsEvent{ - tableID: tableID, - resolvedTs: resolvedTs, + tableID: tableID, + resolved: model.NewResolvedTs(resolved.Ts), }: } k.statistics.PrintStatus(ctx) @@ -207,21 +209,21 @@ func (k *mqSink) bgFlushTs(ctx context.Context) error { case <-ctx.Done(): return errors.Trace(ctx.Err()) case msg := <-k.resolvedBuffer: - resolvedTs := msg.resolvedTs - err := k.flushTsToWorker(ctx, resolvedTs) + resolved := msg.resolved + err := k.flushTsToWorker(ctx, resolved) if err != nil { return errors.Trace(err) } // Since CDC does not guarantee exactly once semantic, it won't cause any problem // here even if the table was moved or removed. // ref: https://github.com/pingcap/tiflow/pull/4356#discussion_r787405134 - k.tableCheckpointTsMap.Store(msg.tableID, resolvedTs) + k.tableCheckpointTsMap.Store(msg.tableID, resolved.Ts) } } } -func (k *mqSink) flushTsToWorker(ctx context.Context, resolvedTs model.Ts) error { - if err := k.flushWorker.addEvent(ctx, mqEvent{resolvedTs: resolvedTs}); err != nil { +func (k *mqSink) flushTsToWorker(ctx context.Context, resolved model.ResolvedTs) error { + if err := k.flushWorker.addEvent(ctx, mqEvent{resolved: resolved}); err != nil { if errors.Cause(err) != context.Canceled { log.Warn("failed to flush TS to worker", zap.Error(err)) } else { diff --git a/cdc/sink/mq/mq_flush_worker.go b/cdc/sink/mq/mq_flush_worker.go index dbe3e9cfa75..c467ae0d8df 100644 --- a/cdc/sink/mq/mq_flush_worker.go +++ b/cdc/sink/mq/mq_flush_worker.go @@ -43,9 +43,9 @@ type topicPartitionKey struct { // It carries the partition information of the message, // and it is also used as resolved ts messaging. type mqEvent struct { - key topicPartitionKey - row *model.RowChangedEvent - resolvedTs model.Ts + key topicPartitionKey + row *model.RowChangedEvent + resolved model.ResolvedTs } // flushWorker is responsible for sending messages to the Kafka producer on a batch basis. @@ -98,7 +98,7 @@ func (w *flushWorker) batch( case msg := <-w.msgChan: // When the resolved ts is received, // we need to write the previous data to the producer as soon as possible. 
- if msg.resolvedTs != 0 { + if msg.resolved.Ts != 0 { w.needSyncFlush = true return index, nil } @@ -116,7 +116,7 @@ func (w *flushWorker) batch( case <-ctx.Done(): return index, ctx.Err() case msg := <-w.msgChan: - if msg.resolvedTs != 0 { + if msg.resolved.Ts != 0 { w.needSyncFlush = true return index, nil } diff --git a/cdc/sink/mq/mq_flush_worker_test.go b/cdc/sink/mq/mq_flush_worker_test.go index 70b1f320fec..c8ba0adc460 100644 --- a/cdc/sink/mq/mq_flush_worker_test.go +++ b/cdc/sink/mq/mq_flush_worker_test.go @@ -114,7 +114,7 @@ func TestBatch(t *testing.T) { name: "Normal batching", events: []mqEvent{ { - resolvedTs: 0, + resolved: model.NewResolvedTs(0), }, { row: &model.RowChangedEvent{ @@ -139,7 +139,7 @@ func TestBatch(t *testing.T) { name: "No row change events", events: []mqEvent{ { - resolvedTs: 1, + resolved: model.NewResolvedTs(1), }, }, expectedN: 0, @@ -156,7 +156,7 @@ func TestBatch(t *testing.T) { key: key, }, { - resolvedTs: 1, + resolved: model.NewResolvedTs(1), }, { row: &model.RowChangedEvent{ @@ -384,7 +384,7 @@ func TestFlush(t *testing.T) { key: key1, }, { - resolvedTs: 1, + resolved: model.NewResolvedTs(1), }, } @@ -461,15 +461,15 @@ func TestProducerError(t *testing.T) { }, }) require.NoError(t, err) - err = worker.addEvent(ctx, mqEvent{resolvedTs: 100}) + err = worker.addEvent(ctx, mqEvent{resolved: model.NewResolvedTs(100)}) require.NoError(t, err) wg.Wait() - err = worker.addEvent(ctx, mqEvent{resolvedTs: 200}) + err = worker.addEvent(ctx, mqEvent{resolved: model.NewResolvedTs(200)}) require.Error(t, err) require.Regexp(t, ".*fake.*", err.Error()) - err = worker.addEvent(ctx, mqEvent{resolvedTs: 300}) + err = worker.addEvent(ctx, mqEvent{resolved: model.NewResolvedTs(300)}) require.Error(t, err) require.Regexp(t, ".*ErrMQWorkerClosed.*", err.Error()) } diff --git a/cdc/sink/mq/mq_test.go b/cdc/sink/mq/mq_test.go index d07428c31e3..e01e6c53daf 100644 --- a/cdc/sink/mq/mq_test.go +++ b/cdc/sink/mq/mq_test.go @@ -93,11 +93,11 @@ func (s mqSinkSuite) TestKafkaSink(c *check.C) { } err = sink.EmitRowChangedEvents(ctx, row) c.Assert(err, check.IsNil) - checkpointTs, err := sink.FlushRowChangedEvents(ctx, tableID, uint64(120)) + checkpointTs, err := sink.FlushRowChangedEvents(ctx, tableID, model.NewResolvedTs(uint64(120))) c.Assert(err, check.IsNil) c.Assert(checkpointTs, check.Equals, uint64(120)) // flush older resolved ts - checkpointTs, err = sink.FlushRowChangedEvents(ctx, tableID, uint64(110)) + checkpointTs, err = sink.FlushRowChangedEvents(ctx, tableID, model.NewResolvedTs(uint64(110))) c.Assert(err, check.IsNil) c.Assert(checkpointTs, check.Equals, uint64(120)) @@ -324,20 +324,24 @@ func (s mqSinkSuite) TestFlushRowChangedEvents(c *check.C) { // mock kafka broker processes 1 row resolvedTs event leader.Returns(prodSuccess) - checkpointTs1, err := sink.FlushRowChangedEvents(ctx, tableID1, row1.CommitTs) + checkpointTs1, err := sink.FlushRowChangedEvents(ctx, + tableID1, model.NewResolvedTs(row1.CommitTs)) c.Assert(err, check.IsNil) c.Assert(checkpointTs1, check.Equals, row1.CommitTs) - checkpointTs2, err := sink.FlushRowChangedEvents(ctx, tableID2, row2.CommitTs) + checkpointTs2, err := sink.FlushRowChangedEvents(ctx, + tableID2, model.NewResolvedTs(row2.CommitTs)) c.Assert(err, check.IsNil) c.Assert(checkpointTs2, check.Equals, row2.CommitTs) - checkpointTs3, err := sink.FlushRowChangedEvents(ctx, tableID3, row3.CommitTs) + checkpointTs3, err := sink.FlushRowChangedEvents(ctx, + tableID3, model.NewResolvedTs(row3.CommitTs)) c.Assert(err, check.IsNil) 
c.Assert(checkpointTs3, check.Equals, row3.CommitTs) // flush older resolved ts - checkpointTsOld, err := sink.FlushRowChangedEvents(ctx, tableID1, uint64(110)) + checkpointTsOld, err := sink.FlushRowChangedEvents(ctx, tableID1, + model.NewResolvedTs(uint64(110))) c.Assert(err, check.IsNil) c.Assert(checkpointTsOld, check.Equals, row1.CommitTs) diff --git a/cdc/sink/mysql/mysql.go b/cdc/sink/mysql/mysql.go index a01089ba9cf..6c791b3f310 100644 --- a/cdc/sink/mysql/mysql.go +++ b/cdc/sink/mysql/mysql.go @@ -225,10 +225,12 @@ func (s *mysqlSink) EmitRowChangedEvents(ctx context.Context, rows ...*model.Row // FlushRowChangedEvents will flush all received events, // we do not write data downstream until we receive resolvedTs. // Concurrency Note: FlushRowChangedEvents is thread-safe. -func (s *mysqlSink) FlushRowChangedEvents(ctx context.Context, tableID model.TableID, resolvedTs uint64) (uint64, error) { - v, ok := s.tableMaxResolvedTs.Load(tableID) - if !ok || v.(uint64) < resolvedTs { - s.tableMaxResolvedTs.Store(tableID, resolvedTs) +func (s *mysqlSink) FlushRowChangedEvents( + ctx context.Context, tableID model.TableID, resolved model.ResolvedTs, +) (uint64, error) { + v, ok := s.getTableResolvedTs(tableID) + if !ok || v.Ts < resolved.Ts { + s.tableMaxResolvedTs.Store(tableID, resolved) } // check and throw error @@ -506,10 +508,10 @@ func (s *mysqlSink) cleanTableResource(tableID model.TableID) { // otherwise when the table is dispatched back again, // it may read the old values. // See: https://github.com/pingcap/tiflow/issues/4464#issuecomment-1085385382. - if resolvedTs, loaded := s.tableMaxResolvedTs.LoadAndDelete(tableID); loaded { + if resolved, loaded := s.tableMaxResolvedTs.LoadAndDelete(tableID); loaded { log.Info("clean up table max resolved ts in MySQL sink", zap.Int64("tableID", tableID), - zap.Uint64("resolvedTs", resolvedTs.(uint64))) + zap.Uint64("resolvedTs", resolved.(model.ResolvedTs).Ts)) } if checkpointTs, loaded := s.tableCheckpointTs.LoadAndDelete(tableID); loaded { log.Info("clean up table checkpoint ts in MySQL sink", @@ -538,27 +540,26 @@ func (s *mysqlSink) RemoveTable(ctx context.Context, tableID model.TableID) erro case <-ctx.Done(): return errors.Trace(ctx.Err()) case <-ticker.C: - maxResolvedTs, ok := s.tableMaxResolvedTs.Load(tableID) + maxResolved, ok := s.getTableResolvedTs(tableID) log.Warn("Barrier doesn't return in time, may be stuck", zap.Int64("tableID", tableID), zap.Bool("hasResolvedTs", ok), - zap.Any("resolvedTs", maxResolvedTs), + zap.Any("resolvedTs", maxResolved.Ts), zap.Uint64("checkpointTs", s.getTableCheckpointTs(tableID))) default: - v, ok := s.tableMaxResolvedTs.Load(tableID) + maxResolved, ok := s.getTableResolvedTs(tableID) if !ok { log.Info("No table resolvedTs is found", zap.Int64("tableID", tableID)) return nil } - maxResolvedTs := v.(uint64) - if s.getTableCheckpointTs(tableID) >= maxResolvedTs { + if s.getTableCheckpointTs(tableID) >= maxResolved.Ts { return nil } - checkpointTs, err := s.FlushRowChangedEvents(ctx, tableID, maxResolvedTs) + checkpointTs, err := s.FlushRowChangedEvents(ctx, tableID, maxResolved) if err != nil { return err } - if checkpointTs >= maxResolvedTs { + if checkpointTs >= maxResolved.Ts { return nil } // short sleep to avoid cpu spin @@ -575,6 +576,15 @@ func (s *mysqlSink) getTableCheckpointTs(tableID model.TableID) uint64 { return uint64(0) } +func (s *mysqlSink) getTableResolvedTs(tableID model.TableID) (model.ResolvedTs, bool) { + v, ok := s.tableMaxResolvedTs.Load(tableID) + var resolved 
model.ResolvedTs + if ok { + resolved = v.(model.ResolvedTs) + } + return resolved, ok +} + func logDMLTxnErr(err error) error { if isRetryableDMLError(err) { log.Warn("execute DMLs with error, retry later", zap.Error(err)) diff --git a/cdc/sink/mysql/mysql_test.go b/cdc/sink/mysql/mysql_test.go index 14b743de09c..b2af9e18040 100644 --- a/cdc/sink/mysql/mysql_test.go +++ b/cdc/sink/mysql/mysql_test.go @@ -1248,7 +1248,7 @@ func TestNewMySQLSinkExecDML(t *testing.T) { // retry to make sure event is flushed err = retry.Do(context.Background(), func() error { - ts, err := sink.FlushRowChangedEvents(ctx, 1, uint64(2)) + ts, err := sink.FlushRowChangedEvents(ctx, 1, model.NewResolvedTs(uint64(2))) require.Nil(t, err) if ts < uint64(2) { return errors.Errorf("checkpoint ts %d less than resolved ts %d", ts, 2) @@ -1259,7 +1259,7 @@ func TestNewMySQLSinkExecDML(t *testing.T) { require.Nil(t, err) err = retry.Do(context.Background(), func() error { - ts, err := sink.FlushRowChangedEvents(ctx, 2, uint64(4)) + ts, err := sink.FlushRowChangedEvents(ctx, 2, model.NewResolvedTs(uint64(4))) require.Nil(t, err) if ts < uint64(4) { return errors.Errorf("checkpoint ts %d less than resolved ts %d", ts, 4) @@ -1787,7 +1787,7 @@ func TestMySQLSinkFlushResolvedTs(t *testing.T) { model.DefaultChangeFeedID(changefeed), sinkURI, f, rc, map[string]string{}) require.Nil(t, err) - checkpoint, err := sink.FlushRowChangedEvents(ctx, model.TableID(1), 1) + checkpoint, err := sink.FlushRowChangedEvents(ctx, model.TableID(1), model.NewResolvedTs(1)) require.Nil(t, err) require.True(t, checkpoint <= 1) rows := []*model.RowChangedEvent{ @@ -1806,7 +1806,7 @@ func TestMySQLSinkFlushResolvedTs(t *testing.T) { } err = sink.EmitRowChangedEvents(ctx, rows...) require.Nil(t, err) - checkpoint, err = sink.FlushRowChangedEvents(ctx, model.TableID(1), 6) + checkpoint, err = sink.FlushRowChangedEvents(ctx, model.TableID(1), model.NewResolvedTs(6)) require.True(t, checkpoint <= 6) require.Nil(t, err) require.True(t, sink.getTableCheckpointTs(model.TableID(1)) <= 6) @@ -1826,16 +1826,16 @@ func TestMySQLSinkFlushResolvedTs(t *testing.T) { } err = sink.EmitRowChangedEvents(ctx, rows...) 
require.Nil(t, err) - checkpoint, err = sink.FlushRowChangedEvents(ctx, model.TableID(2), 5) + checkpoint, err = sink.FlushRowChangedEvents(ctx, model.TableID(2), model.NewResolvedTs(5)) require.True(t, checkpoint <= 5) require.Nil(t, err) require.True(t, sink.getTableCheckpointTs(model.TableID(2)) <= 5) _ = sink.Close(ctx) - _, err = sink.FlushRowChangedEvents(ctx, model.TableID(2), 6) + _, err = sink.FlushRowChangedEvents(ctx, model.TableID(2), model.NewResolvedTs(6)) require.Nil(t, err) cancel() - _, err = sink.FlushRowChangedEvents(ctx, model.TableID(2), 6) + _, err = sink.FlushRowChangedEvents(ctx, model.TableID(2), model.NewResolvedTs(6)) require.Regexp(t, ".*context canceled.*", err) } @@ -1906,7 +1906,7 @@ func TestCleanTableResource(t *testing.T) { Table: &model.TableName{TableID: tblID, Schema: "test", Table: "t1"}, })) s.tableCheckpointTs.Store(tblID, uint64(1)) - s.tableMaxResolvedTs.Store(tblID, uint64(2)) + s.tableMaxResolvedTs.Store(tblID, model.NewResolvedTs(uint64(2))) _, ok := s.txnCache.unresolvedTxns[tblID] require.True(t, ok) require.Nil(t, s.AddTable(tblID)) diff --git a/cdc/sink/mysql/simple_mysql_tester.go b/cdc/sink/mysql/simple_mysql_tester.go index ff3df320f50..ed68b1f12b5 100644 --- a/cdc/sink/mysql/simple_mysql_tester.go +++ b/cdc/sink/mysql/simple_mysql_tester.go @@ -177,12 +177,14 @@ func (s *simpleMySQLSink) EmitDDLEvent(ctx context.Context, ddl *model.DDLEvent) // FlushRowChangedEvents flushes each row which of commitTs less than or equal to `resolvedTs` into downstream. // TiCDC guarantees that all of Event which of commitTs less than or equal to `resolvedTs` are sent to Sink through `EmitRowChangedEvents` -func (s *simpleMySQLSink) FlushRowChangedEvents(ctx context.Context, _ model.TableID, resolvedTs uint64) (uint64, error) { +func (s *simpleMySQLSink) FlushRowChangedEvents( + ctx context.Context, _ model.TableID, resolved model.ResolvedTs, +) (uint64, error) { s.rowsBufferLock.Lock() defer s.rowsBufferLock.Unlock() newBuffer := make([]*model.RowChangedEvent, 0, len(s.rowsBuffer)) for _, row := range s.rowsBuffer { - if row.CommitTs <= resolvedTs { + if row.CommitTs <= resolved.Ts { err := s.executeRowChangedEvents(ctx, row) if err != nil { return 0, err @@ -192,7 +194,7 @@ func (s *simpleMySQLSink) FlushRowChangedEvents(ctx context.Context, _ model.Tab } } s.rowsBuffer = newBuffer - return resolvedTs, nil + return resolved.Ts, nil } // EmitCheckpointTs sends CheckpointTs to Sink diff --git a/cdc/sink/mysql/txn_cache.go b/cdc/sink/mysql/txn_cache.go index e084decaf12..440e8bd96c0 100644 --- a/cdc/sink/mysql/txn_cache.go +++ b/cdc/sink/mysql/txn_cache.go @@ -134,8 +134,8 @@ func splitResolvedTxn( checkpointTsMap = make(map[model.TableID]uint64, len(unresolvedTxns)) resolvedTsMap.Range(func(k, v any) bool { tableID := k.(model.TableID) - resolvedTs := v.(model.Ts) - checkpointTsMap[tableID] = resolvedTs + resolved := v.(model.ResolvedTs) + checkpointTsMap[tableID] = resolved.Ts return true }) diff --git a/cdc/sink/mysql/txn_cache_test.go b/cdc/sink/mysql/txn_cache_test.go index 1332aec1bb5..d55bd25cf6c 100644 --- a/cdc/sink/mysql/txn_cache_test.go +++ b/cdc/sink/mysql/txn_cache_test.go @@ -267,19 +267,19 @@ func TestSplitResolvedTxn(test *testing.T) { cache.Append(nil, t.input...) 
resolvedTsMap := sync.Map{} for tableID, ts := range t.resolvedTsMap { - resolvedTsMap.Store(tableID, ts) + resolvedTsMap.Store(tableID, model.NewResolvedTs(ts)) } - checkpointTsMap, resolved := cache.Resolved(&resolvedTsMap) - for tableID, txns := range resolved { + checkpointTsMap, resolvedTxn := cache.Resolved(&resolvedTsMap) + for tableID, txns := range resolvedTxn { sort.Slice(txns, func(i, j int) bool { if txns[i].CommitTs != txns[j].CommitTs { return txns[i].CommitTs < txns[j].CommitTs } return txns[i].StartTs < txns[j].StartTs }) - resolved[tableID] = txns + resolvedTxn[tableID] = txns } - require.Equal(test, t.expected, resolved, cmp.Diff(resolved, t.expected)) + require.Equal(test, t.expected, resolvedTxn, cmp.Diff(resolvedTxn, t.expected)) require.Equal(test, t.resolvedTsMap, checkpointTsMap) } }
diff --git a/cdc/sink/sink.go b/cdc/sink/sink.go index 4c0081570e6..3ea13640e9c 100644 --- a/cdc/sink/sink.go +++ b/cdc/sink/sink.go @@ -53,12 +53,16 @@ type Sink interface { EmitDDLEvent(ctx context.Context, ddl *model.DDLEvent) error // FlushRowChangedEvents flushes each row which of commitTs less than or - // equal to `resolvedTs` into downstream. - // TiCDC guarantees that all the Events whose commitTs is less than or - // equal to `resolvedTs` are sent to Sink through `EmitRowChangedEvents` + // equal to `resolved.Ts` into downstream. + // With `resolved.Mode == NormalResolvedMode`, TiCDC guarantees that all events whose commitTs + // is less than or equal to `resolved.Ts` are sent to Sink. + // With `resolved.Mode == BatchResolvedMode`, TiCDC guarantees that all events whose commitTs + // is less than `resolved.Ts` are sent to Sink. // // FlushRowChangedEvents is thread-safe. - FlushRowChangedEvents(ctx context.Context, tableID model.TableID, resolvedTs uint64) (uint64, error) + FlushRowChangedEvents( + ctx context.Context, tableID model.TableID, resolved model.ResolvedTs, + ) (uint64, error) // EmitCheckpointTs sends CheckpointTs to Sink. // TiCDC guarantees that all Events **in the cluster** which of commitTs
diff --git a/cdc/sink/table_sink.go b/cdc/sink/table_sink.go index 34f4950a5c7..ee3bdc2a548 100644 --- a/cdc/sink/table_sink.go +++ b/cdc/sink/table_sink.go @@ -71,7 +71,10 @@ func (t *tableSink) EmitDDLEvent(ctx context.Context, ddl *model.DDLEvent) error // FlushRowChangedEvents flushes sorted rows to sink manager, note the resolvedTs // is required to be no more than global resolvedTs, table barrierTs and table // redo log watermarkTs. -func (t *tableSink) FlushRowChangedEvents(ctx context.Context, tableID model.TableID, resolvedTs uint64) (uint64, error) { +func (t *tableSink) FlushRowChangedEvents( + ctx context.Context, tableID model.TableID, resolved model.ResolvedTs, +) (uint64, error) { + resolvedTs := resolved.Ts if tableID != t.tableID { log.Panic("inconsistent table sink", zap.Int64("tableID", tableID), zap.Int64("sinkTableID", t.tableID)) @@ -80,7 +83,7 @@ return t.buffer[i].CommitTs > resolvedTs }) if i == 0 { - return t.flushResolvedTs(ctx, resolvedTs) + return t.flushResolvedTs(ctx, resolved) } resolvedRows := t.buffer[:i] t.buffer = append(make([]*model.RowChangedEvent, 0, len(t.buffer[i:])), t.buffer[i:]...)
@@ -89,19 +92,21 @@ func (t *tableSink) FlushRowChangedEvents(ctx context.Context, tableID model.Tab if err != nil { return 0, errors.Trace(err) } - return t.flushResolvedTs(ctx, resolvedTs) + return t.flushResolvedTs(ctx, resolved) } -func (t *tableSink) flushResolvedTs(ctx context.Context, resolvedTs uint64) (uint64, error) { - redoTs, err := t.flushRedoLogs(ctx, resolvedTs) +func (t *tableSink) flushResolvedTs( + ctx context.Context, resolved model.ResolvedTs, +) (uint64, error) { + redoTs, err := t.flushRedoLogs(ctx, resolved.Ts) if err != nil { return 0, errors.Trace(err) } - if redoTs < resolvedTs { - resolvedTs = redoTs + if redoTs < resolved.Ts { + resolved.Ts = redoTs } - checkpointTs, err := t.backendSink.FlushRowChangedEvents(ctx, t.tableID, resolvedTs) + checkpointTs, err := t.backendSink.FlushRowChangedEvents(ctx, t.tableID, resolved) if err != nil { return 0, errors.Trace(err) } diff --git a/cdc/sorter/leveldb/writer.go b/cdc/sorter/leveldb/writer.go index e9a959bab02..1e3251113b5 100644 --- a/cdc/sorter/leveldb/writer.go +++ b/cdc/sorter/leveldb/writer.go @@ -17,7 +17,6 @@ import ( "context" "github.com/pingcap/log" - "github.com/pingcap/tiflow/cdc/model" "github.com/pingcap/tiflow/cdc/sorter/encoding" "github.com/pingcap/tiflow/cdc/sorter/leveldb/message" "github.com/pingcap/tiflow/pkg/actor" @@ -57,7 +56,7 @@ func (w *writer) Poll(ctx context.Context, msgs []actormsg.Message[message.Task] } ev := msgs[i].Value.InputEvent - if ev.RawKV.OpType == model.OpTypeResolved { + if ev.IsResolved() { if w.maxResolvedTs < ev.CRTs { w.maxResolvedTs = ev.CRTs } diff --git a/cdc/sorter/memory/entry_sorter.go b/cdc/sorter/memory/entry_sorter.go index 233497f3a89..02c6f884248 100644 --- a/cdc/sorter/memory/entry_sorter.go +++ b/cdc/sorter/memory/entry_sorter.go @@ -141,7 +141,7 @@ func (es *EntrySorter) AddEntry(_ context.Context, entry *model.PolymorphicEvent } es.lock.Lock() defer es.lock.Unlock() - if entry.RawKV.OpType == model.OpTypeResolved { + if entry.IsResolved() { es.resolvedTsGroup = append(es.resolvedTsGroup, entry.CRTs) es.resolvedNotifier.Notify() } else { diff --git a/cdc/sorter/memory/entry_sorter_test.go b/cdc/sorter/memory/entry_sorter_test.go index 90b95f99e8a..d88ec9b473d 100644 --- a/cdc/sorter/memory/entry_sorter_test.go +++ b/cdc/sorter/memory/entry_sorter_test.go @@ -275,7 +275,7 @@ func TestEntrySorterRandomly(t *testing.T) { } lastTs = entry.CRTs lastOpType = entry.RawKV.OpType - if entry.RawKV.OpType == model.OpTypeResolved { + if entry.IsResolved() { resolvedTs = entry.CRTs } if resolvedTs == maxTs { @@ -509,7 +509,7 @@ func BenchmarkSorter(b *testing.B) { }() var resolvedTs uint64 for entry := range es.Output() { - if entry.RawKV.OpType == model.OpTypeResolved { + if entry.IsResolved() { resolvedTs = entry.CRTs } if resolvedTs == maxTs { diff --git a/cdc/sorter/unified/heap_sorter.go b/cdc/sorter/unified/heap_sorter.go index 16d78dfeec2..30c4930472c 100644 --- a/cdc/sorter/unified/heap_sorter.go +++ b/cdc/sorter/unified/heap_sorter.go @@ -118,7 +118,7 @@ func (h *heapSorter) flush(ctx context.Context, maxResolvedTs uint64) error { // Since when a table is mostly idle or near-idle, most flushes would contain one ResolvedEvent alone, // this optimization will greatly improve performance when (1) total number of table is large, // and (2) most tables do not have many events. 
- if h.heap.Len() == 1 && h.heap[0].entry.RawKV.OpType == model.OpTypeResolved { + if h.heap.Len() == 1 && h.heap[0].entry.IsResolved() { h.heap.Pop() } @@ -303,7 +303,7 @@ func (h *heapSorter) init(ctx context.Context, onError func(err error)) { poolHandle := heapSorterPool.RegisterEvent(func(ctx context.Context, eventI interface{}) error { event := eventI.(*model.PolymorphicEvent) heap.Push(&h.heap, &sortItem{entry: event}) - isResolvedEvent := event.RawKV != nil && event.RawKV.OpType == model.OpTypeResolved + isResolvedEvent := event.RawKV != nil && event.IsResolved() if isResolvedEvent { if event.RawKV.CRTs < state.maxResolved { diff --git a/cdc/sorter/unified/merger.go b/cdc/sorter/unified/merger.go index 238bf41a1a3..efd0a08d43a 100644 --- a/cdc/sorter/unified/merger.go +++ b/cdc/sorter/unified/merger.go @@ -352,7 +352,7 @@ func runMerger(ctx context.Context, numSorters int, in <-chan *flushTask, out ch continue } - if event.CRTs > minResolvedTs || (event.CRTs == minResolvedTs && event.RawKV.OpType == model.OpTypeResolved) { + if event.CRTs > minResolvedTs || (event.CRTs == minResolvedTs && event.IsResolved()) { // we have processed all events from this task that need to be processed in this merge if event.CRTs > minResolvedTs || event.RawKV.OpType != model.OpTypeResolved { pendingSet.Store(task, event) diff --git a/cdc/sorter/unified/unified_sorter.go b/cdc/sorter/unified/unified_sorter.go index 70f04cf2118..0287f158675 100644 --- a/cdc/sorter/unified/unified_sorter.go +++ b/cdc/sorter/unified/unified_sorter.go @@ -183,7 +183,7 @@ func (s *Sorter) Run(ctx context.Context) error { case <-subctx.Done(): return subctx.Err() case event := <-s.inputCh: - if event.RawKV != nil && event.RawKV.OpType == model.OpTypeResolved { + if event.RawKV != nil && event.IsResolved() { // broadcast resolved events for _, sorter := range heapSorters { select { diff --git a/cmd/kafka-consumer/main.go b/cmd/kafka-consumer/main.go index 11e9cb179db..9fbe12b782a 100644 --- a/cmd/kafka-consumer/main.go +++ b/cmd/kafka-consumer/main.go @@ -790,7 +790,8 @@ func syncFlushRowChangedEvents(ctx context.Context, sink *partitionSink, resolve flushedResolvedTs := true sink.tablesMap.Range(func(key, value interface{}) bool { tableID := key.(int64) - checkpointTs, err = sink.FlushRowChangedEvents(ctx, tableID, resolvedTs) + checkpointTs, err = sink.FlushRowChangedEvents(ctx, + tableID, model.NewResolvedTs(resolvedTs)) if err != nil { return false } diff --git a/pkg/applier/redo.go b/pkg/applier/redo.go index b20b326e452..f13bf1c9fd5 100644 --- a/pkg/applier/redo.go +++ b/pkg/applier/redo.go @@ -165,7 +165,7 @@ func (ra *RedoApplier) consumeLogs(ctx context.Context) error { } for tableID, tableLastResolvedTs := range tableResolvedTsMap { - _, err = s.FlushRowChangedEvents(ctx, tableID, tableLastResolvedTs) + _, err = s.FlushRowChangedEvents(ctx, tableID, model.NewResolvedTs(tableLastResolvedTs)) if err != nil { return err } @@ -177,7 +177,7 @@ func (ra *RedoApplier) consumeLogs(ctx context.Context) error { } for tableID := range tableResolvedTsMap { - _, err = s.FlushRowChangedEvents(ctx, tableID, resolvedTs) + _, err = s.FlushRowChangedEvents(ctx, tableID, model.NewResolvedTs(resolvedTs)) if err != nil { return err }
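A minimal sketch (not part of the patch) of how the new model.ResolvedTs API is meant to be constructed and consumed. The package main wrapper and printed values are illustrative assumptions; all other identifiers match the code above.

package main

import (
	"fmt"

	"github.com/pingcap/tiflow/cdc/model"
)

func main() {
	// NormalResolvedMode: the sink may flush every event whose
	// commitTs is less than or equal to resolved.Ts.
	normal := model.NewResolvedTs(100)

	// BatchResolvedMode: the sink may flush only events whose
	// commitTs is strictly less than resolved.Ts.
	batch := model.NewResolvedTsWithMode(100, model.BatchResolvedMode)

	fmt.Println(normal.Mode == model.NormalResolvedMode) // true
	fmt.Println(batch.Mode == model.BatchResolvedMode)   // true

	// A resolved PolymorphicEvent carries the zero-value Mode, i.e.
	// NormalResolvedMode, so IsBatchResolved reports false unless the
	// producer sets Mode explicitly. This assumes the constructor
	// populates RawKV with OpTypeResolved, as in the existing model
	// package; IsResolved must not be called when RawKV is nil.
	ev := model.NewResolvedPolymorphicEvent(0, 100)
	fmt.Println(ev.IsResolved(), ev.IsBatchResolved()) // true false
}

Only the Ts comparison differs between the two modes, which is why the call sites above can pass model.NewResolvedTs(ts) as a drop-in replacement for the old uint64 parameter.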