Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

cdc/sink: Refine sink interface #3613

Merged
merged 5 commits into from
Nov 26, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cdc/owner/async_sink_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ func (m *mockSink) Close(ctx context.Context) error {
return nil
}

func (m *mockSink) Barrier(ctx context.Context) error {
func (m *mockSink) Barrier(ctx context.Context, tableID model.TableID) error {
return nil
}

Expand Down
10 changes: 6 additions & 4 deletions cdc/processor/pipeline/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,9 @@ func (s *TableStatus) Store(new TableStatus) {
}

type sinkNode struct {
sink sink.Sink
status TableStatus
sink sink.Sink
status TableStatus
tableID model.TableID

resolvedTs model.Ts
checkpointTs model.Ts
Expand All @@ -78,8 +79,9 @@ type sinkNode struct {
flowController tableFlowController
}

func newSinkNode(sink sink.Sink, startTs model.Ts, targetTs model.Ts, flowController tableFlowController) *sinkNode {
func newSinkNode(tableID model.TableID, sink sink.Sink, startTs model.Ts, targetTs model.Ts, flowController tableFlowController) *sinkNode {
return &sinkNode{
tableID: tableID,
sink: sink,
status: TableStatusInitializing,
targetTs: targetTs,
Expand Down Expand Up @@ -136,7 +138,7 @@ func (n *sinkNode) flushSink(ctx pipeline.NodeContext, resolvedTs model.Ts) (err
if err := n.emitRow2Sink(ctx); err != nil {
return errors.Trace(err)
}
checkpointTs, err := n.sink.FlushRowChangedEvents(ctx, resolvedTs)
checkpointTs, err := n.sink.FlushRowChangedEvents(ctx, n.tableID, resolvedTs)
if err != nil {
return errors.Trace(err)
}
Expand Down
20 changes: 10 additions & 10 deletions cdc/processor/pipeline/sink_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ func (s *mockSink) EmitDDLEvent(ctx context.Context, ddl *model.DDLEvent) error
panic("unreachable")
}

func (s *mockSink) FlushRowChangedEvents(ctx context.Context, resolvedTs uint64) (uint64, error) {
func (s *mockSink) FlushRowChangedEvents(ctx context.Context, tableID model.TableID, resolvedTs uint64) (uint64, error) {
s.received = append(s.received, struct {
resolvedTs model.Ts
row *model.RowChangedEvent
Expand All @@ -92,7 +92,7 @@ func (s *mockSink) Close(ctx context.Context) error {
return nil
}

func (s *mockSink) Barrier(ctx context.Context) error {
func (s *mockSink) Barrier(ctx context.Context, tableID model.TableID) error {
return nil
}

Expand Down Expand Up @@ -137,7 +137,7 @@ func (s *outputSuite) TestStatus(c *check.C) {
})

// test stop at targetTs
node := newSinkNode(&mockSink{}, 0, 10, &mockFlowController{})
node := newSinkNode(1, &mockSink{}, 0, 10, &mockFlowController{})
c.Assert(node.Init(pipeline.MockNodeContext4Test(ctx, pipeline.Message{}, nil)), check.IsNil)
c.Assert(node.Status(), check.Equals, TableStatusInitializing)

Expand All @@ -163,7 +163,7 @@ func (s *outputSuite) TestStatus(c *check.C) {
c.Assert(node.CheckpointTs(), check.Equals, uint64(10))

// test the stop at ts command
node = newSinkNode(&mockSink{}, 0, 10, &mockFlowController{})
node = newSinkNode(1, &mockSink{}, 0, 10, &mockFlowController{})
c.Assert(node.Init(pipeline.MockNodeContext4Test(ctx, pipeline.Message{}, nil)), check.IsNil)
c.Assert(node.Status(), check.Equals, TableStatusInitializing)

Expand All @@ -186,7 +186,7 @@ func (s *outputSuite) TestStatus(c *check.C) {
c.Assert(node.CheckpointTs(), check.Equals, uint64(2))

// test the case where the stop at ts command is after the resolvedTs and checkpointTs is greater than stop ts
node = newSinkNode(&mockSink{}, 0, 10, &mockFlowController{})
node = newSinkNode(1, &mockSink{}, 0, 10, &mockFlowController{})
c.Assert(node.Init(pipeline.MockNodeContext4Test(ctx, pipeline.Message{}, nil)), check.IsNil)
c.Assert(node.Status(), check.Equals, TableStatusInitializing)

Expand Down Expand Up @@ -223,7 +223,7 @@ func (s *outputSuite) TestStopStatus(c *check.C) {
})

closeCh := make(chan interface{}, 1)
node := newSinkNode(&mockCloseControlSink{mockSink: mockSink{}, closeCh: closeCh}, 0, 100, &mockFlowController{})
node := newSinkNode(1, &mockCloseControlSink{mockSink: mockSink{}, closeCh: closeCh}, 0, 100, &mockFlowController{})
c.Assert(node.Init(pipeline.MockNodeContext4Test(ctx, pipeline.Message{}, nil)), check.IsNil)
c.Assert(node.Status(), check.Equals, TableStatusInitializing)
c.Assert(node.Receive(pipeline.MockNodeContext4Test(ctx,
Expand Down Expand Up @@ -258,7 +258,7 @@ func (s *outputSuite) TestManyTs(c *check.C) {
},
})
sink := &mockSink{}
node := newSinkNode(sink, 0, 10, &mockFlowController{})
node := newSinkNode(1, sink, 0, 10, &mockFlowController{})
c.Assert(node.Init(pipeline.MockNodeContext4Test(ctx, pipeline.Message{}, nil)), check.IsNil)
c.Assert(node.Status(), check.Equals, TableStatusInitializing)

Expand Down Expand Up @@ -379,7 +379,7 @@ func (s *outputSuite) TestIgnoreEmptyRowChangeEvent(c *check.C) {
},
})
sink := &mockSink{}
node := newSinkNode(sink, 0, 10, &mockFlowController{})
node := newSinkNode(1, sink, 0, 10, &mockFlowController{})
c.Assert(node.Init(pipeline.MockNodeContext4Test(ctx, pipeline.Message{}, nil)), check.IsNil)

// empty row, no Columns and PreColumns.
Expand All @@ -399,7 +399,7 @@ func (s *outputSuite) TestSplitUpdateEventWhenEnableOldValue(c *check.C) {
},
})
sink := &mockSink{}
node := newSinkNode(sink, 0, 10, &mockFlowController{})
node := newSinkNode(1, sink, 0, 10, &mockFlowController{})
c.Assert(node.Init(pipeline.MockNodeContext4Test(ctx, pipeline.Message{}, nil)), check.IsNil)

// nil row.
Expand Down Expand Up @@ -458,7 +458,7 @@ func (s *outputSuite) TestSplitUpdateEventWhenDisableOldValue(c *check.C) {
},
})
sink := &mockSink{}
node := newSinkNode(sink, 0, 10, &mockFlowController{})
node := newSinkNode(1, sink, 0, 10, &mockFlowController{})
c.Assert(node.Init(pipeline.MockNodeContext4Test(ctx, pipeline.Message{}, nil)), check.IsNil)

// nil row.
Expand Down
2 changes: 1 addition & 1 deletion cdc/processor/pipeline/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,7 @@ func NewTablePipeline(ctx cdcContext.Context,

p := pipeline.NewPipeline(ctx, 500*time.Millisecond, runnerSize, defaultOutputChannelSize)
sorterNode := newSorterNode(tableName, tableID, replicaInfo.StartTs, flowController, mounter)
sinkNode := newSinkNode(sink, replicaInfo.StartTs, targetTs, flowController)
sinkNode := newSinkNode(tableID, sink, replicaInfo.StartTs, targetTs, flowController)

p.AppendNode(ctx, "puller", newPullerNode(tableID, replicaInfo, tableName))
p.AppendNode(ctx, "sorter", sorterNode)
Expand Down
4 changes: 2 additions & 2 deletions cdc/sink/black_hole.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ func (b *blackHoleSink) EmitRowChangedEvents(ctx context.Context, rows ...*model
return nil
}

func (b *blackHoleSink) FlushRowChangedEvents(ctx context.Context, resolvedTs uint64) (uint64, error) {
func (b *blackHoleSink) FlushRowChangedEvents(ctx context.Context, tableID model.TableID, resolvedTs uint64) (uint64, error) {
log.Debug("BlockHoleSink: FlushRowChangedEvents", zap.Uint64("resolvedTs", resolvedTs))
err := b.statistics.RecordBatchExecution(func() (int, error) {
// TODO: add some random replication latency
Expand Down Expand Up @@ -79,6 +79,6 @@ func (b *blackHoleSink) Close(ctx context.Context) error {
return nil
}

func (b *blackHoleSink) Barrier(ctx context.Context) error {
func (b *blackHoleSink) Barrier(ctx context.Context, tableID model.TableID) error {
return nil
}
5 changes: 3 additions & 2 deletions cdc/sink/buffer_sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,8 @@ func (b *bufferSink) run(ctx context.Context, errCh chan error) {
b.bufferMu.Unlock()

start := time.Now()
checkpointTs, err := b.Sink.FlushRowChangedEvents(ctx, resolvedTs)
// TODO: use real table ID
checkpointTs, err := b.Sink.FlushRowChangedEvents(ctx, 0, resolvedTs)
if err != nil {
if errors.Cause(err) != context.Canceled {
errCh <- err
Expand Down Expand Up @@ -146,7 +147,7 @@ func (b *bufferSink) EmitRowChangedEvents(ctx context.Context, rows ...*model.Ro
return nil
}

func (b *bufferSink) FlushRowChangedEvents(ctx context.Context, resolvedTs uint64) (uint64, error) {
func (b *bufferSink) FlushRowChangedEvents(ctx context.Context, tableID model.TableID, resolvedTs uint64) (uint64, error) {
select {
case <-ctx.Done():
return atomic.LoadUint64(&b.checkpointTs), ctx.Err()
Expand Down
4 changes: 2 additions & 2 deletions cdc/sink/cdclog/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,7 @@ func (f *fileSink) EmitRowChangedEvents(ctx context.Context, rows ...*model.RowC
return f.emitRowChangedEvents(ctx, newTableStream, rows...)
}

func (f *fileSink) FlushRowChangedEvents(ctx context.Context, resolvedTs uint64) (uint64, error) {
func (f *fileSink) FlushRowChangedEvents(ctx context.Context, tableID model.TableID, resolvedTs uint64) (uint64, error) {
log.Debug("[FlushRowChangedEvents] enter", zap.Uint64("ts", resolvedTs))
return f.flushRowChangedEvents(ctx, resolvedTs)
}
Expand Down Expand Up @@ -349,7 +349,7 @@ func (f *fileSink) Close(ctx context.Context) error {
return nil
}

func (f *fileSink) Barrier(ctx context.Context) error {
func (f *fileSink) Barrier(ctx context.Context, tableID model.TableID) error {
// Barrier does nothing because FlushRowChangedEvents in file sink has flushed
// all buffered events forcibly.
return nil
Expand Down
4 changes: 2 additions & 2 deletions cdc/sink/cdclog/s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,7 @@ func (s *s3Sink) flushLogMeta(ctx context.Context) error {
return cerror.WrapError(cerror.ErrS3SinkWriteStorage, s.storage.WriteFile(ctx, logMetaFile, data))
}

func (s *s3Sink) FlushRowChangedEvents(ctx context.Context, resolvedTs uint64) (uint64, error) {
func (s *s3Sink) FlushRowChangedEvents(ctx context.Context, tableID model.TableID, resolvedTs uint64) (uint64, error) {
// we should flush all events before resolvedTs; there are two kinds of flush policy
// 1. flush row events to a s3 chunk: if the event size is not enough,
// TODO: when cdc crashed, we should repair these chunks to a complete file
Expand Down Expand Up @@ -347,7 +347,7 @@ func (s *s3Sink) Close(ctx context.Context) error {
return nil
}

func (s *s3Sink) Barrier(ctx context.Context) error {
func (s *s3Sink) Barrier(ctx context.Context, tableID model.TableID) error {
// Barrier does nothing because FlushRowChangedEvents in s3 sink has flushed
// all buffered events forcibly.
return nil
Expand Down
6 changes: 3 additions & 3 deletions cdc/sink/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ func (m *Manager) getMinEmittedTs() model.Ts {
return minTs
}

func (m *Manager) flushBackendSink(ctx context.Context) (model.Ts, error) {
func (m *Manager) flushBackendSink(ctx context.Context, tableID model.TableID) (model.Ts, error) {
// NOTICE: Because all table sinks will try to flush backend sink,
// which will cause a lot of lock contention and blocking in high concurrency cases.
// So here we use flushing as a lightweight lock to improve the lock competition problem.
Expand All @@ -119,7 +119,7 @@ func (m *Manager) flushBackendSink(ctx context.Context) (model.Ts, error) {
atomic.StoreInt64(&m.flushing, 0)
}()
minEmittedTs := m.getMinEmittedTs()
checkpointTs, err := m.backendSink.FlushRowChangedEvents(ctx, minEmittedTs)
checkpointTs, err := m.backendSink.FlushRowChangedEvents(ctx, tableID, minEmittedTs)
if err != nil {
return m.getCheckpointTs(), errors.Trace(err)
}
Expand All @@ -142,7 +142,7 @@ func (m *Manager) destroyTableSink(ctx context.Context, tableID model.TableID) e
return ctx.Err()
case <-callback:
}
return m.backendSink.Barrier(ctx)
return m.backendSink.Barrier(ctx, tableID)
}

func (m *Manager) getCheckpointTs() uint64 {
Expand Down
26 changes: 13 additions & 13 deletions cdc/sink/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ func (c *checkSink) EmitDDLEvent(ctx context.Context, ddl *model.DDLEvent) error
panic("unreachable")
}

func (c *checkSink) FlushRowChangedEvents(ctx context.Context, resolvedTs uint64) (uint64, error) {
func (c *checkSink) FlushRowChangedEvents(ctx context.Context, tableID model.TableID, resolvedTs uint64) (uint64, error) {
c.rowsMu.Lock()
defer c.rowsMu.Unlock()
var newRows []*model.RowChangedEvent
Expand Down Expand Up @@ -83,7 +83,7 @@ func (c *checkSink) Close(ctx context.Context) error {
return nil
}

func (c *checkSink) Barrier(ctx context.Context) error {
func (c *checkSink) Barrier(ctx context.Context, tableID model.TableID) error {
return nil
}

Expand Down Expand Up @@ -118,7 +118,7 @@ func (s *managerSuite) TestManagerRandom(c *check.C) {
for j := 1; j < rowNum; j++ {
if rand.Intn(10) == 0 {
resolvedTs := lastResolvedTs + uint64(rand.Intn(j-int(lastResolvedTs)))
_, err := tableSink.FlushRowChangedEvents(ctx, resolvedTs)
_, err := tableSink.FlushRowChangedEvents(ctx, model.TableID(i), resolvedTs)
c.Assert(err, check.IsNil)
lastResolvedTs = resolvedTs
} else {
Expand All @@ -129,7 +129,7 @@ func (s *managerSuite) TestManagerRandom(c *check.C) {
c.Assert(err, check.IsNil)
}
}
_, err := tableSink.FlushRowChangedEvents(ctx, uint64(rowNum))
_, err := tableSink.FlushRowChangedEvents(ctx, model.TableID(i), uint64(rowNum))
c.Assert(err, check.IsNil)
}()
}
Expand Down Expand Up @@ -180,7 +180,7 @@ func (s *managerSuite) TestManagerAddRemoveTable(c *check.C) {
})
c.Assert(err, check.IsNil)
}
_, err := sink.FlushRowChangedEvents(ctx, resolvedTs)
_, err := sink.FlushRowChangedEvents(ctx, sink.(*tableSink).tableID, resolvedTs)
if err != nil {
c.Assert(errors.Cause(err), check.Equals, context.Canceled)
}
Expand Down Expand Up @@ -244,7 +244,7 @@ func (s *managerSuite) TestManagerDestroyTableSink(c *check.C) {
CommitTs: uint64(110),
})
c.Assert(err, check.IsNil)
_, err = tableSink.FlushRowChangedEvents(ctx, 110)
_, err = tableSink.FlushRowChangedEvents(ctx, tableID, 110)
c.Assert(err, check.IsNil)
err = manager.destroyTableSink(ctx, tableID)
c.Assert(err, check.IsNil)
Expand Down Expand Up @@ -295,11 +295,11 @@ func BenchmarkManagerFlushing(b *testing.B) {
// All tables are flushed concurrently, except table 0.
for i := 1; i < goroutineNum; i++ {
i := i
tableSink := tableSinks[i]
tblSink := tableSinks[i]
go func() {
for j := 1; j < rowNum; j++ {
if j%2 == 0 {
_, err := tableSink.FlushRowChangedEvents(context.Background(), uint64(j))
_, err := tblSink.FlushRowChangedEvents(context.Background(), tblSink.(*tableSink).tableID, uint64(j))
if err != nil {
b.Error(err)
}
Expand All @@ -310,9 +310,9 @@ func BenchmarkManagerFlushing(b *testing.B) {

b.ResetTimer()
// Table 0 flush.
tableSink := tableSinks[0]
tblSink := tableSinks[0]
for i := 0; i < b.N; i++ {
_, err := tableSink.FlushRowChangedEvents(context.Background(), uint64(rowNum))
_, err := tblSink.FlushRowChangedEvents(context.Background(), tblSink.(*tableSink).tableID, uint64(rowNum))
if err != nil {
b.Error(err)
}
Expand Down Expand Up @@ -345,7 +345,7 @@ func (e *errorSink) EmitDDLEvent(ctx context.Context, ddl *model.DDLEvent) error
panic("unreachable")
}

func (e *errorSink) FlushRowChangedEvents(ctx context.Context, resolvedTs uint64) (uint64, error) {
func (e *errorSink) FlushRowChangedEvents(ctx context.Context, tableID model.TableID, resolvedTs uint64) (uint64, error) {
return 0, errors.New("error in flush row changed events")
}

Expand All @@ -357,7 +357,7 @@ func (e *errorSink) Close(ctx context.Context) error {
return nil
}

func (e *errorSink) Barrier(ctx context.Context) error {
func (e *errorSink) Barrier(ctx context.Context, tableID model.TableID) error {
return nil
}

Expand All @@ -374,7 +374,7 @@ func (s *managerSuite) TestManagerError(c *check.C) {
Table: &model.TableName{TableID: 1},
})
c.Assert(err, check.IsNil)
_, err = sink.FlushRowChangedEvents(ctx, 2)
_, err = sink.FlushRowChangedEvents(ctx, 1, 2)
c.Assert(err, check.IsNil)
err = <-errCh
c.Assert(err.Error(), check.Equals, "error in emit row changed events")
Expand Down
4 changes: 2 additions & 2 deletions cdc/sink/mq.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ func (k *mqSink) EmitRowChangedEvents(ctx context.Context, rows ...*model.RowCha
return nil
}

func (k *mqSink) FlushRowChangedEvents(ctx context.Context, resolvedTs uint64) (uint64, error) {
func (k *mqSink) FlushRowChangedEvents(ctx context.Context, tableID model.TableID, resolvedTs uint64) (uint64, error) {
if resolvedTs <= k.checkpointTs {
return k.checkpointTs, nil
}
Expand Down Expand Up @@ -260,7 +260,7 @@ func (k *mqSink) Close(ctx context.Context) error {
return errors.Trace(err)
}

func (k *mqSink) Barrier(cxt context.Context) error {
func (k *mqSink) Barrier(cxt context.Context, tableID model.TableID) error {
// Barrier does nothing because FlushRowChangedEvents in mq sink has flushed
// all buffered events by force.
return nil
Expand Down
10 changes: 6 additions & 4 deletions cdc/sink/mq_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,22 +80,24 @@ func (s mqSinkSuite) TestKafkaSink(c *check.C) {

// mock kafka broker processes 1 row changed event
leader.Returns(prodSuccess)
tableID := model.TableID(1)
row := &model.RowChangedEvent{
Table: &model.TableName{
Schema: "test",
Table: "t1",
Schema: "test",
Table: "t1",
TableID: tableID,
},
StartTs: 100,
CommitTs: 120,
Columns: []*model.Column{{Name: "col1", Type: 1, Value: "aa"}},
}
err = sink.EmitRowChangedEvents(ctx, row)
c.Assert(err, check.IsNil)
checkpointTs, err := sink.FlushRowChangedEvents(ctx, uint64(120))
checkpointTs, err := sink.FlushRowChangedEvents(ctx, tableID, uint64(120))
c.Assert(err, check.IsNil)
c.Assert(checkpointTs, check.Equals, uint64(120))
// flush older resolved ts
checkpointTs, err = sink.FlushRowChangedEvents(ctx, uint64(110))
checkpointTs, err = sink.FlushRowChangedEvents(ctx, tableID, uint64(110))
c.Assert(err, check.IsNil)
c.Assert(checkpointTs, check.Equals, uint64(120))

Expand Down
Loading