sink(ticdc): make mysql sink support split transactions #5281

Merged: 11 commits, May 17, 2022
13 changes: 11 additions & 2 deletions cdc/model/mounter.go
@@ -13,7 +13,7 @@

package model

// PolymorphicEvent describes an event can be in multiple states
// PolymorphicEvent describes an event that can be in multiple states.
type PolymorphicEvent struct {
StartTs uint64
// Commit or resolved TS
@@ -23,7 +23,16 @@ type PolymorphicEvent struct {
Row *RowChangedEvent
}

// NewPolymorphicEvent creates a new PolymorphicEvent with a raw KV
// NewEmptyPolymorphicEvent creates a new empty PolymorphicEvent.
func NewEmptyPolymorphicEvent(ts uint64) *PolymorphicEvent {
return &PolymorphicEvent{
CRTs: ts,
RawKV: &RawKVEntry{},
Row: &RowChangedEvent{},
}
}

// NewPolymorphicEvent creates a new PolymorphicEvent with a raw KV.
func NewPolymorphicEvent(rawKV *RawKVEntry) *PolymorphicEvent {
if rawKV.OpType == OpTypeResolved {
return NewResolvedPolymorphicEvent(rawKV.RegionID, rawKV.CRTs)
3 changes: 3 additions & 0 deletions cdc/model/sink.go
@@ -267,6 +267,9 @@ type RowChangedEvent struct {
// ApproximateDataSize is the approximate size of protobuf binary
// representation of this event.
ApproximateDataSize int64 `json:"-" msg:"-"`

// SplitTxn marks this RowChangedEvent as the first line of a new txn.
SplitTxn bool `json:"-" msg:"-"`
}

// IsDelete returns true if the row is a delete event
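For context, a hedged sketch of how a downstream sink could act on the new SplitTxn flag: flush whatever is buffered whenever a row marks the start of a new (possibly split) transaction. The bufferedSink type and its flush method are illustrative assumptions, not code from this PR; only model.RowChangedEvent and its SplitTxn field come from the diff above.

package sinksketch

import "github.com/pingcap/tiflow/cdc/model"

// bufferedSink is a hypothetical sink that groups rows into downstream
// transactions using the SplitTxn marker.
type bufferedSink struct {
    pending []*model.RowChangedEvent // rows of the txn currently being built
}

// addRow buffers a row, flushing the previous batch first whenever the row
// starts a new transaction according to SplitTxn.
func (s *bufferedSink) addRow(row *model.RowChangedEvent) {
    if row.SplitTxn && len(s.pending) > 0 {
        s.flush()
    }
    s.pending = append(s.pending, row)
}

// flush stands in for building and executing one MySQL transaction.
func (s *bufferedSink) flush() {
    s.pending = s.pending[:0]
}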
6 changes: 5 additions & 1 deletion cdc/processor/pipeline/sink_test.go
@@ -40,7 +40,11 @@ type mockSink struct {
// we are testing sinkNode by itself.
type mockFlowController struct{}

func (c *mockFlowController) Consume(commitTs uint64, size uint64, blockCallBack func() error) error {
func (c *mockFlowController) Consume(
msg *model.PolymorphicEvent,
size uint64,
blockCallBack func(bool) error,
) error {
return nil
}

6 changes: 4 additions & 2 deletions cdc/processor/pipeline/sorter.go
@@ -206,8 +206,10 @@ func (n *sorterNode) start(
size := uint64(msg.Row.ApproximateBytes())
// NOTE we allow the quota to be exceeded if blocking means interrupting a transaction.
// Otherwise the pipeline would deadlock.
err = n.flowController.Consume(commitTs, size, func() error {
if lastCRTs > lastSentResolvedTs {
err = n.flowController.Consume(msg, size, func(batch bool) error {
if batch {
log.Panic("cdc does not support the batch resolve mechanism at this time")
} else if lastCRTs > lastSentResolvedTs {
// If we are blocking, we send a Resolved Event here to elicit a sink-flush.
// Not sending a Resolved Event here will very likely deadlock the pipeline.
lastSentResolvedTs = lastCRTs
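The NOTE above is the heart of the flow-control contract: if the memory quota runs out in the middle of a transaction, the controller must invoke the callback so the caller can emit a resolved event, the sink can flush, and memory can be released. A minimal sketch of that contract follows; the quota bookkeeping is a simplification and the function name is an assumption, while the real logic lives in consumeWithBlocking in cdc/sink/flowcontrol.

// Sketch only: simplified view of the consume-with-callback contract.
func consumeWithForcedFlush(
    used *uint64, quota, size uint64,
    forceFlush func(batch bool) error,
) error {
    if *used+size > quota {
        // Blocking here without flushing would deadlock: memory is only
        // released after the sink flushes, and the sink only flushes once it
        // sees a resolved event, which is exactly what the callback emits.
        if err := forceFlush(false); err != nil {
            return err
        }
    }
    *used += size
    return nil
}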
2 changes: 1 addition & 1 deletion cdc/processor/pipeline/table.go
@@ -78,7 +78,7 @@ type tablePipelineImpl struct {
// TODO find a better name or avoid using an interface
// We use an interface here for ease in unit testing.
type tableFlowController interface {
Consume(commitTs uint64, size uint64, blockCallBack func() error) error
Consume(msg *model.PolymorphicEvent, size uint64, blockCallBack func(batch bool) error) error
Release(resolvedTs uint64)
Abort()
GetConsumption() uint64
92 changes: 80 additions & 12 deletions cdc/sink/flowcontrol/flow_control.go
@@ -20,9 +20,16 @@ import (
"github.com/edwingeng/deque"
"github.com/pingcap/errors"
"github.com/pingcap/log"
"github.com/pingcap/tiflow/cdc/model"
"go.uber.org/zap"
)

const (
maxRowsPerTxn = 1024
maxSizePerTxn = 1024 * 1024 /* 1MB */
batchSize = 100
)

// TableFlowController provides a convenient interface to control the memory consumption of a per table event stream
type TableFlowController struct {
memoryQuota *tableMemoryQuota
@@ -31,13 +38,20 @@ type TableFlowController struct {
sync.Mutex
queue deque.Deque
}
// batchGroupCount is the number of txnSizeEntries with the same commitTs, which could be:
// 1. Different txns with the same commitTs but different startTs
// 2. txnSizeEntries split from the same txn because it exceeds max rows or max size
batchGroupCount uint

lastCommitTs uint64
}

type commitTsSizeEntry struct {
type txnSizeEntry struct {
// txn id
startTs uint64
commitTs uint64
size uint64
rowCount uint64
}

// NewTableFlowController creates a new TableFlowController
@@ -55,7 +69,12 @@ func NewTableFlowController(quota uint64) *TableFlowController {

// Consume is called when an event has arrived for being processed by the sink.
// It will handle transaction boundaries automatically, and will not block intra-transaction.
func (c *TableFlowController) Consume(commitTs uint64, size uint64, blockCallBack func() error) error {
func (c *TableFlowController) Consume(
msg *model.PolymorphicEvent,
size uint64,
callBack func(batch bool) error,
) error {
commitTs := msg.CRTs
lastCommitTs := atomic.LoadUint64(&c.lastCommitTs)

if commitTs < lastCommitTs {
@@ -65,8 +84,7 @@ func (c *TableFlowController) Consume(commitTs uint64, size uint64, blockCallBac
}

if commitTs > lastCommitTs {
atomic.StoreUint64(&c.lastCommitTs, commitTs)
err := c.memoryQuota.consumeWithBlocking(size, blockCallBack)
err := c.memoryQuota.consumeWithBlocking(size, callBack)
if err != nil {
return errors.Trace(err)
}
@@ -82,13 +100,7 @@ func (c *TableFlowController) Consume(commitTs uint64, size uint64, blockCallBac
}
}

c.queueMu.Lock()
defer c.queueMu.Unlock()
c.queueMu.queue.PushBack(&commitTsSizeEntry{
commitTs: commitTs,
size: size,
})

c.enqueueSingleMsg(msg, size)
return nil
}
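As the doc comment on Consume says, it is called once per row event and only blocks at transaction boundaries. A hedged usage sketch of the Consume/Release pairing from the caller's side; the loop, variable names, and no-op callback are assumptions for illustration, not pipeline code.

// Sketch only: the intended call pattern for the flow controller.
func feedEvents(fc *TableFlowController, events []*model.PolymorphicEvent) error {
    for _, ev := range events {
        size := uint64(ev.Row.ApproximateBytes())
        // May block (via the callback) when the memory quota is exhausted at
        // a transaction boundary; it never blocks in the middle of a txn.
        if err := fc.Consume(ev, size, func(batch bool) error { return nil }); err != nil {
            return err
        }
    }
    // Once the sink has flushed everything up to some resolvedTs, the memory
    // held by those transactions is returned to the quota:
    // fc.Release(resolvedTs)
    return nil
}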

@@ -98,7 +110,7 @@ func (c *TableFlowController) Release(resolvedTs uint64) {

c.queueMu.Lock()
for c.queueMu.queue.Len() > 0 {
if peeked := c.queueMu.queue.Front().(*commitTsSizeEntry); peeked.commitTs <= resolvedTs {
if peeked := c.queueMu.queue.Front().(*txnSizeEntry); peeked.commitTs <= resolvedTs {
nBytesToRelease += peeked.size
c.queueMu.queue.PopFront()
} else {
Expand All @@ -110,6 +122,62 @@ func (c *TableFlowController) Release(resolvedTs uint64) {
c.memoryQuota.release(nBytesToRelease)
}

// Note that msgs received by enqueueSingleMsg must be sorted in (commitTs, startTs) order.
func (c *TableFlowController) enqueueSingleMsg(msg *model.PolymorphicEvent, size uint64) {
commitTs := msg.CRTs
lastCommitTs := atomic.LoadUint64(&c.lastCommitTs)

c.queueMu.Lock()
defer c.queueMu.Unlock()

var e deque.Elem
// 1. Processing a new txn with different commitTs.
if e = c.queueMu.queue.Back(); e == nil || lastCommitTs < commitTs {
atomic.StoreUint64(&c.lastCommitTs, commitTs)
c.queueMu.queue.PushBack(&txnSizeEntry{
startTs: msg.StartTs,
commitTs: commitTs,
size: size,
rowCount: 1,
})
c.batchGroupCount = 1
msg.Row.SplitTxn = true
return
}

// Processing txns with the same commitTs.
txnEntry := e.(*txnSizeEntry)
if txnEntry.commitTs != lastCommitTs {
log.Panic("got wrong commitTs from deque, report a bug",
zap.Uint64("lastCommitTs", c.lastCommitTs),
zap.Uint64("commitTsInDeque", txnEntry.commitTs))
}

// 2. Append row to current txn entry.
if txnEntry.startTs == msg.Row.StartTs &&
txnEntry.rowCount < maxRowsPerTxn && txnEntry.size < maxSizePerTxn {
txnEntry.size += size
txnEntry.rowCount++
return
}

// 3. Split the txn or handle a new txn with the same commitTs.
c.queueMu.queue.PushBack(&txnSizeEntry{
startTs: msg.StartTs,
commitTs: commitTs,
size: size,
rowCount: 1,
})
c.batchGroupCount++
msg.Row.SplitTxn = true

if c.batchGroupCount >= batchSize {
c.batchGroupCount = 0
// TODO(CharlesCheung): add batch resolve mechanism to mitigate OOM problem
log.Debug("emit batch resolve event through callback")
}
}
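To make the splitting rules above concrete: with maxRowsPerTxn = 1024, a single upstream transaction of 3000 rows at one commitTs is enqueued as three txnSizeEntry values of 1024, 1024, and 952 rows (assuming the 1MB size cap is not hit first), and the first row of each entry gets Row.SplitTxn = true, which is what lets the MySQL sink commit them as separate downstream transactions. A rough sketch of just that decision, stripped of the queue, locking, and quota handling; the entrySketch type and helper name are assumptions.

// Sketch only: mirrors cases 1-3 of enqueueSingleMsg for a single row and
// answers one question: does this row start a new (possibly split) entry?
type entrySketch struct {
    startTs  uint64
    rowCount uint64
    size     uint64
}

func startsNewEntry(cur *entrySketch, startTs, rowSize uint64) bool {
    if cur == nil || cur.startTs != startTs {
        return true // cases 1 and 3: a different upstream transaction
    }
    if cur.rowCount >= maxRowsPerTxn || cur.size >= maxSizePerTxn {
        return true // case 3: split a transaction that grew too large
    }
    cur.rowCount++ // case 2: append the row to the current entry
    cur.size += rowSize
    return false
}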

// Abort interrupts any ongoing Consume call
func (c *TableFlowController) Abort() {
c.memoryQuota.abort()