Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

sink(ticdc): add batch field to resolveTs #5342

Merged
merged 2 commits into from
May 18, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cdc/entry/mounter.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ func NewMounter(schemaStorage SchemaStorage,
// this method could block indefinitely if the DDL puller is lagging.
func (m *mounterImpl) DecodeEvent(ctx context.Context, pEvent *model.PolymorphicEvent) error {
m.metricTotalRows.Inc()
if pEvent.RawKV.OpType == model.OpTypeResolved {
if pEvent.IsResolved() {
CharlesCheung96 marked this conversation as resolved.
Show resolved Hide resolved
return nil
}
start := time.Now()
Expand Down
47 changes: 44 additions & 3 deletions cdc/model/mounter.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ type PolymorphicEvent struct {
StartTs uint64
// Commit or resolved TS
CRTs uint64
// Identify whether the resolved event is in batch mode.
Mode ResolvedMode

RawKV *RawKVEntry
Row *RowChangedEvent
Expand All @@ -44,7 +46,7 @@ func NewPolymorphicEvent(rawKV *RawKVEntry) *PolymorphicEvent {
}
}

// NewResolvedPolymorphicEvent creates a new PolymorphicEvent with the resolved ts
// NewResolvedPolymorphicEvent creates a new PolymorphicEvent with the resolved ts.
func NewResolvedPolymorphicEvent(regionID uint64, resolvedTs uint64) *PolymorphicEvent {
return &PolymorphicEvent{
CRTs: resolvedTs,
Expand All @@ -58,13 +60,24 @@ func (e *PolymorphicEvent) RegionID() uint64 {
return e.RawKV.RegionID
}

// IsResolved returns true if the event is resolved. Note that this function can
// only be called when `RawKV != nil`.
func (e *PolymorphicEvent) IsResolved() bool {
return e.RawKV.OpType == OpTypeResolved
}

// IsBatchResolved returns true if the event is batch resolved event.
func (e *PolymorphicEvent) IsBatchResolved() bool {
return e.IsResolved() && e.Mode == BatchResolvedMode
}

// ComparePolymorphicEvents compares two events by CRTs, Resolved, StartTs, Delete/Put order.
// It returns true if and only if i should precede j.
func ComparePolymorphicEvents(i, j *PolymorphicEvent) bool {
if i.CRTs == j.CRTs {
if i.RawKV.OpType == OpTypeResolved {
if i.IsResolved() {
return false
} else if j.RawKV.OpType == OpTypeResolved {
} else if j.IsResolved() {
3AceShowHand marked this conversation as resolved.
Show resolved Hide resolved
return true
}

Expand All @@ -80,3 +93,31 @@ func ComparePolymorphicEvents(i, j *PolymorphicEvent) bool {
}
return i.CRTs < j.CRTs
}

// ResolvedMode describes the batch type of a resolved event.
type ResolvedMode int

const (
// NormalResolvedMode means that all events whose commitTs is less than or equal to
// `resolved.Ts` are sent to Sink.
NormalResolvedMode ResolvedMode = iota
// BatchResolvedMode means that all events whose commitTs is less than
// 'resolved.Ts' are sent to Sink.
BatchResolvedMode
)

// ResolvedTs is the resolved timestamp of sink module.
type ResolvedTs struct {
Ts uint64
Mode ResolvedMode
}

// NewResolvedTs creates a new ResolvedTs.
func NewResolvedTs(t uint64) ResolvedTs {
return ResolvedTs{Ts: t, Mode: NormalResolvedMode}
}

// NewResolvedTsWithMode creates a ResolvedTs with a given batch type.
func NewResolvedTsWithMode(t uint64, m ResolvedMode) ResolvedTs {
return ResolvedTs{Ts: t, Mode: m}
}
2 changes: 1 addition & 1 deletion cdc/processor/pipeline/cyclic_mark.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ func (n *cyclicMarkNode) TryHandleDataMessage(
case pmessage.MessageTypePolymorphicEvent:
event := msg.PolymorphicEvent
n.flush(ctx, event.CRTs)
if event.RawKV.OpType == model.OpTypeResolved {
if event.IsResolved() {
ctx.SendToNextNode(msg)
return true, nil
}
Expand Down
4 changes: 2 additions & 2 deletions cdc/processor/pipeline/cyclic_mark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ func TestCyclicMarkNode(t *testing.T) {
go func() {
defer wg.Done()
for row := range outputCh {
if row.PolymorphicEvent.RawKV.OpType == model.OpTypeResolved {
if row.PolymorphicEvent.IsResolved() {
continue
}
output = append(output, row.PolymorphicEvent.Row)
Expand Down Expand Up @@ -213,7 +213,7 @@ func TestCyclicMarkNode(t *testing.T) {
require.Nil(t, err)
output := []*model.RowChangedEvent{}
putToOutput := func(row *pmessage.Message) {
if row == nil || row.PolymorphicEvent.RawKV.OpType == model.OpTypeResolved {
if row == nil || row.PolymorphicEvent.IsResolved() {
return
}
output = append(output, row.PolymorphicEvent.Row)
Expand Down
42 changes: 23 additions & 19 deletions cdc/processor/pipeline/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,8 @@ type sinkNode struct {
status TableStatus
tableID model.TableID

resolvedTs model.Ts
// atomic oprations for model.ResolvedTs
resolvedTs atomic.Value
CharlesCheung96 marked this conversation as resolved.
Show resolved Hide resolved
checkpointTs model.Ts
targetTs model.Ts
barrierTs model.Ts
Expand All @@ -83,23 +84,24 @@ type sinkNode struct {
}

func newSinkNode(tableID model.TableID, sink sink.Sink, startTs model.Ts, targetTs model.Ts, flowController tableFlowController) *sinkNode {
return &sinkNode{
sn := &sinkNode{
tableID: tableID,
sink: sink,
status: TableStatusInitializing,
targetTs: targetTs,
resolvedTs: startTs,
checkpointTs: startTs,
barrierTs: startTs,

flowController: flowController,
}
sn.resolvedTs.Store(model.NewResolvedTs(startTs))
return sn
}

func (n *sinkNode) ResolvedTs() model.Ts { return atomic.LoadUint64(&n.resolvedTs) }
func (n *sinkNode) CheckpointTs() model.Ts { return atomic.LoadUint64(&n.checkpointTs) }
func (n *sinkNode) BarrierTs() model.Ts { return atomic.LoadUint64(&n.barrierTs) }
func (n *sinkNode) Status() TableStatus { return n.status.Load() }
func (n *sinkNode) ResolvedTs() model.ResolvedTs { return n.resolvedTs.Load().(model.ResolvedTs) }
func (n *sinkNode) CheckpointTs() model.Ts { return atomic.LoadUint64(&n.checkpointTs) }
func (n *sinkNode) BarrierTs() model.Ts { return atomic.LoadUint64(&n.barrierTs) }
func (n *sinkNode) Status() TableStatus { return n.status.Load() }

func (n *sinkNode) Init(ctx pipeline.NodeContext) error {
n.replicaConfig = ctx.ChangefeedVars().Info.Config
Expand Down Expand Up @@ -129,7 +131,7 @@ func (n *sinkNode) stop(ctx context.Context) (err error) {

// flushSink emits all rows in rowBuffer to the backend sink and flushes
// the backend sink.
func (n *sinkNode) flushSink(ctx context.Context, resolvedTs model.Ts) (err error) {
func (n *sinkNode) flushSink(ctx context.Context, resolved model.ResolvedTs) (err error) {
defer func() {
if err != nil {
n.status.Store(TableStatusStopped)
Expand All @@ -141,16 +143,16 @@ func (n *sinkNode) flushSink(ctx context.Context, resolvedTs model.Ts) (err erro
}()
currentBarrierTs := atomic.LoadUint64(&n.barrierTs)
currentCheckpointTs := atomic.LoadUint64(&n.checkpointTs)
if resolvedTs > currentBarrierTs {
resolvedTs = currentBarrierTs
if resolved.Ts > currentBarrierTs {
resolved.Ts = currentBarrierTs
}
if resolvedTs > n.targetTs {
resolvedTs = n.targetTs
if resolved.Ts > n.targetTs {
resolved.Ts = n.targetTs
}
if resolvedTs <= currentCheckpointTs {
if resolved.Ts <= currentCheckpointTs {
return nil
}
checkpointTs, err := n.sink.FlushRowChangedEvents(ctx, n.tableID, resolvedTs)
checkpointTs, err := n.sink.FlushRowChangedEvents(ctx, n.tableID, resolved)
if err != nil {
return errors.Trace(err)
}
Expand Down Expand Up @@ -291,24 +293,26 @@ func (n *sinkNode) HandleMessage(ctx context.Context, msg pmessage.Message) (boo
switch msg.Tp {
case pmessage.MessageTypePolymorphicEvent:
event := msg.PolymorphicEvent
if event.RawKV.OpType == model.OpTypeResolved {
if event.IsResolved() {
CharlesCheung96 marked this conversation as resolved.
Show resolved Hide resolved
if n.status.Load() == TableStatusInitializing {
n.status.Store(TableStatusRunning)
}
failpoint.Inject("ProcessorSyncResolvedError", func() {
failpoint.Return(false, errors.New("processor sync resolved injected error"))
})
if err := n.flushSink(ctx, msg.PolymorphicEvent.CRTs); err != nil {

resolved := model.NewResolvedTsWithMode(event.CRTs, event.Mode)
if err := n.flushSink(ctx, resolved); err != nil {
return false, errors.Trace(err)
}
atomic.StoreUint64(&n.resolvedTs, msg.PolymorphicEvent.CRTs)
n.resolvedTs.Store(resolved)
return true, nil
}
if err := n.emitRowToSink(ctx, event); err != nil {
return false, errors.Trace(err)
}
case pmessage.MessageTypeTick:
if err := n.flushSink(ctx, atomic.LoadUint64(&n.resolvedTs)); err != nil {
if err := n.flushSink(ctx, n.ResolvedTs()); err != nil {
return false, errors.Trace(err)
}
case pmessage.MessageTypeCommand:
Expand All @@ -327,7 +331,7 @@ func (n *sinkNode) HandleMessage(ctx context.Context, msg pmessage.Message) (boo

func (n *sinkNode) updateBarrierTs(ctx context.Context, ts model.Ts) error {
atomic.StoreUint64(&n.barrierTs, ts)
if err := n.flushSink(ctx, atomic.LoadUint64(&n.resolvedTs)); err != nil {
if err := n.flushSink(ctx, n.ResolvedTs()); err != nil {
return errors.Trace(err)
}
return nil
Expand Down
24 changes: 14 additions & 10 deletions cdc/processor/pipeline/sink_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,12 +76,14 @@ func (s *mockSink) EmitDDLEvent(ctx context.Context, ddl *model.DDLEvent) error
panic("unreachable")
}

func (s *mockSink) FlushRowChangedEvents(ctx context.Context, _ model.TableID, resolvedTs uint64) (uint64, error) {
func (s *mockSink) FlushRowChangedEvents(
ctx context.Context, _ model.TableID, resolved model.ResolvedTs,
) (uint64, error) {
s.received = append(s.received, struct {
resolvedTs model.Ts
row *model.RowChangedEvent
}{resolvedTs: resolvedTs})
return resolvedTs, nil
}{resolvedTs: resolved.Ts})
return resolved.Ts, nil
}

func (s *mockSink) EmitCheckpointTs(_ context.Context, _ uint64, _ []model.TableName) error {
Expand Down Expand Up @@ -420,7 +422,7 @@ func TestManyTs(t *testing.T) {
{resolvedTs: 1},
})
sink.Reset()
require.Equal(t, uint64(2), node.ResolvedTs())
require.Equal(t, model.NewResolvedTs(uint64(2)), node.ResolvedTs())
require.Equal(t, uint64(1), node.CheckpointTs())

require.Nil(t, node.Receive(
Expand All @@ -433,7 +435,7 @@ func TestManyTs(t *testing.T) {
{resolvedTs: 2},
})
sink.Reset()
require.Equal(t, uint64(2), node.ResolvedTs())
require.Equal(t, model.NewResolvedTs(uint64(2)), node.ResolvedTs())
require.Equal(t, uint64(2), node.CheckpointTs())
}

Expand Down Expand Up @@ -646,11 +648,13 @@ type flushSink struct {
// fall back
var fallBackResolvedTs = uint64(10)

func (s *flushSink) FlushRowChangedEvents(ctx context.Context, _ model.TableID, resolvedTs uint64) (uint64, error) {
if resolvedTs == fallBackResolvedTs {
func (s *flushSink) FlushRowChangedEvents(
ctx context.Context, _ model.TableID, resolved model.ResolvedTs,
) (uint64, error) {
if resolved.Ts == fallBackResolvedTs {
return 0, nil
}
return resolvedTs, nil
return resolved.Ts, nil
}

// TestFlushSinkReleaseFlowController tests sinkNode.flushSink method will always
Expand All @@ -674,12 +678,12 @@ func TestFlushSinkReleaseFlowController(t *testing.T) {
require.Nil(t, sNode.Init(pipeline.MockNodeContext4Test(ctx, pmessage.Message{}, nil)))
sNode.barrierTs = 10

err := sNode.flushSink(context.Background(), uint64(8))
err := sNode.flushSink(context.Background(), model.NewResolvedTs(uint64(8)))
require.Nil(t, err)
require.Equal(t, uint64(8), sNode.checkpointTs)
require.Equal(t, 1, flowController.releaseCounter)
// resolvedTs will fall back in this call
err = sNode.flushSink(context.Background(), uint64(10))
err = sNode.flushSink(context.Background(), model.NewResolvedTs(uint64(10)))
require.Nil(t, err)
require.Equal(t, uint64(8), sNode.checkpointTs)
require.Equal(t, 2, flowController.releaseCounter)
Expand Down
2 changes: 1 addition & 1 deletion cdc/processor/pipeline/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ func (t *tablePipelineImpl) ResolvedTs() model.Ts {
// another replication barrier for consistent replication instead of reusing
// the global resolved-ts.
if redo.IsConsistentEnabled(t.replConfig.Consistent.Level) {
return t.sinkNode.ResolvedTs()
return t.sinkNode.ResolvedTs().Ts
}
return t.sorterNode.ResolvedTs()
}
Expand Down
2 changes: 1 addition & 1 deletion cdc/processor/pipeline/table_actor.go
Original file line number Diff line number Diff line change
Expand Up @@ -429,7 +429,7 @@ func (t *tableActor) ResolvedTs() model.Ts {
// another replication barrier for consistent replication instead of reusing
// the global resolved-ts.
if redo.IsConsistentEnabled(t.replicaConfig.Consistent.Level) {
return t.sinkNode.ResolvedTs()
return t.sinkNode.ResolvedTs().Ts
}
return t.sortNode.ResolvedTs()
}
Expand Down
35 changes: 20 additions & 15 deletions cdc/processor/pipeline/table_actor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ func TestTableActorInterface(t *testing.T) {

require.Equal(t, model.Ts(5), tbl.ResolvedTs())
tbl.replicaConfig.Consistent.Level = string(redo.ConsistentLevelEventual)
atomic.StoreUint64(&sink.resolvedTs, 6)
sink.resolvedTs.Store(model.NewResolvedTs(6))
require.Equal(t, model.Ts(6), tbl.ResolvedTs())
}

Expand Down Expand Up @@ -184,15 +184,18 @@ func TestPollStoppedActor(t *testing.T) {

func TestPollTickMessage(t *testing.T) {
startTime := time.Now().Add(-sinkFlushInterval)

sn := &sinkNode{
status: TableStatusInitializing,
sink: &mockSink{},
flowController: &mockFlowController{},
checkpointTs: 10,
targetTs: 11,
}
sn.resolvedTs.Store(model.NewResolvedTs(10))

tbl := tableActor{
sinkNode: &sinkNode{
status: TableStatusInitializing,
sink: &mockSink{},
flowController: &mockFlowController{},
resolvedTs: 10,
checkpointTs: 10,
targetTs: 11,
},
sinkNode: sn,
lastFlushSinkTime: time.Now().Add(-2 * sinkFlushInterval),
cancel: func() {},
reportErr: func(err error) {},
Expand Down Expand Up @@ -235,13 +238,15 @@ func TestPollStopMessage(t *testing.T) {
}

func TestPollBarrierTsMessage(t *testing.T) {
sn := &sinkNode{
targetTs: 10,
checkpointTs: 5,
barrierTs: 8,
}
sn.resolvedTs.Store(model.NewResolvedTs(5))

tbl := tableActor{
sinkNode: &sinkNode{
targetTs: 10,
checkpointTs: 5,
resolvedTs: 5,
barrierTs: 8,
},
sinkNode: sn,
sortNode: &sorterNode{
barrierTs: 8,
},
Expand Down
Loading