From 57ee098da490008dcf578002a9d351d8d5e17c1d Mon Sep 17 00:00:00 2001 From: CharlesCheung96 Date: Thu, 12 May 2022 14:52:05 +0800 Subject: [PATCH 1/5] mysql sink support split txn --- cdc/model/mounter.go | 12 ++- cdc/model/sink.go | 3 + cdc/processor/pipeline/sink_test.go | 7 +- cdc/processor/pipeline/sorter.go | 6 +- cdc/processor/pipeline/table.go | 3 +- cdc/sink/flowcontrol/flow_control.go | 89 +++++++++++++++++++--- cdc/sink/flowcontrol/flow_control_test.go | 66 ++++++++-------- cdc/sink/flowcontrol/table_memory_quota.go | 6 +- cdc/sink/mysql/txn_cache.go | 2 +- 9 files changed, 142 insertions(+), 52 deletions(-) diff --git a/cdc/model/mounter.go b/cdc/model/mounter.go index ab2a3d3bec4..60e602c2ed2 100644 --- a/cdc/model/mounter.go +++ b/cdc/model/mounter.go @@ -13,7 +13,7 @@ package model -// PolymorphicEvent describes an event can be in multiple states +// PolymorphicEvent describes an event can be in multiple states. type PolymorphicEvent struct { StartTs uint64 // Commit or resolved TS @@ -23,7 +23,15 @@ type PolymorphicEvent struct { Row *RowChangedEvent } -// NewPolymorphicEvent creates a new PolymorphicEvent with a raw KV +// NewEmptyPolymorphicEvent creates a new empty PolymorphicEvent. +func NewEmptyPolymorphicEvent() *PolymorphicEvent { + return &PolymorphicEvent{ + RawKV: &RawKVEntry{}, + Row: &RowChangedEvent{}, + } +} + +// NewPolymorphicEvent creates a new PolymorphicEvent with a raw KV. func NewPolymorphicEvent(rawKV *RawKVEntry) *PolymorphicEvent { if rawKV.OpType == OpTypeResolved { return NewResolvedPolymorphicEvent(rawKV.RegionID, rawKV.CRTs) diff --git a/cdc/model/sink.go b/cdc/model/sink.go index 8c0ca50cecf..1ba8511fdb4 100644 --- a/cdc/model/sink.go +++ b/cdc/model/sink.go @@ -267,6 +267,9 @@ type RowChangedEvent struct { // ApproximateDataSize is the approximate size of protobuf binary // representation of this event. ApproximateDataSize int64 `json:"-" msg:"-"` + + // SplitTxn marks this RowChangedEvent as the first line of a new batch + SplitTxn bool `json:"-" msg:"-"` } // IsDelete returns true if the row is a delete event diff --git a/cdc/processor/pipeline/sink_test.go b/cdc/processor/pipeline/sink_test.go index a3fca75f38e..83f60083fb7 100644 --- a/cdc/processor/pipeline/sink_test.go +++ b/cdc/processor/pipeline/sink_test.go @@ -40,7 +40,12 @@ type mockSink struct { // we are testing sinkNode by itself. type mockFlowController struct{} -func (c *mockFlowController) Consume(commitTs uint64, size uint64, blockCallBack func() error) error { +func (c *mockFlowController) Consume( + msg *model.PolymorphicEvent, + commitTs uint64, + size uint64, + blockCallBack func(bool) error, +) error { return nil } diff --git a/cdc/processor/pipeline/sorter.go b/cdc/processor/pipeline/sorter.go index a503338f094..3b9772858d1 100644 --- a/cdc/processor/pipeline/sorter.go +++ b/cdc/processor/pipeline/sorter.go @@ -206,8 +206,10 @@ func (n *sorterNode) start( size := uint64(msg.Row.ApproximateBytes()) // NOTE we allow the quota to be exceeded if blocking means interrupting a transaction. // Otherwise the pipeline would deadlock. - err = n.flowController.Consume(commitTs, size, func() error { - if lastCRTs > lastSentResolvedTs { + err = n.flowController.Consume(msg, commitTs, size, func(batch bool) error { + if batch { + panic("cdc does not support the batch resolve mechanism at this time") + } else if lastCRTs > lastSentResolvedTs { // If we are blocking, we send a Resolved Event here to elicit a sink-flush. // Not sending a Resolved Event here will very likely deadlock the pipeline. lastSentResolvedTs = lastCRTs diff --git a/cdc/processor/pipeline/table.go b/cdc/processor/pipeline/table.go index 8c015656d7a..2644fcd75ff 100644 --- a/cdc/processor/pipeline/table.go +++ b/cdc/processor/pipeline/table.go @@ -78,7 +78,8 @@ type tablePipelineImpl struct { // TODO find a better name or avoid using an interface // We use an interface here for ease in unit testing. type tableFlowController interface { - Consume(commitTs uint64, size uint64, blockCallBack func() error) error + Consume(msg *model.PolymorphicEvent, commitTs uint64, + size uint64, blockCallBack func(batch bool) error) error Release(resolvedTs uint64) Abort() GetConsumption() uint64 diff --git a/cdc/sink/flowcontrol/flow_control.go b/cdc/sink/flowcontrol/flow_control.go index c6099fa6557..a4fef32d672 100644 --- a/cdc/sink/flowcontrol/flow_control.go +++ b/cdc/sink/flowcontrol/flow_control.go @@ -20,9 +20,16 @@ import ( "github.com/edwingeng/deque" "github.com/pingcap/errors" "github.com/pingcap/log" + "github.com/pingcap/tiflow/cdc/model" "go.uber.org/zap" ) +const ( + maxRowsPerTxn = 1024 + maxSizePerTxn = 1024 * 1024 /* 1MB */ + batchSize = 100 +) + // TableFlowController provides a convenient interface to control the memory consumption of a per table event stream type TableFlowController struct { memoryQuota *tableMemoryQuota @@ -31,13 +38,17 @@ type TableFlowController struct { sync.Mutex queue deque.Deque } + txnsWithSameCommitTs uint lastCommitTs uint64 } -type commitTsSizeEntry struct { +type txnSizeEntry struct { + // txn id + startTs uint64 commitTs uint64 size uint64 + rowCount uint64 } // NewTableFlowController creates a new TableFlowController @@ -55,7 +66,12 @@ func NewTableFlowController(quota uint64) *TableFlowController { // Consume is called when an event has arrived for being processed by the sink. // It will handle transaction boundaries automatically, and will not block intra-transaction. -func (c *TableFlowController) Consume(commitTs uint64, size uint64, blockCallBack func() error) error { +func (c *TableFlowController) Consume( + msg *model.PolymorphicEvent, + commitTs uint64, + size uint64, + callBack func(batch bool) error, +) error { lastCommitTs := atomic.LoadUint64(&c.lastCommitTs) if commitTs < lastCommitTs { @@ -65,8 +81,7 @@ func (c *TableFlowController) Consume(commitTs uint64, size uint64, blockCallBac } if commitTs > lastCommitTs { - atomic.StoreUint64(&c.lastCommitTs, commitTs) - err := c.memoryQuota.consumeWithBlocking(size, blockCallBack) + err := c.memoryQuota.consumeWithBlocking(size, callBack) if err != nil { return errors.Trace(err) } @@ -82,13 +97,7 @@ func (c *TableFlowController) Consume(commitTs uint64, size uint64, blockCallBac } } - c.queueMu.Lock() - defer c.queueMu.Unlock() - c.queueMu.queue.PushBack(&commitTsSizeEntry{ - commitTs: commitTs, - size: size, - }) - + c.enqueueSingleMsg(msg, commitTs, size) return nil } @@ -98,7 +107,7 @@ func (c *TableFlowController) Release(resolvedTs uint64) { c.queueMu.Lock() for c.queueMu.queue.Len() > 0 { - if peeked := c.queueMu.queue.Front().(*commitTsSizeEntry); peeked.commitTs <= resolvedTs { + if peeked := c.queueMu.queue.Front().(*txnSizeEntry); peeked.commitTs <= resolvedTs { nBytesToRelease += peeked.size c.queueMu.queue.PopFront() } else { @@ -110,6 +119,62 @@ func (c *TableFlowController) Release(resolvedTs uint64) { c.memoryQuota.release(nBytesToRelease) } +func (c *TableFlowController) enqueueSingleMsg( + msg *model.PolymorphicEvent, + commitTs uint64, + size uint64, +) { + lastCommitTs := atomic.LoadUint64(&c.lastCommitTs) + + c.queueMu.Lock() + defer c.queueMu.Unlock() + + var e deque.Elem + // 1. Processing a new transaction. + if e = c.queueMu.queue.Back(); e == nil || lastCommitTs < commitTs { + c.queueMu.queue.PushBack(&txnSizeEntry{ + startTs: msg.StartTs, + commitTs: commitTs, + size: size, + rowCount: 1, + }) + c.txnsWithSameCommitTs = 1 + atomic.StoreUint64(&c.lastCommitTs, commitTs) + return + } + + // Processing txns with the same commitTs. + txnEntry := e.(*txnSizeEntry) + if txnEntry.commitTs != lastCommitTs { + log.Panic("got wrong commitTs from deque, report a bug", + zap.Uint64("lastCommitTs", c.lastCommitTs), + zap.Uint64("commitTsInDeque", txnEntry.commitTs)) + } + + // 2. Append row to current transaction entry. + if txnEntry.startTs == msg.Row.StartTs && + txnEntry.rowCount < maxRowsPerTxn && txnEntry.size < maxSizePerTxn { + txnEntry.size += size + txnEntry.rowCount++ + return + } + + // 3. Split the txn or handle a new txn with the same commitTs. + c.queueMu.queue.PushBack(&txnSizeEntry{ + startTs: msg.StartTs, + commitTs: commitTs, + size: size, + rowCount: 1, + }) + c.txnsWithSameCommitTs++ + // mark the first data of new txnSizeEntry + msg.Row.SplitTxn = true + if c.txnsWithSameCommitTs >= batchSize { + // TODO(CharlesCheung): add batch resolve mechanism to mitigate oom problem + log.Debug("emit batch resolve event") + } +} + // Abort interrupts any ongoing Consume call func (c *TableFlowController) Abort() { c.memoryQuota.abort() diff --git a/cdc/sink/flowcontrol/flow_control_test.go b/cdc/sink/flowcontrol/flow_control_test.go index 24f639fdf8a..e9d0180c590 100644 --- a/cdc/sink/flowcontrol/flow_control_test.go +++ b/cdc/sink/flowcontrol/flow_control_test.go @@ -21,11 +21,12 @@ import ( "testing" "time" + "github.com/pingcap/tiflow/cdc/model" "github.com/stretchr/testify/require" "golang.org/x/sync/errgroup" ) -func dummyCallBack() error { +func dummyCallBack(_ bool) error { return nil } @@ -34,7 +35,7 @@ type mockCallBacker struct { injectedErr error } -func (c *mockCallBacker) cb() error { +func (c *mockCallBacker) cb(_ bool) error { c.timesCalled += 1 return c.injectedErr } @@ -173,7 +174,7 @@ func TestFlowControlBasic(t *testing.T) { ctx, cancel := context.WithTimeout(context.TODO(), time.Second*5) defer cancel() errg, ctx := errgroup.WithContext(ctx) - mockedRowsCh := make(chan *commitTsSizeEntry, 1024) + mockedRowsCh := make(chan *txnSizeEntry, 1024) flowController := NewTableFlowController(2048) errg.Go(func() error { @@ -186,7 +187,7 @@ func TestFlowControlBasic(t *testing.T) { select { case <-ctx.Done(): return ctx.Err() - case mockedRowsCh <- &commitTsSizeEntry{ + case mockedRowsCh <- &txnSizeEntry{ commitTs: lastCommitTs, size: size, }: @@ -202,7 +203,7 @@ func TestFlowControlBasic(t *testing.T) { defer close(eventCh) resolvedTs := uint64(0) for { - var mockedRow *commitTsSizeEntry + var mockedRow *txnSizeEntry select { case <-ctx.Done(): return ctx.Err() @@ -227,7 +228,8 @@ func TestFlowControlBasic(t *testing.T) { resolvedTs = mockedRow.commitTs updatedResolvedTs = true } - err := flowController.Consume(mockedRow.commitTs, mockedRow.size, dummyCallBack) + err := flowController.Consume(model.NewEmptyPolymorphicEvent(), + mockedRow.commitTs, mockedRow.size, dummyCallBack) require.Nil(t, err) select { case <-ctx.Done(): @@ -290,13 +292,13 @@ func TestFlowControlAbort(t *testing.T) { go func() { defer wg.Done() - err := controller.Consume(1, 1000, callBacker.cb) + err := controller.Consume(model.NewEmptyPolymorphicEvent(), 1, 1000, callBacker.cb) require.Nil(t, err) require.Equal(t, 0, callBacker.timesCalled) - err = controller.Consume(2, 1000, callBacker.cb) + err = controller.Consume(model.NewEmptyPolymorphicEvent(), 2, 1000, callBacker.cb) require.Regexp(t, ".*ErrFlowControllerAborted.*", err) require.Equal(t, 1, callBacker.timesCalled) - err = controller.Consume(2, 10, callBacker.cb) + err = controller.Consume(model.NewEmptyPolymorphicEvent(), 2, 10, callBacker.cb) require.Regexp(t, ".*ErrFlowControllerAborted.*", err) require.Equal(t, 1, callBacker.timesCalled) }() @@ -314,7 +316,7 @@ func TestFlowControlCallBack(t *testing.T) { ctx, cancel := context.WithTimeout(context.TODO(), time.Second*5) defer cancel() errg, ctx := errgroup.WithContext(ctx) - mockedRowsCh := make(chan *commitTsSizeEntry, 1024) + mockedRowsCh := make(chan *txnSizeEntry, 1024) flowController := NewTableFlowController(512) errg.Go(func() error { @@ -327,7 +329,7 @@ func TestFlowControlCallBack(t *testing.T) { select { case <-ctx.Done(): return ctx.Err() - case mockedRowsCh <- &commitTsSizeEntry{ + case mockedRowsCh <- &txnSizeEntry{ commitTs: lastCommitTs, size: size, }: @@ -343,7 +345,7 @@ func TestFlowControlCallBack(t *testing.T) { defer close(eventCh) lastCRTs := uint64(0) for { - var mockedRow *commitTsSizeEntry + var mockedRow *txnSizeEntry select { case <-ctx.Done(): return ctx.Err() @@ -355,16 +357,17 @@ func TestFlowControlCallBack(t *testing.T) { } atomic.AddUint64(&consumedBytes, mockedRow.size) - err := flowController.Consume(mockedRow.commitTs, mockedRow.size, func() error { - select { - case <-ctx.Done(): - return ctx.Err() - case eventCh <- &mockedEvent{ - resolvedTs: lastCRTs, - }: - } - return nil - }) + err := flowController.Consume(model.NewEmptyPolymorphicEvent(), + mockedRow.commitTs, mockedRow.size, func(bool) error { + select { + case <-ctx.Done(): + return ctx.Err() + case eventCh <- &mockedEvent{ + resolvedTs: lastCRTs, + }: + } + return nil + }) require.Nil(t, err) lastCRTs = mockedRow.commitTs @@ -426,7 +429,7 @@ func TestFlowControlCallBackNotBlockingRelease(t *testing.T) { go func() { defer wg.Done() - err := controller.Consume(1, 511, func() error { + err := controller.Consume(model.NewEmptyPolymorphicEvent(), 1, 511, func(bool) error { t.Error("unreachable") return nil }) @@ -443,7 +446,7 @@ func TestFlowControlCallBackNotBlockingRelease(t *testing.T) { cancel() }() - err = controller.Consume(2, 511, func() error { + err = controller.Consume(model.NewEmptyPolymorphicEvent(), 2, 511, func(bool) error { atomic.StoreInt32(&isBlocked, 1) <-ctx.Done() atomic.StoreInt32(&isBlocked, 0) @@ -468,12 +471,12 @@ func TestFlowControlCallBackError(t *testing.T) { go func() { defer wg.Done() - err := controller.Consume(1, 511, func() error { + err := controller.Consume(model.NewEmptyPolymorphicEvent(), 1, 511, func(bool) error { t.Error("unreachable") return nil }) require.Nil(t, err) - err = controller.Consume(2, 511, func() error { + err = controller.Consume(model.NewEmptyPolymorphicEvent(), 2, 511, func(bool) error { <-ctx.Done() return ctx.Err() }) @@ -490,7 +493,7 @@ func TestFlowControlConsumeLargerThanQuota(t *testing.T) { t.Parallel() controller := NewTableFlowController(1024) - err := controller.Consume(1, 2048, func() error { + err := controller.Consume(model.NewEmptyPolymorphicEvent(), 1, 2048, func(bool) error { t.Error("unreachable") return nil }) @@ -501,7 +504,7 @@ func BenchmarkTableFlowController(B *testing.B) { ctx, cancel := context.WithTimeout(context.TODO(), time.Second*5) defer cancel() errg, ctx := errgroup.WithContext(ctx) - mockedRowsCh := make(chan *commitTsSizeEntry, 102400) + mockedRowsCh := make(chan *txnSizeEntry, 102400) flowController := NewTableFlowController(20 * 1024 * 1024) // 20M errg.Go(func() error { @@ -514,7 +517,7 @@ func BenchmarkTableFlowController(B *testing.B) { select { case <-ctx.Done(): return ctx.Err() - case mockedRowsCh <- &commitTsSizeEntry{ + case mockedRowsCh <- &txnSizeEntry{ commitTs: lastCommitTs, size: size, }: @@ -530,7 +533,7 @@ func BenchmarkTableFlowController(B *testing.B) { defer close(eventCh) resolvedTs := uint64(0) for { - var mockedRow *commitTsSizeEntry + var mockedRow *txnSizeEntry select { case <-ctx.Done(): return ctx.Err() @@ -551,7 +554,8 @@ func BenchmarkTableFlowController(B *testing.B) { } resolvedTs = mockedRow.commitTs } - err := flowController.Consume(mockedRow.commitTs, mockedRow.size, dummyCallBack) + err := flowController.Consume(model.NewEmptyPolymorphicEvent(), + mockedRow.commitTs, mockedRow.size, dummyCallBack) if err != nil { B.Fatal(err) } diff --git a/cdc/sink/flowcontrol/table_memory_quota.go b/cdc/sink/flowcontrol/table_memory_quota.go index 7ca15e7857f..c563ba4f333 100644 --- a/cdc/sink/flowcontrol/table_memory_quota.go +++ b/cdc/sink/flowcontrol/table_memory_quota.go @@ -54,7 +54,9 @@ func newTableMemoryQuota(quota uint64) *tableMemoryQuota { // block until enough memory has been freed up by release. // blockCallBack will be called if the function will block. // Should be used with care to prevent deadlock. -func (c *tableMemoryQuota) consumeWithBlocking(nBytes uint64, blockCallBack func() error) error { +func (c *tableMemoryQuota) consumeWithBlocking( + nBytes uint64, blockCallBack func(bool) error, +) error { if nBytes >= c.quota { return cerrors.ErrFlowControllerEventLargerThanQuota.GenWithStackByArgs(nBytes, c.quota) } @@ -62,7 +64,7 @@ func (c *tableMemoryQuota) consumeWithBlocking(nBytes uint64, blockCallBack func c.consumed.Lock() if c.consumed.bytes+nBytes >= c.quota { c.consumed.Unlock() - err := blockCallBack() + err := blockCallBack(false) if err != nil { return errors.Trace(err) } diff --git a/cdc/sink/mysql/txn_cache.go b/cdc/sink/mysql/txn_cache.go index 2aa02ec299b..a800313996b 100644 --- a/cdc/sink/mysql/txn_cache.go +++ b/cdc/sink/mysql/txn_cache.go @@ -36,7 +36,7 @@ func (t *txnsWithTheSameCommitTs) Append(row *model.RowChangedEvent) { } var txn *model.SingleTableTxn - if len(t.txns) == 0 || t.txns[len(t.txns)-1].StartTs < row.StartTs { + if len(t.txns) == 0 || t.txns[len(t.txns)-1].StartTs < row.StartTs || row.SplitTxn { txn = &model.SingleTableTxn{ StartTs: row.StartTs, CommitTs: row.CommitTs, From 5b748dfc94ba4d60096c9c00a7bac4630d3478ab Mon Sep 17 00:00:00 2001 From: CharlesCheung96 Date: Thu, 12 May 2022 15:44:11 +0800 Subject: [PATCH 2/5] remove dummy commitTs --- cdc/model/mounter.go | 3 ++- cdc/processor/pipeline/sink_test.go | 1 - cdc/processor/pipeline/sorter.go | 2 +- cdc/processor/pipeline/table.go | 3 +-- cdc/sink/flowcontrol/flow_control.go | 13 ++++------- cdc/sink/flowcontrol/flow_control_test.go | 28 +++++++++++------------ 6 files changed, 23 insertions(+), 27 deletions(-) diff --git a/cdc/model/mounter.go b/cdc/model/mounter.go index 60e602c2ed2..9998d6eb5d2 100644 --- a/cdc/model/mounter.go +++ b/cdc/model/mounter.go @@ -24,8 +24,9 @@ type PolymorphicEvent struct { } // NewEmptyPolymorphicEvent creates a new empty PolymorphicEvent. -func NewEmptyPolymorphicEvent() *PolymorphicEvent { +func NewEmptyPolymorphicEvent(ts uint64) *PolymorphicEvent { return &PolymorphicEvent{ + CRTs: ts, RawKV: &RawKVEntry{}, Row: &RowChangedEvent{}, } diff --git a/cdc/processor/pipeline/sink_test.go b/cdc/processor/pipeline/sink_test.go index 83f60083fb7..f04a1906086 100644 --- a/cdc/processor/pipeline/sink_test.go +++ b/cdc/processor/pipeline/sink_test.go @@ -42,7 +42,6 @@ type mockFlowController struct{} func (c *mockFlowController) Consume( msg *model.PolymorphicEvent, - commitTs uint64, size uint64, blockCallBack func(bool) error, ) error { diff --git a/cdc/processor/pipeline/sorter.go b/cdc/processor/pipeline/sorter.go index 3b9772858d1..f886e7714af 100644 --- a/cdc/processor/pipeline/sorter.go +++ b/cdc/processor/pipeline/sorter.go @@ -206,7 +206,7 @@ func (n *sorterNode) start( size := uint64(msg.Row.ApproximateBytes()) // NOTE we allow the quota to be exceeded if blocking means interrupting a transaction. // Otherwise the pipeline would deadlock. - err = n.flowController.Consume(msg, commitTs, size, func(batch bool) error { + err = n.flowController.Consume(msg, size, func(batch bool) error { if batch { panic("cdc does not support the batch resolve mechanism at this time") } else if lastCRTs > lastSentResolvedTs { diff --git a/cdc/processor/pipeline/table.go b/cdc/processor/pipeline/table.go index 2644fcd75ff..556c08ee1b3 100644 --- a/cdc/processor/pipeline/table.go +++ b/cdc/processor/pipeline/table.go @@ -78,8 +78,7 @@ type tablePipelineImpl struct { // TODO find a better name or avoid using an interface // We use an interface here for ease in unit testing. type tableFlowController interface { - Consume(msg *model.PolymorphicEvent, commitTs uint64, - size uint64, blockCallBack func(batch bool) error) error + Consume(msg *model.PolymorphicEvent, size uint64, blockCallBack func(batch bool) error) error Release(resolvedTs uint64) Abort() GetConsumption() uint64 diff --git a/cdc/sink/flowcontrol/flow_control.go b/cdc/sink/flowcontrol/flow_control.go index a4fef32d672..53dfd03287e 100644 --- a/cdc/sink/flowcontrol/flow_control.go +++ b/cdc/sink/flowcontrol/flow_control.go @@ -68,10 +68,10 @@ func NewTableFlowController(quota uint64) *TableFlowController { // It will handle transaction boundaries automatically, and will not block intra-transaction. func (c *TableFlowController) Consume( msg *model.PolymorphicEvent, - commitTs uint64, size uint64, callBack func(batch bool) error, ) error { + commitTs := msg.CRTs lastCommitTs := atomic.LoadUint64(&c.lastCommitTs) if commitTs < lastCommitTs { @@ -97,7 +97,7 @@ func (c *TableFlowController) Consume( } } - c.enqueueSingleMsg(msg, commitTs, size) + c.enqueueSingleMsg(msg, size) return nil } @@ -119,11 +119,8 @@ func (c *TableFlowController) Release(resolvedTs uint64) { c.memoryQuota.release(nBytesToRelease) } -func (c *TableFlowController) enqueueSingleMsg( - msg *model.PolymorphicEvent, - commitTs uint64, - size uint64, -) { +func (c *TableFlowController) enqueueSingleMsg(msg *model.PolymorphicEvent, size uint64) { + commitTs := msg.CRTs lastCommitTs := atomic.LoadUint64(&c.lastCommitTs) c.queueMu.Lock() @@ -171,7 +168,7 @@ func (c *TableFlowController) enqueueSingleMsg( msg.Row.SplitTxn = true if c.txnsWithSameCommitTs >= batchSize { // TODO(CharlesCheung): add batch resolve mechanism to mitigate oom problem - log.Debug("emit batch resolve event") + log.Debug("emit batch resolve event throw callback") } } diff --git a/cdc/sink/flowcontrol/flow_control_test.go b/cdc/sink/flowcontrol/flow_control_test.go index e9d0180c590..6836299e4a4 100644 --- a/cdc/sink/flowcontrol/flow_control_test.go +++ b/cdc/sink/flowcontrol/flow_control_test.go @@ -228,8 +228,8 @@ func TestFlowControlBasic(t *testing.T) { resolvedTs = mockedRow.commitTs updatedResolvedTs = true } - err := flowController.Consume(model.NewEmptyPolymorphicEvent(), - mockedRow.commitTs, mockedRow.size, dummyCallBack) + err := flowController.Consume(model.NewEmptyPolymorphicEvent(mockedRow.commitTs), + mockedRow.size, dummyCallBack) require.Nil(t, err) select { case <-ctx.Done(): @@ -292,13 +292,13 @@ func TestFlowControlAbort(t *testing.T) { go func() { defer wg.Done() - err := controller.Consume(model.NewEmptyPolymorphicEvent(), 1, 1000, callBacker.cb) + err := controller.Consume(model.NewEmptyPolymorphicEvent(1), 1000, callBacker.cb) require.Nil(t, err) require.Equal(t, 0, callBacker.timesCalled) - err = controller.Consume(model.NewEmptyPolymorphicEvent(), 2, 1000, callBacker.cb) + err = controller.Consume(model.NewEmptyPolymorphicEvent(2), 1000, callBacker.cb) require.Regexp(t, ".*ErrFlowControllerAborted.*", err) require.Equal(t, 1, callBacker.timesCalled) - err = controller.Consume(model.NewEmptyPolymorphicEvent(), 2, 10, callBacker.cb) + err = controller.Consume(model.NewEmptyPolymorphicEvent(2), 10, callBacker.cb) require.Regexp(t, ".*ErrFlowControllerAborted.*", err) require.Equal(t, 1, callBacker.timesCalled) }() @@ -357,8 +357,8 @@ func TestFlowControlCallBack(t *testing.T) { } atomic.AddUint64(&consumedBytes, mockedRow.size) - err := flowController.Consume(model.NewEmptyPolymorphicEvent(), - mockedRow.commitTs, mockedRow.size, func(bool) error { + err := flowController.Consume(model.NewEmptyPolymorphicEvent(mockedRow.commitTs), + mockedRow.size, func(bool) error { select { case <-ctx.Done(): return ctx.Err() @@ -429,7 +429,7 @@ func TestFlowControlCallBackNotBlockingRelease(t *testing.T) { go func() { defer wg.Done() - err := controller.Consume(model.NewEmptyPolymorphicEvent(), 1, 511, func(bool) error { + err := controller.Consume(model.NewEmptyPolymorphicEvent(1), 511, func(bool) error { t.Error("unreachable") return nil }) @@ -446,7 +446,7 @@ func TestFlowControlCallBackNotBlockingRelease(t *testing.T) { cancel() }() - err = controller.Consume(model.NewEmptyPolymorphicEvent(), 2, 511, func(bool) error { + err = controller.Consume(model.NewEmptyPolymorphicEvent(2), 511, func(bool) error { atomic.StoreInt32(&isBlocked, 1) <-ctx.Done() atomic.StoreInt32(&isBlocked, 0) @@ -471,12 +471,12 @@ func TestFlowControlCallBackError(t *testing.T) { go func() { defer wg.Done() - err := controller.Consume(model.NewEmptyPolymorphicEvent(), 1, 511, func(bool) error { + err := controller.Consume(model.NewEmptyPolymorphicEvent(1), 511, func(bool) error { t.Error("unreachable") return nil }) require.Nil(t, err) - err = controller.Consume(model.NewEmptyPolymorphicEvent(), 2, 511, func(bool) error { + err = controller.Consume(model.NewEmptyPolymorphicEvent(2), 511, func(bool) error { <-ctx.Done() return ctx.Err() }) @@ -493,7 +493,7 @@ func TestFlowControlConsumeLargerThanQuota(t *testing.T) { t.Parallel() controller := NewTableFlowController(1024) - err := controller.Consume(model.NewEmptyPolymorphicEvent(), 1, 2048, func(bool) error { + err := controller.Consume(model.NewEmptyPolymorphicEvent(1), 2048, func(bool) error { t.Error("unreachable") return nil }) @@ -554,8 +554,8 @@ func BenchmarkTableFlowController(B *testing.B) { } resolvedTs = mockedRow.commitTs } - err := flowController.Consume(model.NewEmptyPolymorphicEvent(), - mockedRow.commitTs, mockedRow.size, dummyCallBack) + err := flowController.Consume(model.NewEmptyPolymorphicEvent(mockedRow.commitTs), + mockedRow.size, dummyCallBack) if err != nil { B.Fatal(err) } From bc42747fb64ba0e429203dcb24e61247e2206343 Mon Sep 17 00:00:00 2001 From: CharlesCheung <61726649+CharlesCheung96@users.noreply.github.com> Date: Fri, 13 May 2022 22:14:55 +0800 Subject: [PATCH 3/5] address comment --- cdc/processor/pipeline/sorter.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cdc/processor/pipeline/sorter.go b/cdc/processor/pipeline/sorter.go index f886e7714af..f9cb796bcdb 100644 --- a/cdc/processor/pipeline/sorter.go +++ b/cdc/processor/pipeline/sorter.go @@ -208,7 +208,7 @@ func (n *sorterNode) start( // Otherwise the pipeline would deadlock. err = n.flowController.Consume(msg, size, func(batch bool) error { if batch { - panic("cdc does not support the batch resolve mechanism at this time") + log.Panic("cdc does not support the batch resolve mechanism at this time") } else if lastCRTs > lastSentResolvedTs { // If we are blocking, we send a Resolved Event here to elicit a sink-flush. // Not sending a Resolved Event here will very likely deadlock the pipeline. From 71500f3102fdf8210d7091711acd16624de78043 Mon Sep 17 00:00:00 2001 From: CharlesCheung Date: Mon, 16 May 2022 11:08:07 +0800 Subject: [PATCH 4/5] add batchGroupCount --- cdc/sink/flowcontrol/flow_control.go | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/cdc/sink/flowcontrol/flow_control.go b/cdc/sink/flowcontrol/flow_control.go index 53dfd03287e..8a707b9d0fc 100644 --- a/cdc/sink/flowcontrol/flow_control.go +++ b/cdc/sink/flowcontrol/flow_control.go @@ -38,7 +38,10 @@ type TableFlowController struct { sync.Mutex queue deque.Deque } - txnsWithSameCommitTs uint + // batchGroupCount is the number of txnSizeEntries with same commitTs, which could be: + // 1. Different txns with same commitTs but different startTs + // 2. TxnSizeEntry split from the same txns which exceeds max rows or max size + batchGroupCount uint lastCommitTs uint64 } @@ -135,7 +138,7 @@ func (c *TableFlowController) enqueueSingleMsg(msg *model.PolymorphicEvent, size size: size, rowCount: 1, }) - c.txnsWithSameCommitTs = 1 + c.batchGroupCount = 1 atomic.StoreUint64(&c.lastCommitTs, commitTs) return } @@ -163,10 +166,11 @@ func (c *TableFlowController) enqueueSingleMsg(msg *model.PolymorphicEvent, size size: size, rowCount: 1, }) - c.txnsWithSameCommitTs++ + c.batchGroupCount++ // mark the first data of new txnSizeEntry msg.Row.SplitTxn = true - if c.txnsWithSameCommitTs >= batchSize { + if c.batchGroupCount >= batchSize { + c.batchGroupCount = 0 // TODO(CharlesCheung): add batch resolve mechanism to mitigate oom problem log.Debug("emit batch resolve event throw callback") } From e7fae3991514957463548d77a9b7e49fb431b7a7 Mon Sep 17 00:00:00 2001 From: CharlesCheung Date: Mon, 16 May 2022 16:46:48 +0800 Subject: [PATCH 5/5] add some comments --- cdc/model/sink.go | 2 +- cdc/sink/flowcontrol/flow_control.go | 10 ++++++---- cdc/sink/mysql/txn_cache.go | 2 +- 3 files changed, 8 insertions(+), 6 deletions(-) diff --git a/cdc/model/sink.go b/cdc/model/sink.go index ba00c72aaf7..eaa3740e505 100644 --- a/cdc/model/sink.go +++ b/cdc/model/sink.go @@ -268,7 +268,7 @@ type RowChangedEvent struct { // representation of this event. ApproximateDataSize int64 `json:"-" msg:"-"` - // SplitTxn marks this RowChangedEvent as the first line of a new batch + // SplitTxn marks this RowChangedEvent as the first line of a new txn. SplitTxn bool `json:"-" msg:"-"` } diff --git a/cdc/sink/flowcontrol/flow_control.go b/cdc/sink/flowcontrol/flow_control.go index 8a707b9d0fc..1e97ed6390e 100644 --- a/cdc/sink/flowcontrol/flow_control.go +++ b/cdc/sink/flowcontrol/flow_control.go @@ -122,6 +122,7 @@ func (c *TableFlowController) Release(resolvedTs uint64) { c.memoryQuota.release(nBytesToRelease) } +// Note that msgs received by enqueueSingleMsg must be sorted by commitTs_startTs order. func (c *TableFlowController) enqueueSingleMsg(msg *model.PolymorphicEvent, size uint64) { commitTs := msg.CRTs lastCommitTs := atomic.LoadUint64(&c.lastCommitTs) @@ -130,8 +131,9 @@ func (c *TableFlowController) enqueueSingleMsg(msg *model.PolymorphicEvent, size defer c.queueMu.Unlock() var e deque.Elem - // 1. Processing a new transaction. + // 1. Processing a new txn with different commitTs. if e = c.queueMu.queue.Back(); e == nil || lastCommitTs < commitTs { + atomic.StoreUint64(&c.lastCommitTs, commitTs) c.queueMu.queue.PushBack(&txnSizeEntry{ startTs: msg.StartTs, commitTs: commitTs, @@ -139,7 +141,7 @@ func (c *TableFlowController) enqueueSingleMsg(msg *model.PolymorphicEvent, size rowCount: 1, }) c.batchGroupCount = 1 - atomic.StoreUint64(&c.lastCommitTs, commitTs) + msg.Row.SplitTxn = true return } @@ -151,7 +153,7 @@ func (c *TableFlowController) enqueueSingleMsg(msg *model.PolymorphicEvent, size zap.Uint64("commitTsInDeque", txnEntry.commitTs)) } - // 2. Append row to current transaction entry. + // 2. Append row to current txn entry. if txnEntry.startTs == msg.Row.StartTs && txnEntry.rowCount < maxRowsPerTxn && txnEntry.size < maxSizePerTxn { txnEntry.size += size @@ -167,8 +169,8 @@ func (c *TableFlowController) enqueueSingleMsg(msg *model.PolymorphicEvent, size rowCount: 1, }) c.batchGroupCount++ - // mark the first data of new txnSizeEntry msg.Row.SplitTxn = true + if c.batchGroupCount >= batchSize { c.batchGroupCount = 0 // TODO(CharlesCheung): add batch resolve mechanism to mitigate oom problem diff --git a/cdc/sink/mysql/txn_cache.go b/cdc/sink/mysql/txn_cache.go index a800313996b..721e1dc76ce 100644 --- a/cdc/sink/mysql/txn_cache.go +++ b/cdc/sink/mysql/txn_cache.go @@ -36,7 +36,7 @@ func (t *txnsWithTheSameCommitTs) Append(row *model.RowChangedEvent) { } var txn *model.SingleTableTxn - if len(t.txns) == 0 || t.txns[len(t.txns)-1].StartTs < row.StartTs || row.SplitTxn { + if len(t.txns) == 0 || row.SplitTxn || t.txns[len(t.txns)-1].StartTs < row.StartTs { txn = &model.SingleTableTxn{ StartTs: row.StartTs, CommitTs: row.CommitTs,