Skip to content

Commit

Permalink
Merge branch 'master' into rustin-patch-sink-row
Browse files Browse the repository at this point in the history
  • Loading branch information
Rustin170506 authored Oct 27, 2021
2 parents 11e2305 + 427b70c commit 56ebd20
Show file tree
Hide file tree
Showing 29 changed files with 2,385 additions and 150 deletions.
3 changes: 3 additions & 0 deletions cdc/model/changefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -249,6 +249,9 @@ func (info *ChangeFeedInfo) VerifyAndFix() error {
if info.Config.Scheduler == nil {
info.Config.Scheduler = defaultConfig.Scheduler
}
if info.Config.Consistent == nil {
info.Config.Consistent = defaultConfig.Consistent
}
return nil
}

Expand Down
6 changes: 3 additions & 3 deletions cdc/model/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -233,9 +233,9 @@ const (
// more info https://github.com/tinylib/msgp/issues/158, https://github.com/tinylib/msgp/issues/149
// so define a RedoColumn, RedoDDLEvent instead of using the Column, DDLEvent
type RedoLog struct {
Row *RedoRowChangedEvent `msg:"row"`
DDL *RedoDDLEvent `msg:"ddl"`
Type RedoLogType `msg:"type"`
RedoRow *RedoRowChangedEvent `msg:"row"`
RedoDDL *RedoDDLEvent `msg:"ddl"`
Type RedoLogType `msg:"type"`
}

// RedoRowChangedEvent represents the DML event used in RedoLog
Expand Down
122 changes: 61 additions & 61 deletions cdc/model/sink_gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

27 changes: 26 additions & 1 deletion cdc/processor/pipeline/sorter.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ package pipeline

import (
"context"
"sync/atomic"
"time"

"github.com/pingcap/errors"
Expand Down Expand Up @@ -48,14 +49,21 @@ type sorterNode struct {

wg errgroup.Group
cancel context.CancelFunc

// The latest resolved ts that sorter has received.
resolvedTs model.Ts
}

func newSorterNode(tableName string, tableID model.TableID, flowController tableFlowController, mounter entry.Mounter) pipeline.Node {
func newSorterNode(
tableName string, tableID model.TableID, startTs model.Ts,
flowController tableFlowController, mounter entry.Mounter,
) *sorterNode {
return &sorterNode{
tableName: tableName,
tableID: tableID,
flowController: flowController,
mounter: mounter,
resolvedTs: startTs,
}
}

Expand Down Expand Up @@ -191,6 +199,19 @@ func (n *sorterNode) Receive(ctx pipeline.NodeContext) error {
msg := ctx.Message()
switch msg.Tp {
case pipeline.MessageTypePolymorphicEvent:
rawKV := msg.PolymorphicEvent.RawKV
if rawKV != nil && rawKV.OpType == model.OpTypeResolved {
// Puller resolved ts should not fall back.
resolvedTs := rawKV.CRTs
oldResolvedTs := atomic.SwapUint64(&n.resolvedTs, resolvedTs)
if oldResolvedTs > resolvedTs {
log.Panic("resolved ts regression",
zap.Int64("tableID", n.tableID),
zap.Uint64("resolvedTs", resolvedTs),
zap.Uint64("oldResolvedTs", oldResolvedTs))
}
atomic.StoreUint64(&n.resolvedTs, rawKV.CRTs)
}
n.sorter.AddEntry(ctx, msg.PolymorphicEvent)
default:
ctx.SendToNextNode(msg)
Expand All @@ -203,3 +224,7 @@ func (n *sorterNode) Destroy(ctx pipeline.NodeContext) error {
n.cancel()
return n.wg.Wait()
}

func (n *sorterNode) ResolvedTs() model.Ts {
return atomic.LoadUint64(&n.resolvedTs)
}
Loading

0 comments on commit 56ebd20

Please sign in to comment.