From ba2c2fd8c48fd406e22b93a357ef4e7f7a880699 Mon Sep 17 00:00:00 2001 From: sdojjy Date: Thu, 25 Nov 2021 10:12:30 +0800 Subject: [PATCH 1/4] refine Sink interface --- cdc/owner/async_sink_test.go | 2 +- cdc/processor/pipeline/sink.go | 10 ++++++---- cdc/processor/pipeline/sink_test.go | 20 ++++++++++---------- cdc/processor/pipeline/table.go | 2 +- cdc/sink/black_hole.go | 4 ++-- cdc/sink/buffer_sink.go | 5 +++-- cdc/sink/cdclog/file.go | 4 ++-- cdc/sink/cdclog/s3.go | 4 ++-- cdc/sink/manager.go | 6 +++--- cdc/sink/mq.go | 4 ++-- cdc/sink/mysql.go | 6 +++--- cdc/sink/simple_mysql_tester.go | 4 ++-- cdc/sink/sink.go | 4 ++-- cdc/sink/table_sink.go | 8 ++++---- pkg/applier/redo.go | 7 ++++--- 15 files changed, 47 insertions(+), 43 deletions(-) diff --git a/cdc/owner/async_sink_test.go b/cdc/owner/async_sink_test.go index 3c7fc11a11e..529eea01704 100644 --- a/cdc/owner/async_sink_test.go +++ b/cdc/owner/async_sink_test.go @@ -65,7 +65,7 @@ func (m *mockSink) Close(ctx context.Context) error { return nil } -func (m *mockSink) Barrier(ctx context.Context) error { +func (m *mockSink) Barrier(ctx context.Context, tableID model.TableID) error { return nil } diff --git a/cdc/processor/pipeline/sink.go b/cdc/processor/pipeline/sink.go index a53e0c86c57..4d642824b66 100644 --- a/cdc/processor/pipeline/sink.go +++ b/cdc/processor/pipeline/sink.go @@ -64,8 +64,9 @@ func (s *TableStatus) Store(new TableStatus) { } type sinkNode struct { - sink sink.Sink - status TableStatus + sink sink.Sink + status TableStatus + tableID model.TableID resolvedTs model.Ts checkpointTs model.Ts @@ -78,8 +79,9 @@ type sinkNode struct { flowController tableFlowController } -func newSinkNode(sink sink.Sink, startTs model.Ts, targetTs model.Ts, flowController tableFlowController) *sinkNode { +func newSinkNode(tableID model.TableID, sink sink.Sink, startTs model.Ts, targetTs model.Ts, flowController tableFlowController) *sinkNode { return &sinkNode{ + tableID: tableID, sink: sink, status: TableStatusInitializing, targetTs: targetTs, @@ -136,7 +138,7 @@ func (n *sinkNode) flushSink(ctx pipeline.NodeContext, resolvedTs model.Ts) (err if err := n.emitRow2Sink(ctx); err != nil { return errors.Trace(err) } - checkpointTs, err := n.sink.FlushRowChangedEvents(ctx, resolvedTs) + checkpointTs, err := n.sink.FlushRowChangedEvents(ctx, n.tableID, resolvedTs) if err != nil { return errors.Trace(err) } diff --git a/cdc/processor/pipeline/sink_test.go b/cdc/processor/pipeline/sink_test.go index 12ba3bb1fa5..2b334ca6323 100644 --- a/cdc/processor/pipeline/sink_test.go +++ b/cdc/processor/pipeline/sink_test.go @@ -76,7 +76,7 @@ func (s *mockSink) EmitDDLEvent(ctx context.Context, ddl *model.DDLEvent) error panic("unreachable") } -func (s *mockSink) FlushRowChangedEvents(ctx context.Context, resolvedTs uint64) (uint64, error) { +func (s *mockSink) FlushRowChangedEvents(ctx context.Context, tableID model.TableID, resolvedTs uint64) (uint64, error) { s.received = append(s.received, struct { resolvedTs model.Ts row *model.RowChangedEvent @@ -92,7 +92,7 @@ func (s *mockSink) Close(ctx context.Context) error { return nil } -func (s *mockSink) Barrier(ctx context.Context) error { +func (s *mockSink) Barrier(ctx context.Context, tableID model.TableID) error { return nil } @@ -137,7 +137,7 @@ func (s *outputSuite) TestStatus(c *check.C) { }) // test stop at targetTs - node := newSinkNode(&mockSink{}, 0, 10, &mockFlowController{}) + node := newSinkNode(1, &mockSink{}, 0, 10, &mockFlowController{}) c.Assert(node.Init(pipeline.MockNodeContext4Test(ctx, pipeline.Message{}, nil)), check.IsNil) c.Assert(node.Status(), check.Equals, TableStatusInitializing) @@ -163,7 +163,7 @@ func (s *outputSuite) TestStatus(c *check.C) { c.Assert(node.CheckpointTs(), check.Equals, uint64(10)) // test the stop at ts command - node = newSinkNode(&mockSink{}, 0, 10, &mockFlowController{}) + node = newSinkNode(1, &mockSink{}, 0, 10, &mockFlowController{}) c.Assert(node.Init(pipeline.MockNodeContext4Test(ctx, pipeline.Message{}, nil)), check.IsNil) c.Assert(node.Status(), check.Equals, TableStatusInitializing) @@ -186,7 +186,7 @@ func (s *outputSuite) TestStatus(c *check.C) { c.Assert(node.CheckpointTs(), check.Equals, uint64(2)) // test the stop at ts command is after then resolvedTs and checkpointTs is greater than stop ts - node = newSinkNode(&mockSink{}, 0, 10, &mockFlowController{}) + node = newSinkNode(1, &mockSink{}, 0, 10, &mockFlowController{}) c.Assert(node.Init(pipeline.MockNodeContext4Test(ctx, pipeline.Message{}, nil)), check.IsNil) c.Assert(node.Status(), check.Equals, TableStatusInitializing) @@ -223,7 +223,7 @@ func (s *outputSuite) TestStopStatus(c *check.C) { }) closeCh := make(chan interface{}, 1) - node := newSinkNode(&mockCloseControlSink{mockSink: mockSink{}, closeCh: closeCh}, 0, 100, &mockFlowController{}) + node := newSinkNode(1, &mockCloseControlSink{mockSink: mockSink{}, closeCh: closeCh}, 0, 100, &mockFlowController{}) c.Assert(node.Init(pipeline.MockNodeContext4Test(ctx, pipeline.Message{}, nil)), check.IsNil) c.Assert(node.Status(), check.Equals, TableStatusInitializing) c.Assert(node.Receive(pipeline.MockNodeContext4Test(ctx, @@ -258,7 +258,7 @@ func (s *outputSuite) TestManyTs(c *check.C) { }, }) sink := &mockSink{} - node := newSinkNode(sink, 0, 10, &mockFlowController{}) + node := newSinkNode(1, sink, 0, 10, &mockFlowController{}) c.Assert(node.Init(pipeline.MockNodeContext4Test(ctx, pipeline.Message{}, nil)), check.IsNil) c.Assert(node.Status(), check.Equals, TableStatusInitializing) @@ -379,7 +379,7 @@ func (s *outputSuite) TestIgnoreEmptyRowChangeEvent(c *check.C) { }, }) sink := &mockSink{} - node := newSinkNode(sink, 0, 10, &mockFlowController{}) + node := newSinkNode(1, sink, 0, 10, &mockFlowController{}) c.Assert(node.Init(pipeline.MockNodeContext4Test(ctx, pipeline.Message{}, nil)), check.IsNil) // empty row, no Columns and PreColumns. @@ -399,7 +399,7 @@ func (s *outputSuite) TestSplitUpdateEventWhenEnableOldValue(c *check.C) { }, }) sink := &mockSink{} - node := newSinkNode(sink, 0, 10, &mockFlowController{}) + node := newSinkNode(1, sink, 0, 10, &mockFlowController{}) c.Assert(node.Init(pipeline.MockNodeContext4Test(ctx, pipeline.Message{}, nil)), check.IsNil) // nil row. @@ -458,7 +458,7 @@ func (s *outputSuite) TestSplitUpdateEventWhenDisableOldValue(c *check.C) { }, }) sink := &mockSink{} - node := newSinkNode(sink, 0, 10, &mockFlowController{}) + node := newSinkNode(1, sink, 0, 10, &mockFlowController{}) c.Assert(node.Init(pipeline.MockNodeContext4Test(ctx, pipeline.Message{}, nil)), check.IsNil) // nil row. diff --git a/cdc/processor/pipeline/table.go b/cdc/processor/pipeline/table.go index be4ba66f5c1..357e2cf7ea5 100644 --- a/cdc/processor/pipeline/table.go +++ b/cdc/processor/pipeline/table.go @@ -202,7 +202,7 @@ func NewTablePipeline(ctx cdcContext.Context, p := pipeline.NewPipeline(ctx, 500*time.Millisecond, runnerSize, defaultOutputChannelSize) sorterNode := newSorterNode(tableName, tableID, replicaInfo.StartTs, flowController, mounter) - sinkNode := newSinkNode(sink, replicaInfo.StartTs, targetTs, flowController) + sinkNode := newSinkNode(tableID, sink, replicaInfo.StartTs, targetTs, flowController) p.AppendNode(ctx, "puller", newPullerNode(tableID, replicaInfo, tableName)) p.AppendNode(ctx, "sorter", sorterNode) diff --git a/cdc/sink/black_hole.go b/cdc/sink/black_hole.go index 3eca14a0119..2f730bf381f 100644 --- a/cdc/sink/black_hole.go +++ b/cdc/sink/black_hole.go @@ -46,7 +46,7 @@ func (b *blackHoleSink) EmitRowChangedEvents(ctx context.Context, rows ...*model return nil } -func (b *blackHoleSink) FlushRowChangedEvents(ctx context.Context, resolvedTs uint64) (uint64, error) { +func (b *blackHoleSink) FlushRowChangedEvents(ctx context.Context, tableID model.TableID, resolvedTs uint64) (uint64, error) { log.Debug("BlockHoleSink: FlushRowChangedEvents", zap.Uint64("resolvedTs", resolvedTs)) err := b.statistics.RecordBatchExecution(func() (int, error) { // TODO: add some random replication latency @@ -79,6 +79,6 @@ func (b *blackHoleSink) Close(ctx context.Context) error { return nil } -func (b *blackHoleSink) Barrier(ctx context.Context) error { +func (b *blackHoleSink) Barrier(ctx context.Context, tableID model.TableID) error { return nil } diff --git a/cdc/sink/buffer_sink.go b/cdc/sink/buffer_sink.go index d8d70456ca7..de7d7dc3953 100644 --- a/cdc/sink/buffer_sink.go +++ b/cdc/sink/buffer_sink.go @@ -109,7 +109,8 @@ func (b *bufferSink) run(ctx context.Context, errCh chan error) { b.bufferMu.Unlock() start := time.Now() - checkpointTs, err := b.Sink.FlushRowChangedEvents(ctx, resolvedTs) + // todo: use real table ID + checkpointTs, err := b.Sink.FlushRowChangedEvents(ctx, 0, resolvedTs) if err != nil { if errors.Cause(err) != context.Canceled { errCh <- err @@ -146,7 +147,7 @@ func (b *bufferSink) EmitRowChangedEvents(ctx context.Context, rows ...*model.Ro return nil } -func (b *bufferSink) FlushRowChangedEvents(ctx context.Context, resolvedTs uint64) (uint64, error) { +func (b *bufferSink) FlushRowChangedEvents(ctx context.Context, tableID model.TableID, resolvedTs uint64) (uint64, error) { select { case <-ctx.Done(): return atomic.LoadUint64(&b.checkpointTs), ctx.Err() diff --git a/cdc/sink/cdclog/file.go b/cdc/sink/cdclog/file.go index c6794b3c6e2..6913b015a0b 100644 --- a/cdc/sink/cdclog/file.go +++ b/cdc/sink/cdclog/file.go @@ -225,7 +225,7 @@ func (f *fileSink) EmitRowChangedEvents(ctx context.Context, rows ...*model.RowC return f.emitRowChangedEvents(ctx, newTableStream, rows...) } -func (f *fileSink) FlushRowChangedEvents(ctx context.Context, resolvedTs uint64) (uint64, error) { +func (f *fileSink) FlushRowChangedEvents(ctx context.Context, tableID model.TableID, resolvedTs uint64) (uint64, error) { log.Debug("[FlushRowChangedEvents] enter", zap.Uint64("ts", resolvedTs)) return f.flushRowChangedEvents(ctx, resolvedTs) } @@ -349,7 +349,7 @@ func (f *fileSink) Close(ctx context.Context) error { return nil } -func (f *fileSink) Barrier(ctx context.Context) error { +func (f *fileSink) Barrier(ctx context.Context, tableID model.TableID) error { // Barrier does nothing because FlushRowChangedEvents in file sink has flushed // all buffered events forcedlly. return nil diff --git a/cdc/sink/cdclog/s3.go b/cdc/sink/cdclog/s3.go index f76d6c23946..53db1e3fb6d 100644 --- a/cdc/sink/cdclog/s3.go +++ b/cdc/sink/cdclog/s3.go @@ -222,7 +222,7 @@ func (s *s3Sink) flushLogMeta(ctx context.Context) error { return cerror.WrapError(cerror.ErrS3SinkWriteStorage, s.storage.WriteFile(ctx, logMetaFile, data)) } -func (s *s3Sink) FlushRowChangedEvents(ctx context.Context, resolvedTs uint64) (uint64, error) { +func (s *s3Sink) FlushRowChangedEvents(ctx context.Context, tableID model.TableID, resolvedTs uint64) (uint64, error) { // we should flush all events before resolvedTs, there are two kind of flush policy // 1. flush row events to a s3 chunk: if the event size is not enough, // TODO: when cdc crashed, we should repair these chunks to a complete file @@ -347,7 +347,7 @@ func (s *s3Sink) Close(ctx context.Context) error { return nil } -func (s *s3Sink) Barrier(ctx context.Context) error { +func (s *s3Sink) Barrier(ctx context.Context, tableID model.TableID) error { // Barrier does nothing because FlushRowChangedEvents in s3 sink has flushed // all buffered events forcedlly. return nil diff --git a/cdc/sink/manager.go b/cdc/sink/manager.go index 80bf0c265ea..ed80e41703d 100644 --- a/cdc/sink/manager.go +++ b/cdc/sink/manager.go @@ -106,7 +106,7 @@ func (m *Manager) getMinEmittedTs() model.Ts { return minTs } -func (m *Manager) flushBackendSink(ctx context.Context) (model.Ts, error) { +func (m *Manager) flushBackendSink(ctx context.Context, tableID model.TableID) (model.Ts, error) { // NOTICE: Because all table sinks will try to flush backend sink, // which will cause a lot of lock contention and blocking in high concurrency cases. // So here we use flushing as a lightweight lock to improve the lock competition problem. @@ -119,7 +119,7 @@ func (m *Manager) flushBackendSink(ctx context.Context) (model.Ts, error) { atomic.StoreInt64(&m.flushing, 0) }() minEmittedTs := m.getMinEmittedTs() - checkpointTs, err := m.backendSink.FlushRowChangedEvents(ctx, minEmittedTs) + checkpointTs, err := m.backendSink.FlushRowChangedEvents(ctx, tableID, minEmittedTs) if err != nil { return m.getCheckpointTs(), errors.Trace(err) } @@ -142,7 +142,7 @@ func (m *Manager) destroyTableSink(ctx context.Context, tableID model.TableID) e return ctx.Err() case <-callback: } - return m.backendSink.Barrier(ctx) + return m.backendSink.Barrier(ctx, tableID) } func (m *Manager) getCheckpointTs() uint64 { diff --git a/cdc/sink/mq.go b/cdc/sink/mq.go index 5684767beb7..0dc4a96dfad 100644 --- a/cdc/sink/mq.go +++ b/cdc/sink/mq.go @@ -155,7 +155,7 @@ func (k *mqSink) EmitRowChangedEvents(ctx context.Context, rows ...*model.RowCha return nil } -func (k *mqSink) FlushRowChangedEvents(ctx context.Context, resolvedTs uint64) (uint64, error) { +func (k *mqSink) FlushRowChangedEvents(ctx context.Context, tableID model.TableID, resolvedTs uint64) (uint64, error) { if resolvedTs <= k.checkpointTs { return k.checkpointTs, nil } @@ -260,7 +260,7 @@ func (k *mqSink) Close(ctx context.Context) error { return errors.Trace(err) } -func (k *mqSink) Barrier(cxt context.Context) error { +func (k *mqSink) Barrier(cxt context.Context, tableID model.TableID) error { // Barrier does nothing because FlushRowChangedEvents in mq sink has flushed // all buffered events by force. return nil diff --git a/cdc/sink/mysql.go b/cdc/sink/mysql.go index 4b2e609ae07..3b4d8478b7a 100644 --- a/cdc/sink/mysql.go +++ b/cdc/sink/mysql.go @@ -207,7 +207,7 @@ func (s *mysqlSink) EmitRowChangedEvents(ctx context.Context, rows ...*model.Row // FlushRowChangedEvents will flush all received events, we don't allow mysql // sink to receive events before resolving -func (s *mysqlSink) FlushRowChangedEvents(ctx context.Context, resolvedTs uint64) (uint64, error) { +func (s *mysqlSink) FlushRowChangedEvents(ctx context.Context, tableID model.TableID, resolvedTs uint64) (uint64, error) { if atomic.LoadUint64(&s.maxResolvedTs) < resolvedTs { atomic.StoreUint64(&s.maxResolvedTs, resolvedTs) } @@ -478,7 +478,7 @@ func (s *mysqlSink) Close(ctx context.Context) error { return cerror.WrapError(cerror.ErrMySQLConnectionError, err) } -func (s *mysqlSink) Barrier(ctx context.Context) error { +func (s *mysqlSink) Barrier(ctx context.Context, tableID model.TableID) error { warnDuration := 3 * time.Minute ticker := time.NewTicker(warnDuration) defer ticker.Stop() @@ -495,7 +495,7 @@ func (s *mysqlSink) Barrier(ctx context.Context) error { if s.checkpointTs() >= maxResolvedTs { return nil } - checkpointTs, err := s.FlushRowChangedEvents(ctx, maxResolvedTs) + checkpointTs, err := s.FlushRowChangedEvents(ctx, tableID, maxResolvedTs) if err != nil { return err } diff --git a/cdc/sink/simple_mysql_tester.go b/cdc/sink/simple_mysql_tester.go index c6abc68a736..fd435344b5c 100644 --- a/cdc/sink/simple_mysql_tester.go +++ b/cdc/sink/simple_mysql_tester.go @@ -186,7 +186,7 @@ func (s *simpleMySQLSink) EmitDDLEvent(ctx context.Context, ddl *model.DDLEvent) // FlushRowChangedEvents flushes each row which of commitTs less than or equal to `resolvedTs` into downstream. // TiCDC guarantees that all of Event which of commitTs less than or equal to `resolvedTs` are sent to Sink through `EmitRowChangedEvents` -func (s *simpleMySQLSink) FlushRowChangedEvents(ctx context.Context, resolvedTs uint64) (uint64, error) { +func (s *simpleMySQLSink) FlushRowChangedEvents(ctx context.Context, tableID model.TableID, resolvedTs uint64) (uint64, error) { s.rowsBufferLock.Lock() defer s.rowsBufferLock.Unlock() newBuffer := make([]*model.RowChangedEvent, 0, len(s.rowsBuffer)) @@ -216,7 +216,7 @@ func (s *simpleMySQLSink) Close(ctx context.Context) error { return s.db.Close() } -func (s *simpleMySQLSink) Barrier(ctx context.Context) error { +func (s *simpleMySQLSink) Barrier(ctx context.Context, tableID model.TableID) error { return nil } diff --git a/cdc/sink/sink.go b/cdc/sink/sink.go index 3d28595a707..6b89e334167 100644 --- a/cdc/sink/sink.go +++ b/cdc/sink/sink.go @@ -45,7 +45,7 @@ type Sink interface { // FlushRowChangedEvents flushes each row which of commitTs less than or equal to `resolvedTs` into downstream. // TiCDC guarantees that all the Events whose commitTs is less than or equal to `resolvedTs` are sent to Sink through `EmitRowChangedEvents` - FlushRowChangedEvents(ctx context.Context, resolvedTs uint64) (uint64, error) + FlushRowChangedEvents(ctx context.Context, tableID model.TableID, resolvedTs uint64) (uint64, error) // EmitCheckpointTs sends CheckpointTs to Sink // TiCDC guarantees that all Events **in the cluster** which of commitTs less than or equal `checkpointTs` are sent to downstream successfully. @@ -56,7 +56,7 @@ type Sink interface { // Barrier is a synchronous function to wait all events to be flushed in underlying sink // Note once Barrier is called, the resolved ts won't be pushed until the Barrier call returns. - Barrier(ctx context.Context) error + Barrier(ctx context.Context, tableID model.TableID) error } var sinkIniterMap = make(map[string]sinkInitFunc) diff --git a/cdc/sink/table_sink.go b/cdc/sink/table_sink.go index 098566b6499..2cb30150ae9 100644 --- a/cdc/sink/table_sink.go +++ b/cdc/sink/table_sink.go @@ -54,7 +54,7 @@ func (t *tableSink) EmitDDLEvent(ctx context.Context, ddl *model.DDLEvent) error // FlushRowChangedEvents flushes sorted rows to sink manager, note the resolvedTs // is required to be no more than global resolvedTs, table barrierTs and table // redo log watermarkTs. -func (t *tableSink) FlushRowChangedEvents(ctx context.Context, resolvedTs uint64) (uint64, error) { +func (t *tableSink) FlushRowChangedEvents(ctx context.Context, tableID model.TableID, resolvedTs uint64) (uint64, error) { i := sort.Search(len(t.buffer), func(i int) bool { return t.buffer[i].CommitTs > resolvedTs }) @@ -64,7 +64,7 @@ func (t *tableSink) FlushRowChangedEvents(ctx context.Context, resolvedTs uint64 if err != nil { return ckpt, err } - return t.manager.flushBackendSink(ctx) + return t.manager.flushBackendSink(ctx, tableID) } resolvedRows := t.buffer[:i] t.buffer = append(make([]*model.RowChangedEvent, 0, len(t.buffer[i:])), t.buffer[i:]...) @@ -78,7 +78,7 @@ func (t *tableSink) FlushRowChangedEvents(ctx context.Context, resolvedTs uint64 if err != nil { return ckpt, err } - return t.manager.flushBackendSink(ctx) + return t.manager.flushBackendSink(ctx, tableID) } func (t *tableSink) flushRedoLogs(ctx context.Context, resolvedTs uint64) (uint64, error) { @@ -116,6 +116,6 @@ func (t *tableSink) Close(ctx context.Context) error { } // Barrier is not used in table sink -func (t *tableSink) Barrier(ctx context.Context) error { +func (t *tableSink) Barrier(ctx context.Context, tableID model.TableID) error { return nil } diff --git a/pkg/applier/redo.go b/pkg/applier/redo.go index a4421250846..14868cfcf07 100644 --- a/pkg/applier/redo.go +++ b/pkg/applier/redo.go @@ -151,7 +151,8 @@ func (ra *RedoApplier) consumeLogs(ctx context.Context) error { lastSafeResolvedTs, lastResolvedTs = lastResolvedTs, redoLog.Row.CommitTs } } - _, err = s.FlushRowChangedEvents(ctx, lastSafeResolvedTs) + // todo: use real table ID + _, err = s.FlushRowChangedEvents(ctx, 0, lastSafeResolvedTs) if err != nil { return err } @@ -160,11 +161,11 @@ func (ra *RedoApplier) consumeLogs(ctx context.Context) error { if err != nil { return err } - _, err = s.FlushRowChangedEvents(ctx, resolvedTs) + _, err = s.FlushRowChangedEvents(ctx, 0, resolvedTs) if err != nil { return err } - err = s.Barrier(ctx) + err = s.Barrier(ctx, 0) if err != nil { return err } From 26070d94d1bd43509e83a6dc050fef8d16778f85 Mon Sep 17 00:00:00 2001 From: sdojjy Date: Thu, 25 Nov 2021 10:22:53 +0800 Subject: [PATCH 2/4] fix ut --- cdc/sink/manager_test.go | 26 +++++++++++++------------- cmd/kafka-consumer/main.go | 3 ++- 2 files changed, 15 insertions(+), 14 deletions(-) diff --git a/cdc/sink/manager_test.go b/cdc/sink/manager_test.go index 29ea4bd83d2..a485d9e7a75 100644 --- a/cdc/sink/manager_test.go +++ b/cdc/sink/manager_test.go @@ -55,7 +55,7 @@ func (c *checkSink) EmitDDLEvent(ctx context.Context, ddl *model.DDLEvent) error panic("unreachable") } -func (c *checkSink) FlushRowChangedEvents(ctx context.Context, resolvedTs uint64) (uint64, error) { +func (c *checkSink) FlushRowChangedEvents(ctx context.Context, tableID model.TableID, resolvedTs uint64) (uint64, error) { c.rowsMu.Lock() defer c.rowsMu.Unlock() var newRows []*model.RowChangedEvent @@ -83,7 +83,7 @@ func (c *checkSink) Close(ctx context.Context) error { return nil } -func (c *checkSink) Barrier(ctx context.Context) error { +func (c *checkSink) Barrier(ctx context.Context, tableID model.TableID) error { return nil } @@ -118,7 +118,7 @@ func (s *managerSuite) TestManagerRandom(c *check.C) { for j := 1; j < rowNum; j++ { if rand.Intn(10) == 0 { resolvedTs := lastResolvedTs + uint64(rand.Intn(j-int(lastResolvedTs))) - _, err := tableSink.FlushRowChangedEvents(ctx, resolvedTs) + _, err := tableSink.FlushRowChangedEvents(ctx, model.TableID(i), resolvedTs) c.Assert(err, check.IsNil) lastResolvedTs = resolvedTs } else { @@ -129,7 +129,7 @@ func (s *managerSuite) TestManagerRandom(c *check.C) { c.Assert(err, check.IsNil) } } - _, err := tableSink.FlushRowChangedEvents(ctx, uint64(rowNum)) + _, err := tableSink.FlushRowChangedEvents(ctx, model.TableID(i), uint64(rowNum)) c.Assert(err, check.IsNil) }() } @@ -180,7 +180,7 @@ func (s *managerSuite) TestManagerAddRemoveTable(c *check.C) { }) c.Assert(err, check.IsNil) } - _, err := sink.FlushRowChangedEvents(ctx, resolvedTs) + _, err := sink.FlushRowChangedEvents(ctx, sink.(*tableSink).tableID, resolvedTs) if err != nil { c.Assert(errors.Cause(err), check.Equals, context.Canceled) } @@ -244,7 +244,7 @@ func (s *managerSuite) TestManagerDestroyTableSink(c *check.C) { CommitTs: uint64(110), }) c.Assert(err, check.IsNil) - _, err = tableSink.FlushRowChangedEvents(ctx, 110) + _, err = tableSink.FlushRowChangedEvents(ctx, tableID, 110) c.Assert(err, check.IsNil) err = manager.destroyTableSink(ctx, tableID) c.Assert(err, check.IsNil) @@ -295,11 +295,11 @@ func BenchmarkManagerFlushing(b *testing.B) { // All tables are flushed concurrently, except table 0. for i := 1; i < goroutineNum; i++ { i := i - tableSink := tableSinks[i] + tblSink := tableSinks[i] go func() { for j := 1; j < rowNum; j++ { if j%2 == 0 { - _, err := tableSink.FlushRowChangedEvents(context.Background(), uint64(j)) + _, err := tblSink.FlushRowChangedEvents(context.Background(), tblSink.(*tableSink).tableID, uint64(j)) if err != nil { b.Error(err) } @@ -310,9 +310,9 @@ func BenchmarkManagerFlushing(b *testing.B) { b.ResetTimer() // Table 0 flush. - tableSink := tableSinks[0] + tblSink := tableSinks[0] for i := 0; i < b.N; i++ { - _, err := tableSink.FlushRowChangedEvents(context.Background(), uint64(rowNum)) + _, err := tblSink.FlushRowChangedEvents(context.Background(), tblSink.(*tableSink).tableID, uint64(rowNum)) if err != nil { b.Error(err) } @@ -345,7 +345,7 @@ func (e *errorSink) EmitDDLEvent(ctx context.Context, ddl *model.DDLEvent) error panic("unreachable") } -func (e *errorSink) FlushRowChangedEvents(ctx context.Context, resolvedTs uint64) (uint64, error) { +func (e *errorSink) FlushRowChangedEvents(ctx context.Context, tableID model.TableID, resolvedTs uint64) (uint64, error) { return 0, errors.New("error in flush row changed events") } @@ -357,7 +357,7 @@ func (e *errorSink) Close(ctx context.Context) error { return nil } -func (e *errorSink) Barrier(ctx context.Context) error { +func (e *errorSink) Barrier(ctx context.Context, tableID model.TableID) error { return nil } @@ -374,7 +374,7 @@ func (s *managerSuite) TestManagerError(c *check.C) { Table: &model.TableName{TableID: 1}, }) c.Assert(err, check.IsNil) - _, err = sink.FlushRowChangedEvents(ctx, 2) + _, err = sink.FlushRowChangedEvents(ctx, 1, 2) c.Assert(err, check.IsNil) err = <-errCh c.Assert(err.Error(), check.Equals, "error in emit row changed events") diff --git a/cmd/kafka-consumer/main.go b/cmd/kafka-consumer/main.go index 566063d6406..09b81d93fff 100644 --- a/cmd/kafka-consumer/main.go +++ b/cmd/kafka-consumer/main.go @@ -593,7 +593,8 @@ func syncFlushRowChangedEvents(ctx context.Context, sink sink.Sink, resolvedTs u return ctx.Err() default: } - checkpointTs, err := sink.FlushRowChangedEvents(ctx, resolvedTs) + //todo: use real table id + checkpointTs, err := sink.FlushRowChangedEvents(ctx, 0, resolvedTs) if err != nil { return err } From f388718851cedc322e837e921ee04d8e63d050db Mon Sep 17 00:00:00 2001 From: sdojjy Date: Thu, 25 Nov 2021 10:24:18 +0800 Subject: [PATCH 3/4] fix lint --- cmd/kafka-consumer/main.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cmd/kafka-consumer/main.go b/cmd/kafka-consumer/main.go index 09b81d93fff..2ab0af53c34 100644 --- a/cmd/kafka-consumer/main.go +++ b/cmd/kafka-consumer/main.go @@ -593,7 +593,7 @@ func syncFlushRowChangedEvents(ctx context.Context, sink sink.Sink, resolvedTs u return ctx.Err() default: } - //todo: use real table id + // todo: use real table id checkpointTs, err := sink.FlushRowChangedEvents(ctx, 0, resolvedTs) if err != nil { return err From 51b754ab9bd0be8086ec33718801530038522ca2 Mon Sep 17 00:00:00 2001 From: sdojjy Date: Thu, 25 Nov 2021 10:26:50 +0800 Subject: [PATCH 4/4] fix lint --- cdc/sink/mq_test.go | 10 ++++++---- cdc/sink/mysql_test.go | 6 +++--- 2 files changed, 9 insertions(+), 7 deletions(-) diff --git a/cdc/sink/mq_test.go b/cdc/sink/mq_test.go index 029700c26c4..a77cc3e3c22 100644 --- a/cdc/sink/mq_test.go +++ b/cdc/sink/mq_test.go @@ -80,10 +80,12 @@ func (s mqSinkSuite) TestKafkaSink(c *check.C) { // mock kafka broker processes 1 row changed event leader.Returns(prodSuccess) + tableID := model.TableID(1) row := &model.RowChangedEvent{ Table: &model.TableName{ - Schema: "test", - Table: "t1", + Schema: "test", + Table: "t1", + TableID: tableID, }, StartTs: 100, CommitTs: 120, @@ -91,11 +93,11 @@ func (s mqSinkSuite) TestKafkaSink(c *check.C) { } err = sink.EmitRowChangedEvents(ctx, row) c.Assert(err, check.IsNil) - checkpointTs, err := sink.FlushRowChangedEvents(ctx, uint64(120)) + checkpointTs, err := sink.FlushRowChangedEvents(ctx, tableID, uint64(120)) c.Assert(err, check.IsNil) c.Assert(checkpointTs, check.Equals, uint64(120)) // flush older resolved ts - checkpointTs, err = sink.FlushRowChangedEvents(ctx, uint64(110)) + checkpointTs, err = sink.FlushRowChangedEvents(ctx, tableID, uint64(110)) c.Assert(err, check.IsNil) c.Assert(checkpointTs, check.Equals, uint64(120)) diff --git a/cdc/sink/mysql_test.go b/cdc/sink/mysql_test.go index 5588074f5b6..c7328070eef 100644 --- a/cdc/sink/mysql_test.go +++ b/cdc/sink/mysql_test.go @@ -678,7 +678,7 @@ func (s MySQLSinkSuite) TestNewMySQLSinkExecDML(c *check.C) { c.Assert(err, check.IsNil) err = retry.Do(context.Background(), func() error { - ts, err := sink.FlushRowChangedEvents(ctx, uint64(2)) + ts, err := sink.FlushRowChangedEvents(ctx, 2, uint64(2)) c.Assert(err, check.IsNil) if ts < uint64(2) { return errors.Errorf("checkpoint ts %d less than resolved ts %d", ts, 2) @@ -689,7 +689,7 @@ func (s MySQLSinkSuite) TestNewMySQLSinkExecDML(c *check.C) { c.Assert(err, check.IsNil) err = retry.Do(context.Background(), func() error { - ts, err := sink.FlushRowChangedEvents(ctx, uint64(4)) + ts, err := sink.FlushRowChangedEvents(ctx, 2, uint64(4)) c.Assert(err, check.IsNil) if ts < uint64(4) { return errors.Errorf("checkpoint ts %d less than resolved ts %d", ts, 4) @@ -698,7 +698,7 @@ func (s MySQLSinkSuite) TestNewMySQLSinkExecDML(c *check.C) { }, retry.WithBackoffBaseDelay(20), retry.WithMaxTries(10), retry.WithIsRetryableErr(cerror.IsRetryableError)) c.Assert(err, check.IsNil) - err = sink.Barrier(ctx) + err = sink.Barrier(ctx, 2) c.Assert(err, check.IsNil) err = sink.Close(ctx)