Skip to content

Commit

Permalink
add batch field to resolved ts
Browse files Browse the repository at this point in the history
  • Loading branch information
CharlesCheung96 committed May 16, 2022
1 parent 46136be commit 0aae81c
Show file tree
Hide file tree
Showing 29 changed files with 222 additions and 147 deletions.
2 changes: 1 addition & 1 deletion cdc/entry/mounter.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ func NewMounter(schemaStorage SchemaStorage,
// this method could block indefinitely if the DDL puller is lagging.
func (m *mounterImpl) DecodeEvent(ctx context.Context, pEvent *model.PolymorphicEvent) error {
m.metricTotalRows.Inc()
if pEvent.RawKV.OpType == model.OpTypeResolved {
if pEvent.IsResolved() {
return nil
}
start := time.Now()
Expand Down
38 changes: 33 additions & 5 deletions cdc/model/mounter.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,17 +13,19 @@

package model

// PolymorphicEvent describes an event can be in multiple states
// PolymorphicEvent describes an event can be in multiple states.
type PolymorphicEvent struct {
StartTs uint64
// Commit or resolved TS
CRTs uint64
// Identify whether the resolved event is in batch mode.
Batch bool

RawKV *RawKVEntry
Row *RowChangedEvent
}

// NewPolymorphicEvent creates a new PolymorphicEvent with a raw KV
// NewPolymorphicEvent creates a new PolymorphicEvent with a raw KV.
func NewPolymorphicEvent(rawKV *RawKVEntry) *PolymorphicEvent {
if rawKV.OpType == OpTypeResolved {
return NewResolvedPolymorphicEvent(rawKV.RegionID, rawKV.CRTs)
Expand All @@ -35,7 +37,7 @@ func NewPolymorphicEvent(rawKV *RawKVEntry) *PolymorphicEvent {
}
}

// NewResolvedPolymorphicEvent creates a new PolymorphicEvent with the resolved ts
// NewResolvedPolymorphicEvent creates a new PolymorphicEvent with the resolved ts.
func NewResolvedPolymorphicEvent(regionID uint64, resolvedTs uint64) *PolymorphicEvent {
return &PolymorphicEvent{
CRTs: resolvedTs,
Expand All @@ -49,13 +51,23 @@ func (e *PolymorphicEvent) RegionID() uint64 {
return e.RawKV.RegionID
}

// IsResolved returns true if the event is resolved.
func (e *PolymorphicEvent) IsResolved() bool {
return e.RawKV != nil && e.RawKV.OpType == OpTypeResolved
}

// IsBatchResolved returns true if the event is batch resolved event.
func (e *PolymorphicEvent) IsBatchResolved() bool {
return e.IsResolved() && e.Batch
}

// ComparePolymorphicEvents compares two events by CRTs, Resolved, StartTs, Delete/Put order.
// It returns true if and only if i should precede j.
func ComparePolymorphicEvents(i, j *PolymorphicEvent) bool {
if i.CRTs == j.CRTs {
if i.RawKV.OpType == OpTypeResolved {
if i.IsResolved() {
return false
} else if j.RawKV.OpType == OpTypeResolved {
} else if j.IsResolved() {
return true
}

Expand All @@ -71,3 +83,19 @@ func ComparePolymorphicEvents(i, j *PolymorphicEvent) bool {
}
return i.CRTs < j.CRTs
}

// ResolvedTs is the resolved timestamp of sink module.
type ResolvedTs struct {
Ts uint64
Batch bool
}

// NewResolvedTs creates a new ResolvedTs.
func NewResolvedTs(t uint64) ResolvedTs {
return ResolvedTs{Ts: t, Batch: false}
}

// NewBatchResolvedTs creates a ResolvedTs with a given batch type.
func NewBatchResolvedTs(t uint64, b bool) ResolvedTs {
return ResolvedTs{Ts: t, Batch: b}
}
2 changes: 1 addition & 1 deletion cdc/processor/pipeline/cyclic_mark.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ func (n *cyclicMarkNode) TryHandleDataMessage(
case pmessage.MessageTypePolymorphicEvent:
event := msg.PolymorphicEvent
n.flush(ctx, event.CRTs)
if event.RawKV.OpType == model.OpTypeResolved {
if event.IsResolved() {
ctx.SendToNextNode(msg)
return true, nil
}
Expand Down
4 changes: 2 additions & 2 deletions cdc/processor/pipeline/cyclic_mark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ func TestCyclicMarkNode(t *testing.T) {
go func() {
defer wg.Done()
for row := range outputCh {
if row.PolymorphicEvent.RawKV.OpType == model.OpTypeResolved {
if row.PolymorphicEvent.IsResolved() {
continue
}
output = append(output, row.PolymorphicEvent.Row)
Expand Down Expand Up @@ -213,7 +213,7 @@ func TestCyclicMarkNode(t *testing.T) {
require.Nil(t, err)
output := []*model.RowChangedEvent{}
putToOutput := func(row *pmessage.Message) {
if row == nil || row.PolymorphicEvent.RawKV.OpType == model.OpTypeResolved {
if row == nil || row.PolymorphicEvent.IsResolved() {
return
}
output = append(output, row.PolymorphicEvent.Row)
Expand Down
42 changes: 23 additions & 19 deletions cdc/processor/pipeline/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,8 @@ type sinkNode struct {
status TableStatus
tableID model.TableID

resolvedTs model.Ts
// atomic oprations for model.ResolvedTs
resolvedTs atomic.Value
checkpointTs model.Ts
targetTs model.Ts
barrierTs model.Ts
Expand All @@ -83,23 +84,24 @@ type sinkNode struct {
}

func newSinkNode(tableID model.TableID, sink sink.Sink, startTs model.Ts, targetTs model.Ts, flowController tableFlowController) *sinkNode {
return &sinkNode{
sn := &sinkNode{
tableID: tableID,
sink: sink,
status: TableStatusInitializing,
targetTs: targetTs,
resolvedTs: startTs,
checkpointTs: startTs,
barrierTs: startTs,

flowController: flowController,
}
sn.resolvedTs.Store(model.NewResolvedTs(startTs))
return sn
}

func (n *sinkNode) ResolvedTs() model.Ts { return atomic.LoadUint64(&n.resolvedTs) }
func (n *sinkNode) CheckpointTs() model.Ts { return atomic.LoadUint64(&n.checkpointTs) }
func (n *sinkNode) BarrierTs() model.Ts { return atomic.LoadUint64(&n.barrierTs) }
func (n *sinkNode) Status() TableStatus { return n.status.Load() }
func (n *sinkNode) ResolvedTs() model.ResolvedTs { return n.resolvedTs.Load().(model.ResolvedTs) }
func (n *sinkNode) CheckpointTs() model.Ts { return atomic.LoadUint64(&n.checkpointTs) }
func (n *sinkNode) BarrierTs() model.Ts { return atomic.LoadUint64(&n.barrierTs) }
func (n *sinkNode) Status() TableStatus { return n.status.Load() }

func (n *sinkNode) Init(ctx pipeline.NodeContext) error {
n.replicaConfig = ctx.ChangefeedVars().Info.Config
Expand Down Expand Up @@ -129,7 +131,7 @@ func (n *sinkNode) stop(ctx context.Context) (err error) {

// flushSink emits all rows in rowBuffer to the backend sink and flushes
// the backend sink.
func (n *sinkNode) flushSink(ctx context.Context, resolvedTs model.Ts) (err error) {
func (n *sinkNode) flushSink(ctx context.Context, resolved model.ResolvedTs) (err error) {
defer func() {
if err != nil {
n.status.Store(TableStatusStopped)
Expand All @@ -141,16 +143,16 @@ func (n *sinkNode) flushSink(ctx context.Context, resolvedTs model.Ts) (err erro
}()
currentBarrierTs := atomic.LoadUint64(&n.barrierTs)
currentCheckpointTs := atomic.LoadUint64(&n.checkpointTs)
if resolvedTs > currentBarrierTs {
resolvedTs = currentBarrierTs
if resolved.Ts > currentBarrierTs {
resolved.Ts = currentBarrierTs
}
if resolvedTs > n.targetTs {
resolvedTs = n.targetTs
if resolved.Ts > n.targetTs {
resolved.Ts = n.targetTs
}
if resolvedTs <= currentCheckpointTs {
if resolved.Ts <= currentCheckpointTs {
return nil
}
checkpointTs, err := n.sink.FlushRowChangedEvents(ctx, n.tableID, resolvedTs)
checkpointTs, err := n.sink.FlushRowChangedEvents(ctx, n.tableID, resolved)
if err != nil {
return errors.Trace(err)
}
Expand Down Expand Up @@ -291,24 +293,26 @@ func (n *sinkNode) HandleMessage(ctx context.Context, msg pmessage.Message) (boo
switch msg.Tp {
case pmessage.MessageTypePolymorphicEvent:
event := msg.PolymorphicEvent
if event.RawKV.OpType == model.OpTypeResolved {
if event.IsResolved() {
if n.status.Load() == TableStatusInitializing {
n.status.Store(TableStatusRunning)
}
failpoint.Inject("ProcessorSyncResolvedError", func() {
failpoint.Return(false, errors.New("processor sync resolved injected error"))
})
if err := n.flushSink(ctx, msg.PolymorphicEvent.CRTs); err != nil {

resolved := model.NewBatchResolvedTs(event.CRTs, event.Batch)
if err := n.flushSink(ctx, resolved); err != nil {
return false, errors.Trace(err)
}
atomic.StoreUint64(&n.resolvedTs, msg.PolymorphicEvent.CRTs)
n.resolvedTs.Store(resolved)
return true, nil
}
if err := n.emitRowToSink(ctx, event); err != nil {
return false, errors.Trace(err)
}
case pmessage.MessageTypeTick:
if err := n.flushSink(ctx, atomic.LoadUint64(&n.resolvedTs)); err != nil {
if err := n.flushSink(ctx, n.ResolvedTs()); err != nil {
return false, errors.Trace(err)
}
case pmessage.MessageTypeCommand:
Expand All @@ -327,7 +331,7 @@ func (n *sinkNode) HandleMessage(ctx context.Context, msg pmessage.Message) (boo

func (n *sinkNode) updateBarrierTs(ctx context.Context, ts model.Ts) error {
atomic.StoreUint64(&n.barrierTs, ts)
if err := n.flushSink(ctx, atomic.LoadUint64(&n.resolvedTs)); err != nil {
if err := n.flushSink(ctx, n.ResolvedTs()); err != nil {
return errors.Trace(err)
}
return nil
Expand Down
24 changes: 14 additions & 10 deletions cdc/processor/pipeline/sink_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,12 +72,14 @@ func (s *mockSink) EmitDDLEvent(ctx context.Context, ddl *model.DDLEvent) error
panic("unreachable")
}

func (s *mockSink) FlushRowChangedEvents(ctx context.Context, _ model.TableID, resolvedTs uint64) (uint64, error) {
func (s *mockSink) FlushRowChangedEvents(
ctx context.Context, _ model.TableID, resolved model.ResolvedTs,
) (uint64, error) {
s.received = append(s.received, struct {
resolvedTs model.Ts
row *model.RowChangedEvent
}{resolvedTs: resolvedTs})
return resolvedTs, nil
}{resolvedTs: resolved.Ts})
return resolved.Ts, nil
}

func (s *mockSink) EmitCheckpointTs(_ context.Context, _ uint64, _ []model.TableName) error {
Expand Down Expand Up @@ -416,7 +418,7 @@ func TestManyTs(t *testing.T) {
{resolvedTs: 1},
})
sink.Reset()
require.Equal(t, uint64(2), node.ResolvedTs())
require.Equal(t, model.NewResolvedTs(uint64(2)), node.ResolvedTs())
require.Equal(t, uint64(1), node.CheckpointTs())

require.Nil(t, node.Receive(
Expand All @@ -429,7 +431,7 @@ func TestManyTs(t *testing.T) {
{resolvedTs: 2},
})
sink.Reset()
require.Equal(t, uint64(2), node.ResolvedTs())
require.Equal(t, model.NewResolvedTs(uint64(2)), node.ResolvedTs())
require.Equal(t, uint64(2), node.CheckpointTs())
}

Expand Down Expand Up @@ -642,11 +644,13 @@ type flushSink struct {
// fall back
var fallBackResolvedTs = uint64(10)

func (s *flushSink) FlushRowChangedEvents(ctx context.Context, _ model.TableID, resolvedTs uint64) (uint64, error) {
if resolvedTs == fallBackResolvedTs {
func (s *flushSink) FlushRowChangedEvents(
ctx context.Context, _ model.TableID, resolved model.ResolvedTs,
) (uint64, error) {
if resolved.Ts == fallBackResolvedTs {
return 0, nil
}
return resolvedTs, nil
return resolved.Ts, nil
}

// TestFlushSinkReleaseFlowController tests sinkNode.flushSink method will always
Expand All @@ -670,12 +674,12 @@ func TestFlushSinkReleaseFlowController(t *testing.T) {
require.Nil(t, sNode.Init(pipeline.MockNodeContext4Test(ctx, pmessage.Message{}, nil)))
sNode.barrierTs = 10

err := sNode.flushSink(context.Background(), uint64(8))
err := sNode.flushSink(context.Background(), model.NewResolvedTs(uint64(8)))
require.Nil(t, err)
require.Equal(t, uint64(8), sNode.checkpointTs)
require.Equal(t, 1, flowController.releaseCounter)
// resolvedTs will fall back in this call
err = sNode.flushSink(context.Background(), uint64(10))
err = sNode.flushSink(context.Background(), model.NewResolvedTs(uint64(10)))
require.Nil(t, err)
require.Equal(t, uint64(8), sNode.checkpointTs)
require.Equal(t, 2, flowController.releaseCounter)
Expand Down
2 changes: 1 addition & 1 deletion cdc/processor/pipeline/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ func (t *tablePipelineImpl) ResolvedTs() model.Ts {
// another replication barrier for consistent replication instead of reusing
// the global resolved-ts.
if redo.IsConsistentEnabled(t.replConfig.Consistent.Level) {
return t.sinkNode.ResolvedTs()
return t.sinkNode.ResolvedTs().Ts
}
return t.sorterNode.ResolvedTs()
}
Expand Down
2 changes: 1 addition & 1 deletion cdc/processor/pipeline/table_actor.go
Original file line number Diff line number Diff line change
Expand Up @@ -429,7 +429,7 @@ func (t *tableActor) ResolvedTs() model.Ts {
// another replication barrier for consistent replication instead of reusing
// the global resolved-ts.
if redo.IsConsistentEnabled(t.replicaConfig.Consistent.Level) {
return t.sinkNode.ResolvedTs()
return t.sinkNode.ResolvedTs().Ts
}
return t.sortNode.ResolvedTs()
}
Expand Down
35 changes: 20 additions & 15 deletions cdc/processor/pipeline/table_actor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ func TestTableActorInterface(t *testing.T) {

require.Equal(t, model.Ts(5), tbl.ResolvedTs())
tbl.replicaConfig.Consistent.Level = string(redo.ConsistentLevelEventual)
atomic.StoreUint64(&sink.resolvedTs, 6)
sink.resolvedTs.Store(model.NewResolvedTs(6))
require.Equal(t, model.Ts(6), tbl.ResolvedTs())
}

Expand Down Expand Up @@ -184,15 +184,18 @@ func TestPollStoppedActor(t *testing.T) {

func TestPollTickMessage(t *testing.T) {
startTime := time.Now().Add(-sinkFlushInterval)

sn := &sinkNode{
status: TableStatusInitializing,
sink: &mockSink{},
flowController: &mockFlowController{},
checkpointTs: 10,
targetTs: 11,
}
sn.resolvedTs.Store(model.NewResolvedTs(10))

tbl := tableActor{
sinkNode: &sinkNode{
status: TableStatusInitializing,
sink: &mockSink{},
flowController: &mockFlowController{},
resolvedTs: 10,
checkpointTs: 10,
targetTs: 11,
},
sinkNode: sn,
lastFlushSinkTime: time.Now().Add(-2 * sinkFlushInterval),
cancel: func() {},
reportErr: func(err error) {},
Expand Down Expand Up @@ -235,13 +238,15 @@ func TestPollStopMessage(t *testing.T) {
}

func TestPollBarrierTsMessage(t *testing.T) {
sn := &sinkNode{
targetTs: 10,
checkpointTs: 5,
barrierTs: 8,
}
sn.resolvedTs.Store(model.NewResolvedTs(5))

tbl := tableActor{
sinkNode: &sinkNode{
targetTs: 10,
checkpointTs: 5,
resolvedTs: 5,
barrierTs: 8,
},
sinkNode: sn,
sortNode: &sorterNode{
barrierTs: 8,
},
Expand Down
Loading

0 comments on commit 0aae81c

Please sign in to comment.