From 58011c3deab2fdd19c1d44719c9ef13bc43a0e9f Mon Sep 17 00:00:00 2001
From: CharlesCheung96
Date: Wed, 27 Apr 2022 14:17:00 +0800
Subject: [PATCH 01/14] fix tableMaxResolvedTs bug

---
 cdc/sink/mysql/mysql.go     | 11 +++--------
 cdc/sink/mysql/txn_cache.go |  2 +-
 2 files changed, 4 insertions(+), 9 deletions(-)

diff --git a/cdc/sink/mysql/mysql.go b/cdc/sink/mysql/mysql.go
index 8b752a00d58..de6542b54d3 100644
--- a/cdc/sink/mysql/mysql.go
+++ b/cdc/sink/mysql/mysql.go
@@ -268,13 +268,6 @@ func (s *mysqlSink) flushRowChangedEvents(ctx context.Context, receiver *notify.
 		case <-receiver.C:
 		}
 		flushedResolvedTsMap, resolvedTxnsMap := s.txnCache.Resolved(&s.tableMaxResolvedTs)
-		if len(resolvedTxnsMap) == 0 {
-			s.tableMaxResolvedTs.Range(func(key, value interface{}) bool {
-				s.tableCheckpointTs.Store(key, value)
-				return true
-			})
-			continue
-		}

 		if s.cyclic != nil {
 			// Filter rows if it is origin from downstream.
@@ -283,7 +276,9 @@ func (s *mysqlSink) flushRowChangedEvents(ctx context.Context, receiver *notify.
 			s.statistics.SubRowsCount(skippedRowCount)
 		}

-		s.dispatchAndExecTxns(ctx, resolvedTxnsMap)
+		if len(resolvedTxnsMap) == 0 {
+			s.dispatchAndExecTxns(ctx, resolvedTxnsMap)
+		}
 		for tableID, resolvedTs := range flushedResolvedTsMap {
 			s.tableCheckpointTs.Store(tableID, resolvedTs)
 		}
diff --git a/cdc/sink/mysql/txn_cache.go b/cdc/sink/mysql/txn_cache.go
index 2aa02ec299b..57f51517256 100644
--- a/cdc/sink/mysql/txn_cache.go
+++ b/cdc/sink/mysql/txn_cache.go
@@ -133,6 +133,7 @@ func splitResolvedTxn(
 			continue
 		}
 		resolvedTs := v.(uint64)
+		flushedResolvedTsMap[tableID] = resolvedTs
 		i := sort.Search(len(txns), func(i int) bool {
 			return txns[i].commitTs > resolvedTs
 		})
@@ -156,7 +157,6 @@ func splitResolvedTxn(
 			resolvedTxns = append(resolvedTxns, txns.txns...)
 		}
 		resolvedRowsMap[tableID] = resolvedTxns
-		flushedResolvedTsMap[tableID] = resolvedTs
 	}
 	return
 }

From 57e59664b837d9465e439b40830b19fddf734f07 Mon Sep 17 00:00:00 2001
From: CharlesCheung <61726649+CharlesCheung96@users.noreply.github.com>
Date: Thu, 28 Apr 2022 15:34:57 +0800
Subject: [PATCH 02/14] Update mysql.go

---
 cdc/sink/mysql/mysql.go        | 4 ++--
 cdc/sink/mysql/mysql_worker.go | 4 ++++
 2 files changed, 6 insertions(+), 2 deletions(-)

diff --git a/cdc/sink/mysql/mysql.go b/cdc/sink/mysql/mysql.go
index de6542b54d3..b116d2912ba 100644
--- a/cdc/sink/mysql/mysql.go
+++ b/cdc/sink/mysql/mysql.go
@@ -258,7 +258,7 @@ func (s *mysqlSink) FlushRowChangedEvents(ctx context.Context, tableID model.Tab
 func (s *mysqlSink) flushRowChangedEvents(ctx context.Context, receiver *notify.Receiver) {
 	defer func() {
 		for _, worker := range s.workers {
-			worker.closedCh <- struct{}{}
+			worker.close()
 		}
 	}()
 	for {
@@ -276,7 +276,7 @@ func (s *mysqlSink) flushRowChangedEvents(ctx context.Context, receiver *notify.
 			s.statistics.SubRowsCount(skippedRowCount)
 		}

-		if len(resolvedTxnsMap) == 0 {
+		if len(resolvedTxnsMap) != 0 {
 			s.dispatchAndExecTxns(ctx, resolvedTxnsMap)
 		}
 		for tableID, resolvedTs := range flushedResolvedTsMap {
diff --git a/cdc/sink/mysql/mysql_worker.go b/cdc/sink/mysql/mysql_worker.go
index bd9e9464922..9deca33ab4e 100644
--- a/cdc/sink/mysql/mysql_worker.go
+++ b/cdc/sink/mysql/mysql_worker.go
@@ -170,3 +170,7 @@ func (w *mysqlSinkWorker) cleanup() {
 		}
 	}
 }
+
+func (w *mysqlSinkWorker) close() {
+	w.closedCh <- struct{}{}
+}

From 4eee3c89088a8036203c93e6fa9e3c07615e66bf Mon Sep 17 00:00:00 2001
From: CharlesCheung
Date: Fri, 13 May 2022 16:11:58 +0800
Subject: [PATCH 03/14] fix resolved bug

---
 cdc/sink/mysql/txn_cache.go | 51 ++++++++++++++++++-------------------
 1 file changed, 25 insertions(+), 26 deletions(-)

diff --git a/cdc/sink/mysql/txn_cache.go b/cdc/sink/mysql/txn_cache.go
index 57f51517256..028c3c08af9 100644
--- a/cdc/sink/mysql/txn_cache.go
+++ b/cdc/sink/mysql/txn_cache.go
@@ -127,36 +127,35 @@ func splitResolvedTxn(
 ) (flushedResolvedTsMap map[model.TableID]uint64, resolvedRowsMap map[model.TableID][]*model.SingleTableTxn) {
 	resolvedRowsMap = make(map[model.TableID][]*model.SingleTableTxn, len(unresolvedTxns))
 	flushedResolvedTsMap = make(map[model.TableID]uint64, len(unresolvedTxns))
-	for tableID, txns := range unresolvedTxns {
-		v, ok := resolvedTsMap.Load(tableID)
-		if !ok {
-			continue
-		}
-		resolvedTs := v.(uint64)
+	resolvedTsMap.Range(func(k, v any) bool {
+		tableID := k.(model.TableID)
+		resolvedTs := v.(model.Ts)
 		flushedResolvedTsMap[tableID] = resolvedTs
+
+		txns := unresolvedTxns[tableID]
 		i := sort.Search(len(txns), func(i int) bool {
 			return txns[i].commitTs > resolvedTs
 		})
-		if i == 0 {
-			continue
-		}
-		var resolvedTxnsWithTheSameCommitTs []*txnsWithTheSameCommitTs
-		if i == len(txns) {
-			resolvedTxnsWithTheSameCommitTs = txns
-			delete(unresolvedTxns, tableID)
-		} else {
-			resolvedTxnsWithTheSameCommitTs = txns[:i]
-			unresolvedTxns[tableID] = txns[i:]
-		}
-		var txnsLength int
-		for _, txns := range resolvedTxnsWithTheSameCommitTs {
-			txnsLength += len(txns.txns)
-		}
-		resolvedTxns := make([]*model.SingleTableTxn, 0, txnsLength)
-		for _, txns := range resolvedTxnsWithTheSameCommitTs {
-			resolvedTxns = append(resolvedTxns, txns.txns...)
+		if i != 0 {
+			var resolvedTxnsWithTheSameCommitTs []*txnsWithTheSameCommitTs
+			if i == len(txns) {
+				resolvedTxnsWithTheSameCommitTs = txns
+				delete(unresolvedTxns, tableID)
+			} else {
+				resolvedTxnsWithTheSameCommitTs = txns[:i]
+				unresolvedTxns[tableID] = txns[i:]
+			}
+			var txnsLength int
+			for _, txns := range resolvedTxnsWithTheSameCommitTs {
+				txnsLength += len(txns.txns)
+			}
+			resolvedTxns := make([]*model.SingleTableTxn, 0, txnsLength)
+			for _, txns := range resolvedTxnsWithTheSameCommitTs {
+				resolvedTxns = append(resolvedTxns, txns.txns...)
+ } + resolvedRowsMap[tableID] = resolvedTxns } - resolvedRowsMap[tableID] = resolvedTxns - } + return true + }) return } From 4bc3d6ee01cd3c95bd91bbbecc69266b0f8baa51 Mon Sep 17 00:00:00 2001 From: CharlesCheung Date: Fri, 13 May 2022 16:35:08 +0800 Subject: [PATCH 04/14] add unit test --- cdc/sink/mysql/txn_cache.go | 3 --- cdc/sink/mysql/txn_cache_test.go | 5 +++-- 2 files changed, 3 insertions(+), 5 deletions(-) diff --git a/cdc/sink/mysql/txn_cache.go b/cdc/sink/mysql/txn_cache.go index 028c3c08af9..23fd2f03eef 100644 --- a/cdc/sink/mysql/txn_cache.go +++ b/cdc/sink/mysql/txn_cache.go @@ -115,9 +115,6 @@ func (c *unresolvedTxnCache) Resolved( ) (map[model.TableID]uint64, map[model.TableID][]*model.SingleTableTxn) { c.unresolvedTxnsMu.Lock() defer c.unresolvedTxnsMu.Unlock() - if len(c.unresolvedTxns) == 0 { - return nil, nil - } return splitResolvedTxn(resolvedTsMap, c.unresolvedTxns) } diff --git a/cdc/sink/mysql/txn_cache_test.go b/cdc/sink/mysql/txn_cache_test.go index 0656b93b3dd..ae2a35f6940 100644 --- a/cdc/sink/mysql/txn_cache_test.go +++ b/cdc/sink/mysql/txn_cache_test.go @@ -128,7 +128,7 @@ func TestSplitResolvedTxn(test *testing.T) { 2: uint64(13), 3: uint64(13), }, - expected: nil, + expected: map[model.TableID][]*model.SingleTableTxn{}, }, { input: []*model.RowChangedEvent{ @@ -264,7 +264,7 @@ func TestSplitResolvedTxn(test *testing.T) { for tableID, ts := range t.resolvedTsMap { resolvedTsMap.Store(tableID, ts) } - _, resolved := cache.Resolved(&resolvedTsMap) + flushedResolvedTsMap, resolved := cache.Resolved(&resolvedTsMap) for tableID, txns := range resolved { sort.Slice(txns, func(i, j int) bool { if txns[i].CommitTs != txns[j].CommitTs { @@ -275,6 +275,7 @@ func TestSplitResolvedTxn(test *testing.T) { resolved[tableID] = txns } require.Equal(test, t.expected, resolved, cmp.Diff(resolved, t.expected)) + require.Equal(test, t.resolvedTsMap, flushedResolvedTsMap) } } } From ab16bf3a8fa58a6cd42562fc5df573032825c502 Mon Sep 17 00:00:00 2001 From: CharlesCheung Date: Fri, 13 May 2022 17:02:38 +0800 Subject: [PATCH 05/14] add more ut --- cdc/sink/mysql/txn_cache_test.go | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/cdc/sink/mysql/txn_cache_test.go b/cdc/sink/mysql/txn_cache_test.go index ae2a35f6940..20bbfb57561 100644 --- a/cdc/sink/mysql/txn_cache_test.go +++ b/cdc/sink/mysql/txn_cache_test.go @@ -44,6 +44,7 @@ func TestSplitResolvedTxn(test *testing.T) { resolvedTsMap: map[model.TableID]uint64{ 1: uint64(6), 2: uint64(6), + 3: uint64(6), }, expected: map[model.TableID][]*model.SingleTableTxn{ 1: {{ @@ -70,6 +71,7 @@ func TestSplitResolvedTxn(test *testing.T) { 1: uint64(13), 2: uint64(13), 3: uint64(13), + 4: uint64(6), }, expected: map[model.TableID][]*model.SingleTableTxn{ 1: { @@ -139,6 +141,7 @@ func TestSplitResolvedTxn(test *testing.T) { resolvedTsMap: map[model.TableID]uint64{ 1: uint64(6), 2: uint64(6), + 3: uint64(13), }, expected: map[model.TableID][]*model.SingleTableTxn{}, }, @@ -159,6 +162,7 @@ func TestSplitResolvedTxn(test *testing.T) { resolvedTsMap: map[model.TableID]uint64{ 1: uint64(6), 2: uint64(6), + 3: uint64(13), }, expected: map[model.TableID][]*model.SingleTableTxn{ 1: { @@ -202,6 +206,7 @@ func TestSplitResolvedTxn(test *testing.T) { resolvedTsMap: map[model.TableID]uint64{ 1: uint64(13), 2: uint64(13), + 3: uint64(13), }, expected: map[model.TableID][]*model.SingleTableTxn{ 1: { From 739ef5701a715d421f50c64933e1049b1e23209d Mon Sep 17 00:00:00 2001 From: CharlesCheung Date: Fri, 13 May 2022 17:26:34 +0800 Subject: [PATCH 06/14] remove 
resolvedNotifier --- cdc/sink/mysql/mysql.go | 19 ++++++++----------- 1 file changed, 8 insertions(+), 11 deletions(-) diff --git a/cdc/sink/mysql/mysql.go b/cdc/sink/mysql/mysql.go index b116d2912ba..1773c3c5441 100644 --- a/cdc/sink/mysql/mysql.go +++ b/cdc/sink/mysql/mysql.go @@ -65,7 +65,7 @@ type mysqlSink struct { tableMaxResolvedTs sync.Map execWaitNotifier *notify.Notifier - resolvedNotifier *notify.Notifier + resolvedCh chan struct{} errCh chan error flushSyncWg sync.WaitGroup @@ -203,18 +203,14 @@ func NewMySQLSink( } sink.execWaitNotifier = new(notify.Notifier) - sink.resolvedNotifier = new(notify.Notifier) + sink.resolvedCh = make(chan struct{}) err = sink.createSinkWorkers(ctx) if err != nil { return nil, err } - receiver, err := sink.resolvedNotifier.NewReceiver(50 * time.Millisecond) - if err != nil { - return nil, err - } - go sink.flushRowChangedEvents(ctx, receiver) + go sink.flushRowChangedEvents(ctx) return sink, nil } @@ -241,12 +237,13 @@ func (s *mysqlSink) FlushRowChangedEvents(ctx context.Context, tableID model.Tab if !ok || v.(uint64) < resolvedTs { s.tableMaxResolvedTs.Store(tableID, resolvedTs) } - s.resolvedNotifier.Notify() // check and throw error select { case err := <-s.errCh: return 0, err + case s.resolvedCh <- struct{}{}: + // Notify `flushRowChangedEvents` to asynchronously write data. default: } @@ -255,7 +252,7 @@ func (s *mysqlSink) FlushRowChangedEvents(ctx context.Context, tableID model.Tab return checkpointTs, nil } -func (s *mysqlSink) flushRowChangedEvents(ctx context.Context, receiver *notify.Receiver) { +func (s *mysqlSink) flushRowChangedEvents(ctx context.Context) { defer func() { for _, worker := range s.workers { worker.close() @@ -265,7 +262,7 @@ func (s *mysqlSink) flushRowChangedEvents(ctx context.Context, receiver *notify. 
select { case <-ctx.Done(): return - case <-receiver.C: + case <-s.resolvedCh: } flushedResolvedTsMap, resolvedTxnsMap := s.txnCache.Resolved(&s.tableMaxResolvedTs) @@ -528,7 +525,7 @@ func (s *mysqlSink) cleanTableResource(tableID model.TableID) { func (s *mysqlSink) Close(ctx context.Context) error { s.execWaitNotifier.Close() - s.resolvedNotifier.Close() + close(s.resolvedCh) err := s.db.Close() s.cancel() return cerror.WrapError(cerror.ErrMySQLConnectionError, err) From a59b98d67f7c6d1742d507b1760d61b8f41e5660 Mon Sep 17 00:00:00 2001 From: CharlesCheung Date: Fri, 13 May 2022 21:49:16 +0800 Subject: [PATCH 07/14] optimize map iteration --- cdc/sink/mysql/txn_cache.go | 21 +++++++++++++++------ 1 file changed, 15 insertions(+), 6 deletions(-) diff --git a/cdc/sink/mysql/txn_cache.go b/cdc/sink/mysql/txn_cache.go index 23fd2f03eef..bfc8355888e 100644 --- a/cdc/sink/mysql/txn_cache.go +++ b/cdc/sink/mysql/txn_cache.go @@ -122,19 +122,29 @@ func (c *unresolvedTxnCache) Resolved( func splitResolvedTxn( resolvedTsMap *sync.Map, unresolvedTxns map[model.TableID][]*txnsWithTheSameCommitTs, ) (flushedResolvedTsMap map[model.TableID]uint64, resolvedRowsMap map[model.TableID][]*model.SingleTableTxn) { - resolvedRowsMap = make(map[model.TableID][]*model.SingleTableTxn, len(unresolvedTxns)) flushedResolvedTsMap = make(map[model.TableID]uint64, len(unresolvedTxns)) resolvedTsMap.Range(func(k, v any) bool { tableID := k.(model.TableID) resolvedTs := v.(model.Ts) flushedResolvedTsMap[tableID] = resolvedTs + return true + }) - txns := unresolvedTxns[tableID] + resolvedRowsMap = make(map[model.TableID][]*model.SingleTableTxn, len(unresolvedTxns)) + var ( + txns []*txnsWithTheSameCommitTs + ok bool + resolvedTxnsWithTheSameCommitTs []*txnsWithTheSameCommitTs + txnsLength int + ) + for tableID, resolvedTs := range flushedResolvedTsMap { + if txns, ok = unresolvedTxns[tableID]; !ok { + continue + } i := sort.Search(len(txns), func(i int) bool { return txns[i].commitTs > resolvedTs }) if i != 0 { - var resolvedTxnsWithTheSameCommitTs []*txnsWithTheSameCommitTs if i == len(txns) { resolvedTxnsWithTheSameCommitTs = txns delete(unresolvedTxns, tableID) @@ -142,7 +152,6 @@ func splitResolvedTxn( resolvedTxnsWithTheSameCommitTs = txns[:i] unresolvedTxns[tableID] = txns[i:] } - var txnsLength int for _, txns := range resolvedTxnsWithTheSameCommitTs { txnsLength += len(txns.txns) } @@ -152,7 +161,7 @@ func splitResolvedTxn( } resolvedRowsMap[tableID] = resolvedTxns } - return true - }) + } + return } From 94cfcb2d5b95c64aef0aecc062d972dce27b4a3d Mon Sep 17 00:00:00 2001 From: CharlesCheung Date: Mon, 16 May 2022 10:31:19 +0800 Subject: [PATCH 08/14] address comment --- cdc/sink/mysql/txn_cache.go | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/cdc/sink/mysql/txn_cache.go b/cdc/sink/mysql/txn_cache.go index bfc8355888e..65fc827f2f7 100644 --- a/cdc/sink/mysql/txn_cache.go +++ b/cdc/sink/mysql/txn_cache.go @@ -122,6 +122,13 @@ func (c *unresolvedTxnCache) Resolved( func splitResolvedTxn( resolvedTsMap *sync.Map, unresolvedTxns map[model.TableID][]*txnsWithTheSameCommitTs, ) (flushedResolvedTsMap map[model.TableID]uint64, resolvedRowsMap map[model.TableID][]*model.SingleTableTxn) { + var ( + ok bool + txnsLength int + txns []*txnsWithTheSameCommitTs + resolvedTxnsWithTheSameCommitTs []*txnsWithTheSameCommitTs + ) + flushedResolvedTsMap = make(map[model.TableID]uint64, len(unresolvedTxns)) resolvedTsMap.Range(func(k, v any) bool { tableID := k.(model.TableID) @@ -131,12 +138,6 @@ func 
splitResolvedTxn( }) resolvedRowsMap = make(map[model.TableID][]*model.SingleTableTxn, len(unresolvedTxns)) - var ( - txns []*txnsWithTheSameCommitTs - ok bool - resolvedTxnsWithTheSameCommitTs []*txnsWithTheSameCommitTs - txnsLength int - ) for tableID, resolvedTs := range flushedResolvedTsMap { if txns, ok = unresolvedTxns[tableID]; !ok { continue From fb7c7c777d4dc8e6e1a2fb436b30dd43e7d54866 Mon Sep 17 00:00:00 2001 From: CharlesCheung Date: Mon, 16 May 2022 11:11:34 +0800 Subject: [PATCH 09/14] address comment --- cdc/sink/mysql/mysql.go | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/cdc/sink/mysql/mysql.go b/cdc/sink/mysql/mysql.go index 1773c3c5441..309bbd7d192 100644 --- a/cdc/sink/mysql/mysql.go +++ b/cdc/sink/mysql/mysql.go @@ -197,14 +197,13 @@ func NewMySQLSink( statistics: metrics.NewStatistics(ctx, metrics.SinkTypeDB), metricConflictDetectDurationHis: metricConflictDetectDurationHis, metricBucketSizeCounters: metricBucketSizeCounters, + execWaitNotifier: new(notify.Notifier), + resolvedCh: make(chan struct{}, 1), errCh: make(chan error, 1), forceReplicate: replicaConfig.ForceReplicate, cancel: cancel, } - sink.execWaitNotifier = new(notify.Notifier) - sink.resolvedCh = make(chan struct{}) - err = sink.createSinkWorkers(ctx) if err != nil { return nil, err From 1ea5f8596849dfce363162787df3192b7e70d8d2 Mon Sep 17 00:00:00 2001 From: CharlesCheung Date: Mon, 16 May 2022 11:18:30 +0800 Subject: [PATCH 10/14] address comment --- cdc/sink/mysql/mysql.go | 4 ++-- cdc/sink/mysql/txn_cache.go | 8 ++++---- cdc/sink/mysql/txn_cache_test.go | 4 ++-- 3 files changed, 8 insertions(+), 8 deletions(-) diff --git a/cdc/sink/mysql/mysql.go b/cdc/sink/mysql/mysql.go index 309bbd7d192..7ede7d59ae2 100644 --- a/cdc/sink/mysql/mysql.go +++ b/cdc/sink/mysql/mysql.go @@ -263,7 +263,7 @@ func (s *mysqlSink) flushRowChangedEvents(ctx context.Context) { return case <-s.resolvedCh: } - flushedResolvedTsMap, resolvedTxnsMap := s.txnCache.Resolved(&s.tableMaxResolvedTs) + checkpointTsMap, resolvedTxnsMap := s.txnCache.Resolved(&s.tableMaxResolvedTs) if s.cyclic != nil { // Filter rows if it is origin from downstream. 
@@ -275,7 +275,7 @@ func (s *mysqlSink) flushRowChangedEvents(ctx context.Context) { if len(resolvedTxnsMap) != 0 { s.dispatchAndExecTxns(ctx, resolvedTxnsMap) } - for tableID, resolvedTs := range flushedResolvedTsMap { + for tableID, resolvedTs := range checkpointTsMap { s.tableCheckpointTs.Store(tableID, resolvedTs) } } diff --git a/cdc/sink/mysql/txn_cache.go b/cdc/sink/mysql/txn_cache.go index 65fc827f2f7..84c334ba300 100644 --- a/cdc/sink/mysql/txn_cache.go +++ b/cdc/sink/mysql/txn_cache.go @@ -121,7 +121,7 @@ func (c *unresolvedTxnCache) Resolved( func splitResolvedTxn( resolvedTsMap *sync.Map, unresolvedTxns map[model.TableID][]*txnsWithTheSameCommitTs, -) (flushedResolvedTsMap map[model.TableID]uint64, resolvedRowsMap map[model.TableID][]*model.SingleTableTxn) { +) (checkpointTsMap map[model.TableID]uint64, resolvedRowsMap map[model.TableID][]*model.SingleTableTxn) { var ( ok bool txnsLength int @@ -129,16 +129,16 @@ func splitResolvedTxn( resolvedTxnsWithTheSameCommitTs []*txnsWithTheSameCommitTs ) - flushedResolvedTsMap = make(map[model.TableID]uint64, len(unresolvedTxns)) + checkpointTsMap = make(map[model.TableID]uint64, len(unresolvedTxns)) resolvedTsMap.Range(func(k, v any) bool { tableID := k.(model.TableID) resolvedTs := v.(model.Ts) - flushedResolvedTsMap[tableID] = resolvedTs + checkpointTsMap[tableID] = resolvedTs return true }) resolvedRowsMap = make(map[model.TableID][]*model.SingleTableTxn, len(unresolvedTxns)) - for tableID, resolvedTs := range flushedResolvedTsMap { + for tableID, resolvedTs := range checkpointTsMap { if txns, ok = unresolvedTxns[tableID]; !ok { continue } diff --git a/cdc/sink/mysql/txn_cache_test.go b/cdc/sink/mysql/txn_cache_test.go index 20bbfb57561..1332aec1bb5 100644 --- a/cdc/sink/mysql/txn_cache_test.go +++ b/cdc/sink/mysql/txn_cache_test.go @@ -269,7 +269,7 @@ func TestSplitResolvedTxn(test *testing.T) { for tableID, ts := range t.resolvedTsMap { resolvedTsMap.Store(tableID, ts) } - flushedResolvedTsMap, resolved := cache.Resolved(&resolvedTsMap) + checkpointTsMap, resolved := cache.Resolved(&resolvedTsMap) for tableID, txns := range resolved { sort.Slice(txns, func(i, j int) bool { if txns[i].CommitTs != txns[j].CommitTs { @@ -280,7 +280,7 @@ func TestSplitResolvedTxn(test *testing.T) { resolved[tableID] = txns } require.Equal(test, t.expected, resolved, cmp.Diff(resolved, t.expected)) - require.Equal(test, t.resolvedTsMap, flushedResolvedTsMap) + require.Equal(test, t.resolvedTsMap, checkpointTsMap) } } } From 825d475a9b0ba671b633d53fecf80fd3d1f88359 Mon Sep 17 00:00:00 2001 From: CharlesCheung Date: Mon, 16 May 2022 13:47:00 +0800 Subject: [PATCH 11/14] add some ut to mysql sink --- cdc/sink/mysql/mysql.go | 3 ++- cdc/sink/mysql/mysql_test.go | 11 ++++++++++- 2 files changed, 12 insertions(+), 2 deletions(-) diff --git a/cdc/sink/mysql/mysql.go b/cdc/sink/mysql/mysql.go index 7ede7d59ae2..94aeab2df01 100644 --- a/cdc/sink/mysql/mysql.go +++ b/cdc/sink/mysql/mysql.go @@ -239,6 +239,8 @@ func (s *mysqlSink) FlushRowChangedEvents(ctx context.Context, tableID model.Tab // check and throw error select { + case <-ctx.Done(): + return 0, ctx.Err() case err := <-s.errCh: return 0, err case s.resolvedCh <- struct{}{}: @@ -524,7 +526,6 @@ func (s *mysqlSink) cleanTableResource(tableID model.TableID) { func (s *mysqlSink) Close(ctx context.Context) error { s.execWaitNotifier.Close() - close(s.resolvedCh) err := s.db.Close() s.cancel() return cerror.WrapError(cerror.ErrMySQLConnectionError, err) diff --git a/cdc/sink/mysql/mysql_test.go 
b/cdc/sink/mysql/mysql_test.go index 2a571166436..f39fdbda4de 100644 --- a/cdc/sink/mysql/mysql_test.go +++ b/cdc/sink/mysql/mysql_test.go @@ -1692,6 +1692,9 @@ func TestNewMySQLSink(t *testing.T) { require.Nil(t, err) err = sink.Close(ctx) require.Nil(t, err) + // Test idempotency of `Close` interface + err = sink.Close(ctx) + require.Nil(t, err) } func TestMySQLSinkClose(t *testing.T) { @@ -1770,7 +1773,7 @@ func TestMySQLSinkFlushResolvedTs(t *testing.T) { GetDBConnImpl = backupGetDBConn }() - ctx := context.Background() + ctx, cancel := context.WithCancel(context.Background()) changefeed := "test-changefeed" sinkURI, err := url.Parse("mysql://127.0.0.1:4000/?time-zone=UTC&worker-count=4") @@ -1828,6 +1831,12 @@ func TestMySQLSinkFlushResolvedTs(t *testing.T) { require.Nil(t, err) require.True(t, sink.getTableCheckpointTs(model.TableID(2)) <= 5) _ = sink.Close(ctx) + _, err = sink.FlushRowChangedEvents(ctx, model.TableID(2), 6) + require.Nil(t, err) + + cancel() + _, err = sink.FlushRowChangedEvents(ctx, model.TableID(2), 6) + require.Regexp(t, ".*context canceled.*", err) } func TestGBKSupported(t *testing.T) { From da4924f4680c102ac664d237999e83abe5a41a6b Mon Sep 17 00:00:00 2001 From: WizardXiao <89761062+WizardXiao@users.noreply.github.com> Date: Tue, 17 May 2022 05:44:37 +0800 Subject: [PATCH 12/14] DM/Openapi: support start task by some conditions (#5349) close pingcap/tiflow#5348 --- dm/_utils/terror_gen/errors_release.txt | 2 +- dm/dm/config/task.go | 2 +- dm/dm/config/task_converters.go | 37 ++- dm/dm/master/openapi_controller.go | 58 +++- dm/dm/master/openapi_controller_test.go | 20 ++ dm/errors.toml | 2 +- dm/openapi/gen.server.go | 205 +++++++------- dm/openapi/gen.types.go | 9 + dm/openapi/spec/dm.yaml | 22 +- dm/pkg/ha/task_cli_args.go | 14 +- dm/pkg/ha/task_cli_args_test.go | 2 +- dm/pkg/terror/error_list.go | 2 +- dm/syncer/syncer.go | 17 +- dm/tests/openapi/client/openapi_task_check | 102 ++++++- dm/tests/openapi/run.sh | 300 +++++++++++++++++++++ 15 files changed, 665 insertions(+), 129 deletions(-) diff --git a/dm/_utils/terror_gen/errors_release.txt b/dm/_utils/terror_gen/errors_release.txt index 02941319971..06ab92f09f6 100644 --- a/dm/_utils/terror_gen/errors_release.txt +++ b/dm/_utils/terror_gen/errors_release.txt @@ -153,7 +153,7 @@ ErrConfigReadCfgFromFile,[code=20018:class=config:scope=internal:level=medium], ErrConfigNeedUniqueTaskName,[code=20019:class=config:scope=internal:level=medium], "Message: must specify a unique task name, Workaround: Please check the `name` config in task configuration file." ErrConfigInvalidTaskMode,[code=20020:class=config:scope=internal:level=medium], "Message: please specify right task-mode, support `full`, `incremental`, `all`, Workaround: Please check the `task-mode` config in task configuration file." ErrConfigNeedTargetDB,[code=20021:class=config:scope=internal:level=medium], "Message: must specify target-database, Workaround: Please check the `target-database` config in task configuration file." -ErrConfigMetadataNotSet,[code=20022:class=config:scope=internal:level=medium], "Message: mysql-instance(%d) must set meta for task-mode %s, Workaround: Please check the `meta` config in task configuration file." +ErrConfigMetadataNotSet,[code=20022:class=config:scope=internal:level=medium], "Message: mysql-instance(%s) must set meta for task-mode %s, Workaround: Please check the `meta` config in task configuration file." 
ErrConfigRouteRuleNotFound,[code=20023:class=config:scope=internal:level=medium], "Message: mysql-instance(%d)'s route-rules %s not exist in routes, Workaround: Please check the `route-rules` config in task configuration file." ErrConfigFilterRuleNotFound,[code=20024:class=config:scope=internal:level=medium], "Message: mysql-instance(%d)'s filter-rules %s not exist in filters, Workaround: Please check the `filter-rules` config in task configuration file." ErrConfigColumnMappingNotFound,[code=20025:class=config:scope=internal:level=medium], "Message: mysql-instance(%d)'s column-mapping-rules %s not exist in column-mapping, Workaround: Please check the `column-mapping-rules` config in task configuration file." diff --git a/dm/dm/config/task.go b/dm/dm/config/task.go index e9c4f283376..22316d0f89e 100644 --- a/dm/dm/config/task.go +++ b/dm/dm/config/task.go @@ -660,7 +660,7 @@ func (c *TaskConfig) adjust() error { } case ModeIncrement: if inst.Meta == nil { - return terror.ErrConfigMetadataNotSet.Generate(i, c.TaskMode) + return terror.ErrConfigMetadataNotSet.Generate(inst.SourceID, c.TaskMode) } err := inst.Meta.Verify() if err != nil { diff --git a/dm/dm/config/task_converters.go b/dm/dm/config/task_converters.go index 9cb6034d529..187ae9e0ef3 100644 --- a/dm/dm/config/task_converters.go +++ b/dm/dm/config/task_converters.go @@ -166,10 +166,12 @@ func OpenAPITaskToSubTaskConfigs(task *openapi.Task, toDBCfg *DBConfig, sourceCf } subTaskCfg.Meta = meta } - // check must set meta when mode is ModeIncrement + + // if there is no meta for incremental task, we print a warning log if subTaskCfg.Meta == nil && subTaskCfg.Mode == ModeIncrement { - return nil, terror.ErrConfigMetadataNotSet.Generate(i, ModeIncrement) + log.L().Warn("mysql-instance doesn't set meta for incremental mode, user should specify start_time to start task.", zap.String("sourceID", sourceCfg.SourceName)) } + // set shard config if task.ShardMode != nil { subTaskCfg.IsSharding = true @@ -691,3 +693,34 @@ func genFilterRuleName(sourceName string, idx int) string { // NOTE that we don't have user input filter rule name in sub task config, so we make one by ourself return fmt.Sprintf("%s-filter-rule-%d", sourceName, idx) } + +func OpenAPIStartTaskReqToTaskCliArgs(req openapi.StartTaskRequest) (*TaskCliArgs, error) { + if req.StartTime == nil && req.SafeModeTimeDuration == nil { + return nil, nil + } + cliArgs := &TaskCliArgs{} + if req.StartTime != nil { + cliArgs.StartTime = *req.StartTime + } + if req.SafeModeTimeDuration != nil { + cliArgs.SafeModeDuration = *req.SafeModeTimeDuration + } + + if err := cliArgs.Verify(); err != nil { + return nil, err + } + return cliArgs, nil +} + +func OpenAPIStopTaskReqToTaskCliArgs(req openapi.StopTaskRequest) (*TaskCliArgs, error) { + if req.TimeoutDuration == nil { + return nil, nil + } + cliArgs := &TaskCliArgs{ + WaitTimeOnStop: *req.TimeoutDuration, + } + if err := cliArgs.Verify(); err != nil { + return nil, err + } + return cliArgs, nil +} diff --git a/dm/dm/master/openapi_controller.go b/dm/dm/master/openapi_controller.go index 831ac1d7a36..f00d21af8f3 100644 --- a/dm/dm/master/openapi_controller.go +++ b/dm/dm/master/openapi_controller.go @@ -24,9 +24,14 @@ import ( "fmt" "strings" + clientv3 "go.etcd.io/etcd/client/v3" + "github.com/pingcap/log" "go.uber.org/zap" + "github.com/pingcap/tiflow/dm/dm/master/scheduler" + "github.com/pingcap/tiflow/dm/pkg/ha" + "github.com/pingcap/tiflow/dm/checker" dmcommon "github.com/pingcap/tiflow/dm/dm/common" 
"github.com/pingcap/tiflow/dm/dm/config" @@ -377,7 +382,7 @@ func (s *Server) checkOpenAPITaskBeforeOperate(ctx context.Context, task *openap if sourceCfg := s.scheduler.GetSourceCfgByID(cfg.SourceName); sourceCfg != nil { sourceCfgMap[cfg.SourceName] = sourceCfg } else { - return nil, "", terror.ErrSchedulerSourceCfgNotExist.Generate(sourceCfg.SourceID) + return nil, "", terror.ErrSchedulerSourceCfgNotExist.Generate(cfg.SourceName) } } // generate sub task configs @@ -666,6 +671,10 @@ func (s *Server) startTask(ctx context.Context, taskName string, req openapi.Sta if !ok { return terror.ErrSchedulerSourceCfgNotExist.Generate(sourceName) } + // start task check. incremental task need to specify meta or start time + if subTaskCfg.Meta == nil && subTaskCfg.Mode == config.ModeIncrement && req.StartTime == nil { + return terror.ErrConfigMetadataNotSet.Generate(sourceName, config.ModeIncrement) + } cfg := s.scheduler.GetSourceCfgByID(sourceName) if cfg == nil { return terror.ErrSchedulerSourceCfgNotExist.Generate(sourceName) @@ -679,10 +688,14 @@ func (s *Server) startTask(ctx context.Context, taskName string, req openapi.Sta return nil } - // TODO(ehco) support other start args after https://github.com/pingcap/tiflow/pull/4601 merged + var ( + release scheduler.ReleaseFunc + err error + ) + // removeMeta if req.RemoveMeta != nil && *req.RemoveMeta { // use same latch for remove-meta and start-task - release, err := s.scheduler.AcquireSubtaskLatch(taskName) + release, err = s.scheduler.AcquireSubtaskLatch(taskName) if err != nil { return terror.ErrSchedulerLatchInUse.Generate("RemoveMeta", taskName) } @@ -693,8 +706,21 @@ func (s *Server) startTask(ctx context.Context, taskName string, req openapi.Sta if err != nil { return terror.Annotate(err, "while removing metadata") } + } + + // handle task cli args + cliArgs, err := config.OpenAPIStartTaskReqToTaskCliArgs(req) + if err != nil { + return terror.Annotate(err, "while converting task command line arguments") + } + + if err = handleCliArgs(s.etcdClient, taskName, *req.SourceNameList, cliArgs); err != nil { + return err + } + if release != nil { release() } + return s.scheduler.UpdateExpectSubTaskStage(pb.Stage_Running, taskName, *req.SourceNameList...) } @@ -705,10 +731,34 @@ func (s *Server) stopTask(ctx context.Context, taskName string, req openapi.Stop sourceNameList := openapi.SourceNameList(s.getTaskSourceNameList(taskName)) req.SourceNameList = &sourceNameList } - // TODO(ehco): support stop req after https://github.com/pingcap/tiflow/pull/4601 merged + // handle task cli args + cliArgs, err := config.OpenAPIStopTaskReqToTaskCliArgs(req) + if err != nil { + return terror.Annotate(err, "while converting task command line arguments") + } + if err = handleCliArgs(s.etcdClient, taskName, *req.SourceNameList, cliArgs); err != nil { + return err + } return s.scheduler.UpdateExpectSubTaskStage(pb.Stage_Stopped, taskName, *req.SourceNameList...) } +// handleCliArgs handles cli args. +// it will try to delete args if cli args is nil. 
+func handleCliArgs(cli *clientv3.Client, taskName string, sources []string, cliArgs *config.TaskCliArgs) error { + if cliArgs == nil { + err := ha.DeleteTaskCliArgs(cli, taskName, sources) + if err != nil { + return terror.Annotate(err, "while removing task command line arguments") + } + } else { + err := ha.PutTaskCliArgs(cli, taskName, sources, *cliArgs) + if err != nil { + return terror.Annotate(err, "while putting task command line arguments") + } + } + return nil +} + // nolint:unparam func (s *Server) convertTaskConfig(ctx context.Context, req openapi.ConverterTaskRequest) (*openapi.Task, *config.TaskConfig, error) { if req.TaskConfigFile != nil { diff --git a/dm/dm/master/openapi_controller_test.go b/dm/dm/master/openapi_controller_test.go index cfb314027fc..7925366b6fc 100644 --- a/dm/dm/master/openapi_controller_test.go +++ b/dm/dm/master/openapi_controller_test.go @@ -22,6 +22,8 @@ import ( "context" "testing" + "github.com/pingcap/tiflow/dm/pkg/ha" + "github.com/pingcap/failpoint" "github.com/pingcap/tiflow/dm/checker" "github.com/pingcap/tiflow/dm/dm/config" @@ -372,6 +374,24 @@ func (s *OpenAPIControllerSuite) TestTaskController() { // stop success s.Nil(server.stopTask(ctx, s.testTask.Name, openapi.StopTaskRequest{})) s.Equal(server.scheduler.GetExpectSubTaskStage(s.testTask.Name, s.testSource.SourceName).Expect, pb.Stage_Stopped) + + // start with cli args + startTime := "2022-05-05 12:12:12" + safeModeTimeDuration := "10s" + req = openapi.StartTaskRequest{ + StartTime: &startTime, + SafeModeTimeDuration: &safeModeTimeDuration, + } + s.Nil(server.startTask(ctx, s.testTask.Name, req)) + taskCliConf, err := ha.GetTaskCliArgs(server.etcdClient, s.testTask.Name, s.testSource.SourceName) + s.Nil(err) + s.NotNil(taskCliConf) + s.Equal(startTime, taskCliConf.StartTime) + s.Equal(safeModeTimeDuration, taskCliConf.SafeModeDuration) + + // stop success + s.Nil(server.stopTask(ctx, s.testTask.Name, openapi.StopTaskRequest{})) + s.Equal(server.scheduler.GetExpectSubTaskStage(s.testTask.Name, s.testSource.SourceName).Expect, pb.Stage_Stopped) } // delete diff --git a/dm/errors.toml b/dm/errors.toml index ed7c1214ae4..5ac789ecfc5 100644 --- a/dm/errors.toml +++ b/dm/errors.toml @@ -929,7 +929,7 @@ workaround = "Please check the `target-database` config in task configuration fi tags = ["internal", "medium"] [error.DM-config-20022] -message = "mysql-instance(%d) must set meta for task-mode %s" +message = "mysql-instance(%s) must set meta for task-mode %s" description = "" workaround = "Please check the `meta` config in task configuration file." 
tags = ["internal", "medium"] diff --git a/dm/openapi/gen.server.go b/dm/openapi/gen.server.go index dd4cc1725fe..bf40d67a7d9 100644 --- a/dm/openapi/gen.server.go +++ b/dm/openapi/gen.server.go @@ -1248,108 +1248,109 @@ func RegisterHandlersWithOptions(router *gin.Engine, si ServerInterface, options // Base64 encoded, gzipped, json marshaled Swagger object var swaggerSpec = []string{ - "H4sIAAAAAAAC/+w9XXPbOJJ/Bae7h5kpyZJsx0l8tQ9J7Mn6zvko21NzW1M5BSJBCWsSYADQHm3K/30L", - "HyRBEiAp23KssfdhxxFBoNHo7240vw8CmqSUICL44PD7gAdLlED155sYMfEBErhA7IKmNKaLlfw9ZTRF", - "TGCkRi0pF/K/6E+YpDEaHA6muy93JjuTnelgOBCrVP7EBcNkMbgZDlLKqsNfT17vFeMwEWiB2ODmZjhg", - "6FuGGQoHh3/oRczLX4rRdP5PFAg567s44wKxD1D+fxNGGIbq1xDxgOFUYEoGh+pXxDmgERBLBIKMMUQE", - "SNQkgNAQDYaubR2+2j1w7g3G+Ao116EkxgQBLqDIzGqYm2XsFQTLUDHrnNIYQSKnjREMkQN+zO2Z1B7M", - "0B6TEpig6rHpaRwbq52FejPfbAHdUCO55XD8JAQloc0STWkzYY37L4aiweHgP8clkY4NhY6d5HkzHCwY", - "jCCBved5r8fbU2hUFDPMYqxpHAuU8K75NBHa0xmMQMag+nfKaILEEmW8N5Cfi1fsia8pu7w1nL+rl/1w", - "3viPUr/6w/hsTjMSzjjNWIBmOSFX11RDgB4C5JCC7zTOmssmK/4tHk3aFhRw4V9KPuxcRI11rdBkRz1F", - "f3aUqK9C6kKUkz8puUJM0izkl2foW4Y0FVXPVkB+2UVScgJFSJBfzgJKIryYRTh2IE0/BPIhwASsYBKD", - "iLIECrAUIuWH43FIA76TYrIIYLoT0GT8r+VY4HA+5gLOYzSWi4z0PBmDct6RnG4UZXG840Rb1855SglH", - "f8mt2xSjtuOA1EkbDEGBzhUFeUlDE1gXhvQkltjy0fyom+jNin6I74mUXZhzLXqEuTyYMxTDlbVsTQ4G", - "8g8gKOCCpgACJocDZsYPa1BaWCoEe7c8/wgTdCpHOwn+KEvSc2WHNMEr7ZMwS1KQEdyESS4bI4HCmSJE", - "9Zum3cHhIKTZPEbl2ZEsmUtbbjhAXOAECjQTVMB4xuh13zcjTDBfonA2Xwm09ktrLKQhc+wKE3GwP+i0", - "UCvvD5uIamylDqYbSy5iOybr0RpkopPY1NPZHJOYLmYLgUMnfTCByQK8vzg5ypV5lnLBEEyAfrWi7NBr", - "OI2C3d0RCiavRtMpej2a78JgNNnd34XBdDqZTPYOp6OXr/ZfD4YDksWx3FfNZC1VZAVEt9YvQJTyrNT6", - "7WBqxT/HZGci/7fbH5YQG2snglksaWVnrB/oJaqwSTBCzFAgKFuB6yViSIGmzyWmC4C5FAySnnpAsAnp", - "cMwYZb9jsfyAOHfaOpJklL4BSI5tkJH6dRZIs6fxrnoGAm0S1blpaF5N+ML3ZmKA6tIN5URDGx4XJ71H", - "wli0JySifgMg0INmLrYwzwCWx1ZIjcwnNqSk6Wfy192m+j4toNr3ph0Seez+HYZQwN6eQ9Xbdjg4SoAp", - "TdtDaEpOkau3b0LT7/1vwrgyG96Etn3uEfrSmNo82NpguFfAjQ2yYfClDXePOC9M/A2D/AEvmDJh2QIJ", - "fo/AVyZ+iJ3cL+Vk83LOh4D+Qirgc8GyQGQM+XehAZwFyvGY8W9x1al5d3b85uIYXLx5e3oMvorpV/DT", - "Vxx+BZiIn6bTn8HHTxfg42+np+DNbxefZicf350dfzj+eDH8fHby4c3ZP8D/Hv9Dv/EzGP9y8R9/GLmP", - "whkmIfrzC3h3+tv5xfHZ8RH4ZfwzOP74/uTj8d9OCKFHb8HR8a9vfju9AO/+/ubs/Pjib5mIXiXzffDu", - "0+npm4vj/N/SrHKFJczWmp5aOHcGSpSx6xiufp/28EyL1/O5LKw6j6oWvLv38PTeZDK5c3j6lMKw2+2K", - "KQzdbleLF+S3MxIkoDGXLaYot2o9Lyz+Jj4YXTDEufOh9lP6w1TDWsMhsuezlq5uxQG4C+W1KOxd6cIX", - "Lu9FQ692D6ad2DBU30VKn5QFjtoDVsESBZczhrhyS+oUlzI0UiOAGWF7Q+VDzEEKOUfhDnCz+l2CKMMq", - "jB07rUviTqdX+ykIKBnidXqjOOPLigenna3qrL8zLBBXvprel1xABbLlDlKKiQBc/gIFOPoAAkg0J2MB", - "YCQ9A4YKv1S+loffGikZ/i2eBZQIRBx7499isKIZuIZEWDusnJ1D04CvwbRUNbk2kOpmCL4Gu/5He+5H", - "d9Av/+1UMCsSNDf7WxrCHOc0FTjBXOAA8CVkoUSjlABSe4NrLJY64m6OhpJ4BTKOQulhEwCNowpoEGSM", - "A0y8cx4dnYKk4pwWR1MPPlrn5CJcR65mE1nTu6ulzxlzOfllRCKQ+89SkNIYBytQiTg3ff8/U8yMXZfz", - "06TOTGqQjiAIrOMzxXK2/5yrEE8cxFJz8k92pQ2/Yt29g0lj6YslAvlgyUEpYpiGOIBxvAJG5EXNkIze", - "VjgEZnJwBeMMHQK1hCQojgJKQn476BlKICYznsIAVXYwfVGH/wMmOMkSEDGEQIj5JVBvKRjev73N8jc+", - "mrjXOPYDxu264nSVNVMU4GhlgOfZ3IrORZSBBtg74CQChAqg38SSJlTOXYoqAShB4BrHMZgjJYB2wLmC", - "1OR2DsEuRC8P9vf2R9HL19FoOkWvRvMQ7ebhUGlovtJbmXYHAGuc3sSxi9/Vsb5TTNzEh9JoOjWVM2WT", - "xVXkeaYflhahpcOe48hbFUe+8VFJt7dii+0qlZjqidL1qE5Rw2GeCNVsohVLidSfalidDsH09cvXP7uY", - "vbKuh/hcNHcHYmsnLjcIGnF5FYQE6P4BCKAIlrMsnSVFRVQViOultFCYFOJqLMhSbUwVp2O5Xz42d8rV", - "9eiz3PfOmGdzNaXLTHSXXuRI1FRZme4sI0S+3CU5q8TqJCJ7u64T9iE9B9slis+VuVqkY5p8ps1ZJXtU", - 
"emdYBsm6ozC1wNg5CjKGxaq5jDKiTZUM53HVwtPqLcIoDgvNtsRhiIg2rhdIFE6NPVFlEhAxmqghyvaK", - "pJ3TFEs19xUxMYNxTK9ROAtIE+x3NEkoAR+NZD4/PwXyHRzhAOrgQYGsTuRwHs8C6He8rIm1qMpH2tTm", - "pFk5sdyJd+pfrenkPj4ffzDWwvj/Xkxe51Uhta11r3qJVv5F35XryVNJGb6SW7tEq6IkxVq8Y726Z1TF", - "pQMHTQCd3GGcsveMZqkjbBzGzVK3zoOOMONiFtNAaxnXK9IbReF60wodTXcNzcj6EzaCJWr2YbnnxkYK", - "sK0FnUgtqnRqokb/7rb1KnZJBGPeCI8UmkR54VoCSLdJvV4R8eb1pjYxZmWpLnutR6WZrY1I6c1lUkAp", - "qcy1zHGpdy8IUQyvqEOb6d+Lur4CVzWzz8WJuYvvwjYwNZHuwkdnBAByfk1Z6J2xGFCdcm//xUEfSzSP", - "MLjnlg+teff2JgcubzbNAwqtpaxqUGmqFP5I20u26yIZ1dJorTmjfJx8p61e1KoU7V0Vqq2O9YpuO9Of", - "kF/2L+q4gPyyLOkYDjLusvXM3uTDxv4YpaJntd3MEaE2S1ZZOP9XixRqMXyskl2/4aNHjfpZPzbKfesV", - "FqSrnqW7KEUbRFzF/aRJdM2oy/bMaZ4XwHTSfEkqd6BfhtIYB9BDx7VyzGbUzFQzG2s7XgFd8mzC4A6Z", - "uGYdZ05ZNiBO2pFueWtlJ0MJvUKzBOm8cm9Not9TcWVlys4hV5ZQSK+J8Yfyn92h+3IfvRi3xgVOP/xc", - "0LR1sxtZtJJkb1peWZL2pF6r1HSN6q7ejBRTGPaExMq+Wpm8mtcK+WXOPU1B35/z/J5rlxw/VwONEdpz", - "Z+crEpQ7U6lj987kI6Bgs9lUJV+GLruVIU7jKxTOlNlJg8uZJz/cKjvyWxBO1LjTn36BkKPS7NMpH0p0", - "tASu5K7daXbj1Ot5HZudS0xgspBYcS1hp5KulzhYFlEezEH+8lrOaSOU1jPo5dA7ASJiJtK+1QMmqzGb", - "oyUmoRVH6vNu4fU4JKV81rqjygj/jnSxALrKLy72gMvUaPfGgcUHC+mJtp25HlA7dsgQyMgon8U++la2", - "rri/nS6ijQh7k5VTH/aLdFWPx3kYdT5w4cnySW2m8pGVi5lVzv+uATJf3VGT0y5MOUNTePrERIRjiT+W", - "aS8ZhiGWb8H4c2V0l9x/i8kpXfyqJjuTc7nUMiJLSAI001dJZ3nF2RKSBeosYLDsHG2YA56l0nxXeS6V", - "D9c3VMMwBmmcLTDpc4MULwhlaKYyp5IYCvTXbqmqYSBlyORY1TDnaV0hxnVEo1swIgENGqqpkzAZKeuv", - "jgSHJae2zwVleUmBNwtRTuotDPKbEzY18ku3z0LJLMyUjS4csy3ptTy8JSShDhhGMQ4ECtVOlLuVJToL", - "mMY6vppX52vkW/xliVkpZGaJs1Rf0sQ1XKnsAKVSFkGBpFqzFksR56aIYjAclBUV7sW0Wu/n6ytrSL1g", - "Ofy38bU7C0aVz5roqtiCkesnKRnGjAFqzLB/xa0SYqbstsbctQDiGrjR9btHUMC30h/Jowbuo8whN6Ut", - "+elFWRzLjZCAoQQRXRALY1VkWRIsjOO+hlsJQoe0qhF7ff/OU6kTkFtfOGSpK+QukGJ4OTEHUORpyBhd", - "obgh642QU9rV4brIn3O72iP/KmMqqAVhEveRdQYGU1jcLAtLoRCIqYIMrZP8wPiGl3D9/xGjaTdUN54T", - "+DWLY0Pvknl9t1/z5BCNgKTEgr8kFXHHrUPCMReIBI4UlpJRRDAag1xsYWLsMJWV0jU8lEmBGakbSMVs", - "AHKeMUmr1bPJBHWhQE7nKSYRlEnvNcSsKfZ3xvn6MyOwGzPrATOxZAiG1RKq/bomUwjTL0j8BZQYc9Np", - "w+LEO/P0wDm1fqNzah8FnJCArUcBlhDyEIBUbLM5FEG1WnPaLPKy55Im6JJRgv9VLKXmAOhPFGTqJ8kP", - "3zJIBFZLuSu00rgn+uobuTUOqxc13NZFyTLqmkgDZ0ZiljZSZ9rYvCHyvI9luPiq/ZXkXmMJ80bfJdzR", - "QrNeDeA6OLXFfCrD72EUNlyrf8Eve7sXpU3TDKzVvN1yhcleFEx2D/ZGu6+Cl6PpFL0cwYMXe6ODYDJ/", - "tR++eB3tTQ6no5eT/en+7t5w8mL/5X64F1jDX+292B3tTvbC+e7+QRjuhYfT0fTlxNnnoloUZfWtUA/K", - "6jTfmymtImjfGR7YTCC7JbTsO/yKlekBZcRQDKXuaK9+laKzMFoCc8ZdllxdW95oi2zteeoyt2pxe5Fc", - "31Fvs9ai5K7ohA2H9xjyGGlunZ4LmqYqelCW8fxqbos4/Qunre2vPNNGvaB2fN828XlPn7+mPdVDNUFO", - "vw6RIR/3S1zx1oR9T7q0fWRP/GQIrnEcBpCFeWCg6vzOR7/cMSreSNz5ouWirDloOmE9YBVOWFuTTpa6", - "8OkJ4dHDJfXc52GEFHFdZ2yiNPmOee1YprfEYM8FfBq5hp7+nVkcvmsLSsswTTtOH1WZxWbKKm5T7bCh", - "UgBn8r/AiffUUZJK/vCmMukVYtcMC7RW1rZ4S1vbwqxS/NF9ladctxt032W7COJY9Xnhl834VEs5gfNG", - "XSFOu1s45QKsnNQpu+pKJQsCxLkH3PWK05pzDZvYcAGl73fda1ep/mJIL/7ADaJq7VfaUqUt7oa/rqJ5", - "0OWK3os85sYOB7n2EtTUevC2blRdid5b1IF0VX7UehXe/21eb7e9jV7nvVGhHyGFcXxEA0fA7ugD+JQi", - "8ubzCTj69E6KXBYPDgddjeJGUnmOtEmLKTF947R/EVFF4liojTcWyJMwh4MDiUCVnUgRgSkeHA721E9S", - "4oulgnYMUzy+mo5NU4JxPr2xl4p+QSehWuvN55Nqzx2VpNSSVc23O5mYiF9evQxTHSqW2/gn14W7pR3V", - "2tjT3d1HYb2mFrUgU4fIsySBbDU4lHsARXcfElHAs2AJIAeVlj8CLrjVjmfwRdVB+navhU8dAYoN39Jw", - "dW97bzYPamzaLAvmct2bR3wOmcJZ5Sh2nIi/GTboUSeYeV+SLFslPQxhOloztaFlONi/RzAa7b4cS2t1", - "3sIYVhfXXHGtczDj7/oP5RHeaPkXI20HOk7qUxTFmCCNto8625RCBhOkT/mPRvrLAi/3yVUPBCiWg1wR", - "DCwYBrYY16lvV3zT3yz5S4Nw9h12+CM7UarxWuvJ2+sgc4OhJ4eVfbwehsMcfcO2jMOsXsJrcZg5mPF3", - 
"Y4WtxWHGeuzBYTZ4fg6zYHjaHFbtDN16kGGykwPn5Kz3SBzR4H/OP330sFIVLDlXcXmtSW4hDYBaroQq", - "pEENImOjtoDz94sPp73AkQM7wFkKnSD3gaOdvG7RU3bf6yJmyV/5JSZ1Hba4F6Bo+luG2MoiaiyWs2KE", - "g4jdpVM3Q8cXAlaAIZEx3Z9El2mNTGuCvL7eBULlRv46MHzZrPR1NDx0cIp9azTOa9JrdFAfUtJD7uMr", - "H437zt/uYL0pY9vRJHt9g3t6b/AUMZFHr+d0dzcASZiXJkJA0LV96q4Db8qA8Xcrs9Ct5Y7Uw4IoWmXC", - "IqZz1SMmI/hbVr3q7Fd41URHL4XnvWrWFBgR1ZeWaJpDAmNu+rHkl+1VQMeUU7hEh5rjjjJjCxSvpgMA", - "u2hq2EeHbCOtPIxO26Q+aZFnRRvcfSctGsxTASL15Y2mfmkjiK4wztbQxJfN6D1XGP+mGgiV4N78GNJ4", - "ZHLIRLHgXXXbONTfmlBBcL/ZY75IsV0k2uUzPDrdopF8D4dadltoOVP94YfnI93kkRZm6F1PVLlk6zHr", - "Wd507WmqE9dHdG6MPtlWyVB2vYoyovsm5peu7ofA1hAcT5y8HJ/N2VbqMkJq48RV9HNpoa2yYejTJa1m", - "09T+ZvDjpjRFAZVej+vTkvVJ2R4utu6M1ydYuwHS8feV2ayDW+0GuCUJqrx1ji5e9QVn+5LH+Lv+o4zg", - "9SAWVfP9+Ghl2FLg61m+3HvP5Z31vxul0uqN/O0iUl3/fHsaLbqK9JFgRS+px6MNWy/OPEguqPYtoC0h", - "H/uDxfYHvO9uYQkGCY/MV5T95tWFGfbUY43Ncta/iomVE0IhqiiAusm/rhXooC6d4umSTPmn0DoJSNI8", - "5JcPmf0296bmK71y3ubJtWb+rK/CKtpqta3q4I/6svVOa8O1wtOWztywqG188c5BhArJsekY+XgEbQFV", - "Se66mr5Pev9C9+nZXHLfvi7wI1P7rs8/bVGev/j4UfWE6+JsHOQfZucd6tF8wX2T5+/6Or5j3zjSNIw5", - "wCTNhG4YbGSpbp6e70q3ziw/v64bb1MGrnCAwBViHG6UiNyfvd8CMrpQBVIKy8R0HzU90mkEYL3xfAOp", - "Oz0oL7871k+l5rfDHqCedctFe3E5704y/qK82bcJXjd3un6cePcB8EjleeVk12GusW4y0yHcT9SgBzr3", - "+h3V9clgd0PwbI98Nq2Dbk8W31WLvHVq+GrUsZZ3bHfpc7jFBSw9nWJfe7+trpvz36yuC/DeynJ7jmny", - "5AR7U1+3Hbm3QK68Y/186FtTmtb33Bvy+3ZS+7FSRFuxtYIBXSECcKS61AOezXO3jxW9ip7LrX2efg81", - "sTV08QCx0h8hnWpO5L6vM15LUbX/9LtKqh8zAWy0ivpuAcbJUw8wFtXVPQOMlsry5OfyHnx5f80+4aBK", - "306+NYLswYsjnDkW3SXadCce+Ioefuk/o24k3T6hGvPLw+fEm9SydZlxlauzqysgCU03WvMDo5kwd9Fw", - "5WLx7bmydy1ZUUX2diVx/YaEt8ugPxGmfK5ua6Nvd4nbnal4zZK3otjtmaSfi/C2lpeclXj3zEryvXmM", - "1gxJzGN0LlgWiIw989Rj46mhv6OtD+U5BfTGuftbUdsfvq9wHrdIfN3gzDOHPHPI9Mc4S1Xi235nqZUN", - "/VGyIjzzzIprL/5UGPH+Q5RWULDOh3+tWmzNcWuqzXarVcDOOpfiu9ZPLPLd+J73tt7HVYd8y+Bzv5tF", - "1ocMt1DYFy3Nt722fksvMZlrFZp61qNOmnYKL/2Z+icnu6pf599e0UVTv+RSHx9hV/mJVpvPr2i2E9IE", - "YqJazw8kqs0Eblkw6Op2H9Kgd4t709N+/C3DweVISeCRLksdlV3BKjJm4LLM1LY3C9U1FstRmFjwqGWb", - "0ORdYItx+Q83X27+HQAA//9o+0ohqLUAAA==", + "H4sIAAAAAAAC/+x9bXPbOJLwX8Gj5z7sTEmWZDtO4qv9kMSerO+cl4o9tbe1lWMgEpSwJgEGAO3Rpvzf", + "r/BCEiQBkrItx5pkP+w4Igh0N/odjea3UUjTjBJEBB8dfxvxcIVSqP58lSAm3kECl4hd0owmdLmWv2eM", + "ZogJjNSoFeVC/hf9AdMsQaPj0Xz/+d5sb7Y3H41HYp3Jn7hgmCxHt+NRRll9+MvZy4NyHCYCLREb3d6O", + "Rwx9zTFD0ej4n3oR8/LncjRd/AuFQs76Jsm5QOwdlP/fhhFGkfo1QjxkOBOYktGx+hVxDmgMxAqBMGcM", + "EQFSNQkgNEKjsQut4xf7R07cYIKvUXsdShJMEOACityshrlZxl5BsByVsy4oTRAkctoEwQg54Mfcnknh", + "YIYOmJTAFNW3TU/jQKyxF+rNAtkSurEmcsfm+FkISkYLUs1pgbDG/QdD8eh49P+nFZNODYdOnex5Ox4t", + "GYwhgYPneavH21NoUpQzBAnWPI4FSnnffJoJ7ekMRSBjUP07YzRFYoVyPhjIj+Ur9sQ3lF3dGc6/q5f9", + "cN76t1K/+t3kbEFzEgWc5ixEQcHI9TXVEKCHADmklDtNs/ay6Zp/TSazrgUFXPqXkg97F1FjXSu0xVFP", + "MVwcJenrkLoI5ZRPSq4RkzwL+dUn9DVHmovqeysgv+pjKTmBYiTIr4KQkhgvgxgnDqLph0A+BJiANUwT", + "EFOWQgFWQmT8eDqNaMj3MkyWIcz2QppO/72aChwtplzARYKmcpGJnidnUM47kdNN4jxJ9pxk68OcZ5Rw", + "9KdE3eYYhY4DUidvMAQFulAc5GUNzWB9FNKTWGrLx/OTfqY3K/ohfiBWdlHOtegJ5nJjPqEErq1lG3ow", + "lH8AQQEXNAMQMDkcMDN+3IDSolKp2Pv1+XuYonM52snwJ3maXSg/pA1e5Z9EeZqBnOA2THLZBAkUBYoR", + "1W+ad0fHo4jmiwRVe0fydCF9ufEIcYFTKFAgqIBJwOjN0DdjTDBfoShYrAXa+KUNFtKQObDCRBwdjno9", + "1Nr74zahWqg0wXRTycVsp2QzXoNM9DKbehosMEnoMlgKHDn5gwlMluDt5dlJYczzjAuGYAr0qzVjh17C", + "eRzu709QOHsxmc/Ry8liH4aT2f7hPgzn89lsdnA8nzx/cfhyNB6RPEkkXg2XtTKRNRDdVr8EUeqzyup3", + "g6kN/wKTvZn83/5wWCJsvJ0Y5onklb2pfqCXqMMmwYgwQ6GgbA1uVoghBZrel4QuAeZSMUh+GgDBNrTD", + 
"KWOU/R2L1TvEudPXkSyj7A1AcmyLjdSvQSjdnta76hkItUvUlKaxeTXlS9+bqQGqzzZUE41teFyS9BYJ", + "49GekZj6HYBQDwpcYmGeASy3rdQauU9tSE0zzOVvhk1NPC2gunHTAYncdj+GERRwcORQj7YdAY5SYMrS", + "DlCaUlLk6t1IaP59eCRMKLNlJLTv84DQV87U9sHWDsODAm58kC2DL324B6R56eJvGeR3eMmUC8uWSPAH", + "BL428WNg8rCcky+qOR8D+ktpgC8Ey0ORM+THQgMYhCrwCPjXpB7UvPl0+uryFFy+en1+Cr6I+Rfwly84", + "+gIwEX+Zz38B7z9cgve/n5+DV79ffgjO3r/5dPru9P3l+OOns3evPv0D/PfpP/Qbv4Dpr5f/759G76Mo", + "wCRCf3wGb85/v7g8/XR6An6d/gJO3789e3/61zNC6MlrcHL626vfzy/Bm7+9+nRxevnXXMQv0sUhePPh", + "/PzV5Wnxb+lWudISBrV2pBYtnIkS5ew6hqvf5wMi0/L1Yi6Lqs6taiTvHjw9fTCbze6dnj6nMOoPuxIK", + "I3fY1REF+f2MFAlo3GVLKCpUreelx9+mB6NLhjh3PtRxynCYGlRrBUT2fNbSdVQcgLtI3sjC3pcvfOny", + "QTz0Yv9o3ksNw/V9rPRBeeCoO2EVrlB4FTDEVVjS5LiMoYkaAcwIOxqqHmIOMsg5ivaAW9Tvk0QZ12Hs", + "wbSpiXuDXh2nIKB0iDfojZOcr2oRnA626rP+nWGBuIrVNF5yAZXIlhhkFBMBuPwFCnDyDoSQaEnGAsBY", + "RgYMlXGpfK1Iv7WOZPjXJAgpEYg4cONfE7CmObiBRFgY1vbOYWnAl3BemZrCGkhzMwZfwn3/owP3o3vY", + "l/90Gpg1CdvI/p5FsKA5zQROMRc4BHwFWSTJKDWAtN7gBouVzribraEkWYOco0hG2ARAE6gCGoY54wAT", + "75wnJ+cgrQWn5dY0k4/WPrkY13FWs41T0/ubpY85cwX5VUYilPjnGchogsM1qGWc27H/Hxlmxq8r5GnW", + "FCY1SGcQBNb5mXI5O34uTIgnD2KZOfknu9aOX7nuwdGstfTlCoFisJSgDDFMIxzCJFkDo/LidkpGoxWN", + "gZkcXMMkR8dALSEZiqOQkojfDXqGUohJwDMYohoG82dN+N9hgtM8BTFDCESYXwH1loLh7eu7LH/r44kH", + "zWM/Yt6uL09XWzNDIY7XBnieL6zsXEwZaIG9B85iQKgA+k0seUKduUtVJQAlCNzgJAELpBTQHrhQkJqz", + "nWOwD9Hzo8ODw0n8/GU8mc/Ri8kiQvtFOlQ6mi80KvP+BGBD0ts0dsm72tY3Sojb9FAWTR9NFULZFnGV", + "eQ70w8ojtGzYzzzyTuWRb31c0h+t2Gq7ziWmeqIKPepTNGhYHIRqMdGGpSLqXxpUnY/B/OXzl7+4hL22", + "rof5XDx3D2brZi43CJpwRRWEBOjhAQihCFdBngVpWRFVB+JmJT0UJpW4GgvyTDtT5e5Y4ZdPzJ16dTP+", + "rPDem/J8oaZ0uYnu0ouCiJora9N9ygmRL/dpzjqzOpnIRte1wz6iF2C7VPGFclfL45i2nGl3Vukedbwz", + "rpJk/VmYRmLsAoU5w2LdXkY50aZKhvOk7uFp8xZjlESlZVvhKEJEO9dLJMqgxp6oNgmIGU3VEOV7xdLP", + "aaulRviKmAhgktAbFAUhaYP9hqYpJeC90cwXF+dAvoNjHEKdPCiJ1UsczpMghP7Ay5pYq6pipM1tTp6V", + "E0tMvFP/Zk0n8fh4+s54C9P/eTZ7WVSFNFDrX/UKrf2LvqnWk7uSMXwtUbtC67IkxVq8Z71mZFSnpYMG", + "bQCd0mGCsreM5pkjbRwl7VK33o2OMeMiSGiorYzrFRmNomizaYXOpruG5mTzCVvJEjX7uMK5hUgJtrWg", + "k6hllU5D1ejf3b5ezS+JYcJb6ZHSkqgoXGsAGTap12sq3rzetibGrazM5aD1qHSztRMpo7lcKiillbnW", + "OS7z7gUhTuA1dVgz/XtZ11fSquH2uSSxCPFd1AamJtJd+OjMAEDObyiLvDOWA+pTHhw+OxriiRYZBvfc", + "8qE178HB7MgVzWZFQqGzlFUNqlyVMh7peskOXaSgWhat88yoGCff6aoXtSpFB1eFaq9js6Lb3uNPyK+G", + "F3VcQn5VlXSMRzl3+XoGN/mwhR+jVAystgscGWqzZF2Ei391aKEOx8cq2fU7PnrUZJj3Y5Pct17pQbrq", + "WfqLUrRDxFXeT7pEN4y6fM+C53kJTC/PV6xyD/5lKEtwCD183CjHbGfNTDWz8baTNdAlzyYN7tCJG9Zx", + "FpxlA+LkHRmWd1Z2MpTSaxSkSJ8rD7Yk+j2VV1au7AJy5QlF9IaYeKj42Z26hzEKUhqhQOAUBVGRI21H", + "RzhFoHgszYp8s8g7W3p7xp0apyLXIP3QEDats5hQQDpgg/zKlAuqATZA+7PZ0WQ2n8z2wfzZ8ezwePZs", + "WIn1haBZ55bdHycJLM3FYKrfQKzjFo0vzeqkf8YHYlarR2g7qXmaDRR0qyp3g0K4wTonoTAaCIl1UG0d", + "ejrYpDij7+DQPiXlD/L7TN6FGmj89YGYXaxJWGGmTtndmMlHQMFmc4U6pxq7XHyGOE2uURQoD52GV4Hn", + "KL1TzRYXRpykcZ8U+3VnQUqDp1OVVuToyPFJrN0VCSb/oed1ILuQlMBkKaniWsI+dbtZ4XBVJsQwB8XL", + "G8XxrazjwPygw0SHiIhAZEMLLcwBULBAK0wiK+U25N0yQHQYFfmsE6PaCD9Guq4CXRd3PAfAZcrZB9PA", + "koOlDNq79lwPaGw7ZAjkZFLMYm99p1jXMgW90bRNCBvJ2q6PhyUF69vj3IymHLjoZIXvtlD52MolzKo8", + "4r65RF+JVlvSLk3lR1t5+tREjBNJP5brhAKMIizfgsnH2ug+vf8ak3O6/E1N9knO5TLLiKwgCVGgb90G", + "RXHeCpIl6q31sFxCHcMAnmcy0lFHgqp0QF/mjaIEZEm+xGTIZVu8JJShQB0yS2Yoyd+40KuGgYwhcxyt", + "hjl36xoxrpM//YoRCWjIUD9litKJcpSbRHA4vQp9Ligrqi+8BzbVpN4aKr87YXMjv3KHd5QEUa7CGeGY", + "bUVv5OatIIl0bjVOcChQpDBRkWme6gPTLNGp6OIigya+JV+WmpVKRrn37uOOG7hWBymUSl0EBZJmzVos", + "Q5ybepPReFQVn7gX02Z9WFpEeUPqBSs3cpe0RG9trQrvU11AXApycyelwJgxQI0ZDy9OVkrMVCg3hLuR", + 
"a92ANrrU+QQK+FqGbkWCxb2VBeRFNGZ2L86TRCJCQoZSRHTtMExUPWrFsDBJhjpuFQg92qrB7E38nbvS", + "ZCC3vXDoUtfphEBK4OXEHEBRnNgm6BolLV1vlJyyro7QRf5c+NUe/VcbUyMtiNJkiK4zMJga7HYFXQaF", + "QEzVrmib5AfGN7yC639PmIod+zP6zh34LU8Sw+9SeH0Xha1cgeTEUr4kF3HHBU3CMReIhI7TPqWjiGA0", + "AYXawsT4YeoAT5c7USYVZqwua5WzAch5ziSv1vcmF9RFAjmdp+5GUCaj1wizttrfmxbrB0Zht2bWAwKx", + "YghG9Wqzw6YlUwTTL0j6hZQYd9Ppw+LUO/P8yDm1fqN3ah8HnJGQbcYBlhLyMIA0bMECirBe2Dpv18PZ", + "c0kXdMUowf8ul1JzAPQHCnP1k5SHrzkkAqul3MVsWTKQfE1E7kzD+p0Wt3dRiYy6UdOimdGYlY/Ue8Ju", + "3hDFEZnluPguRijNvcES5o2hS7gTq2a9BsBNcBqL+UyGP8IofbjO+IJfDQ4vKp+mnVhrRLvVCrODOJzt", + "Hx1M9l+EzyfzOXo+gUfPDiZH4Wzx4jB69jI+mB3PJ89nh/PD/YPx7Nnh88PoILSGvzh4tj/Znx1Ei/3D", + "oyg6iI7nk/nzmbMlSL1+zGrxoR5UhXy+NzNaJ9ChMz2wnZx/Rxbet/k1L9MDyoShBErb0V0oLFVn6bSE", + "Zo/7PLmmtbzVHtnG8zR1bt3j9hK5idFgt9bi5L7shA2HdxuKHGnhnV4ImmUqe1BVPP1mLtY44wunr+0v", + "0tNOvaD2UYjt4vOBMX/DeqqHaoKCfx0qQz4edsbHO2sbBvKlHSN78idjcIOTKIQsKhID9eB3Mfn1nlnx", + "1hmnL1suqvKMdhA2AFbhhLXzfM4yFz47ITx2uOKeh9yMiCKuS7JNlqbAmDe2ZX5HCg5cwGeRG+QZ3sTG", + "Ebt2kLRK03TT9ElVpGynAuUuhSFbqppw1kmUNPHuOkozKR/e81J6jdgNwwJtdMBdvqW9bWFWKf/ov/VU", + "rdsPuu9eYgxxolri8Kt2fqqj8sJ5+bBUp/3drgoFVk3q1F1No5KHIeLcA+5mdXztucZtariA0lfhHrQB", + "13A1pBd/5F5ajU41XUelHeGGvwSlvdHVit47T+ZyEweF9RLUlMXwrsZdfQe9dyiZ6SuSabR1fPiLz97G", + "hFu9+XyrUj9CKuPkhIaOhN3JO/AhQ+TVxzNw8uGNVLksGR2P+nrqTaTxnGiXFlNiWuzp+CKmisWxUIi3", + "FigOYY5HR5KA6nQiQwRmeHQ8OlA/SY0vVgraKczw9Ho+Nf0bpsX0xl8qWyudRWqtVx/P6u2J1CGl1qxq", + "vv3ZzGT8ikJvmOlUsUTjX1wXwlR+VGcPVHcjJEX1hlnUikxtIs/TFLL16FjiAMpGSCSmgOfhCkAOat2R", + "BFxyq3PR6LMqGfVhr5VPkwBKDF/TaP1guLf7LLWQNsuChVz39gnvQ65oVtuKPSfhb8ctftQHzHwoS1Zd", + "pR6HMR1drLrIMh4dPiAYrc5ojqW1Oe8QDKvhbWG4NtmY6Tf9h4oIb7X+S5D2Ax079SGOE0yQJtt7fdqU", + "QQZTpHf5n63jLwu8IiZX7SKgWI0KQzCyYBjZalwffbvym/6+0p9bjHPo8MOf2I5STddG++JBG1k4DAMl", + "rGp59jgS5mixtmMSZrVd3kjCzMZMvxkvbCMJM97jAAmzwfNLmAXDjy1h9SbanRsZpXsFcE7JeovECQ3/", + "6+LDe48o1cGSc5X3/NrsFtEQqOUqqCIaNiAyPmoHOH+7fHc+CBw5sAecldAH5D5wdJDXr3qqRoV9zCzl", + "q7jvpW4Ol1coFE9/zRFbW0yNxSooRziY2F06dTt2fExhDRgSOdOtXHSZ1sR0cSiuIrhAqDUv2ASGz9vV", + "vo7ekA5JsS/YJkUH1wYfNIdU/FDE+CpG4779t5t9b8vZdvQT39zhnj8YPGVO5MnbOd0ID0ASFaWJEBB0", + "Y++6a8PbOmD6zTpZ6LdyJ+phyRSdOmGZ0IVqp5MT/DWv3wr3G7z6Qccgg+e9lddWGDHV97toVkACE25a", + "1xR9CVRCx5RTuFSHmuOeOmMHDK/mAwD7eGo8xIbsIq88jk3bpj3p0Gdlx+BDJy8aylMBYvWRkrZ96WKI", + "vjTOzvDE5+3YPVca/7aeCJXg3n4f1nhieshkseB9bds00p/lUElwv9tjPt6xWyzaFzM8OduiifwAm1o1", + "pujYU/2NjJ9bus0tLd3Q++6oCsk2E9ZPRX+6H9OcuL43dGvsya5qhqpBWJwT3WKyuHT1MAy2geL4wdnL", + "8YWhXeUuo6S2zlxl65sO3qp6q/64rNXuLzvcDX7anKY4oNYWc3Nesr6+OyDE1k0EhyRrt8A6/hY82w1w", + "640Td+SAqugypItXfcnZoewx/ab/qDJ4A5hF1Xw/PV4ZdxT4epavcB+4vLP+d6tcWr+Rv1tMquuf786j", + "ZVeRIRqsbLv1dKxh58WZRzkLanw2aUfYx/62s/2t8/t7WIJBwmPzwWm/e3Vphv3oucZ2OeufxcUqGKFU", + "VRRA/T0EXSvQw136iKdPMxVfjetlIMnzkF895um3uTe1WBfNy3SbJ9eaxbOhBqtsq9W1qkM+mss227mN", + "N0pPWzZzy6q29XFABxMqIiemzdzTUbQlVBW762r6Icf7l7pPz/YO9+3rAt/zaN/1pawdOucvvxNV3+Gm", + "OpuGxTfseY95NB+73+b+1z+n72UBHGsexhxgkuVC91Y2ulT3mS+w0l1Gqy/V6x7llIFrHCJwjRiHW2Wi", + "Bkq7w0aXqkBKUZmYRq2mnTyNAWz26G8RdW8A5xV3x4aZ1OJ22CPUs+64ai8v591Lx19WN/u2IevmTtf3", + "U+8+AJ6oPq/t7CbCNdVNZnqU+5ka9Ej73ryjujkb7G8Jnt3Rz6Z10N3Z4ptqkbdJDV+DOzaKju0ufY6w", + "uIRlYFDsa++303Vz/pvVTQU+2FjuzjbNfjjF3rbXXVvuLZCr7lj/3PSdKU0buu8t/X03rf1UOaKr2FrB", + "gK4RAThWXeoBzxdF2MfKXkU/y619kf4AM7EzfPEIudLvoZ0aQeShrzNeR1G1f/f7SqqfMgNstYr6fgnG", + "2Y+eYCyrqwcmGC2T5TmfK3rwFf01h6SDan07+c4oskcvjnCesegu0aY78chX9PDr8Bl1I+nuCdWYXx//", + "TLzNLTt3Mq7O6uzqCkgi043W/MBoLsxdNFy7WHx3qRxcS1ZWkb1eS1q/ItHdTtB/EKH8Wd3Wxd/uErd7", + 
"c/GGJW9lsdtPlv5ZhLezsuSsxHtgUZLvLRK0YUpikaALwfJQ5OynTD01mRr7O9r6SF5wwGCau78Vtfvp", + "+5rkcYvFN03O/JSQnxIy/z7BUp35dj9Y6hRDf5asTM/8FMWNF/9RBPHhU5RWUrAph3+uWmwtcRuazW6v", + "VcDeOpfyE+A/WOa79enzXb2Pq78vfrfk87CbRdaHDHdQ2ZctzXe9tn5HLzGZaxWaezbjTpr1Ki/9Lfwf", + "TndptHdfddHMr7nUx0fYdbGj9ebza5rvRTSFmKjW8yNJajOBWxeM+rrdRzQc3OLe9LSffs1xeDVRGnii", + "y1InVVewmo4ZuTwzhfZ2obrBYjWJUgsetWwbmqILbDmu+OH28+3/BQAA//8+kDDk07YAAA==", } // GetSwagger returns the content of the embedded swagger specification file diff --git a/dm/openapi/gen.types.go b/dm/openapi/gen.types.go index 5a3bd34fcc7..25731a6c3ff 100644 --- a/dm/openapi/gen.types.go +++ b/dm/openapi/gen.types.go @@ -404,14 +404,23 @@ type StartTaskRequest struct { // whether to remove meta database in downstream database RemoveMeta *bool `json:"remove_meta,omitempty"` + // time duration of safe mode + SafeModeTimeDuration *string `json:"safe_mode_time_duration,omitempty"` + // source name list SourceNameList *SourceNameList `json:"source_name_list,omitempty"` + + // task start time + StartTime *string `json:"start_time,omitempty"` } // StopTaskRequest defines model for StopTaskRequest. type StopTaskRequest struct { // source name list SourceNameList *SourceNameList `json:"source_name_list,omitempty"` + + // time duration waiting task stop + TimeoutDuration *string `json:"timeout_duration,omitempty"` } // SubTaskStatus defines model for SubTaskStatus. diff --git a/dm/openapi/spec/dm.yaml b/dm/openapi/spec/dm.yaml index 0f88624b28e..a8a88239cf9 100644 --- a/dm/openapi/spec/dm.yaml +++ b/dm/openapi/spec/dm.yaml @@ -2173,19 +2173,21 @@ components: description: whether to remove meta database in downstream database source_name_list: $ref: "#/components/schemas/SourceNameList" - # start_time: - # type: string - # example: "2006-01-02 15:04:05" - # description: task start time - # safe_mode_time_duration: - # example: "1s" - # description: time duration of safe mode + start_time: + type: string + example: "2006-01-02 15:04:05" + description: task start time + safe_mode_time_duration: + type: string + example: "10s" + description: time duration of safe mode StopTaskRequest: type: object properties: - # timeout_duration: - # example: "15s" - # description: time duration waiting task stop + timeout_duration: + type: string + example: "15s" + description: time duration waiting task stop source_name_list: $ref: "#/components/schemas/SourceNameList" UpdateTaskRequest: diff --git a/dm/pkg/ha/task_cli_args.go b/dm/pkg/ha/task_cli_args.go index 4b9d3483bb7..6f6406bf611 100644 --- a/dm/pkg/ha/task_cli_args.go +++ b/dm/pkg/ha/task_cli_args.go @@ -84,9 +84,15 @@ func DeleteAllTaskCliArgs(cli *clientv3.Client, taskName string) error { } // DeleteTaskCliArgs deleted the command line arguments of this task. 
-func DeleteTaskCliArgs(cli *clientv3.Client, taskName, source string) error { - key := common.TaskCliArgsKeyAdapter.Encode(taskName, source) - op := clientv3.OpDelete(key) - _, _, err := etcdutil.DoTxnWithRepeatable(cli, etcdutil.ThenOpFunc(op)) +func DeleteTaskCliArgs(cli *clientv3.Client, taskName string, sources []string) error { + if len(sources) == 0 { + return nil + } + ops := []clientv3.Op{} + for _, source := range sources { + key := common.TaskCliArgsKeyAdapter.Encode(taskName, source) + ops = append(ops, clientv3.OpDelete(key)) + } + _, _, err := etcdutil.DoTxnWithRepeatable(cli, etcdutil.ThenOpFunc(ops...)) return err } diff --git a/dm/pkg/ha/task_cli_args_test.go b/dm/pkg/ha/task_cli_args_test.go index 160268e2da5..f1436f3c501 100644 --- a/dm/pkg/ha/task_cli_args_test.go +++ b/dm/pkg/ha/task_cli_args_test.go @@ -62,7 +62,7 @@ func (t *testForEtcd) TestTaskCliArgs(c *C) { c.Assert(*ret, Equals, args) // test delete one source - err = DeleteTaskCliArgs(etcdTestCli, task, source1) + err = DeleteTaskCliArgs(etcdTestCli, task, []string{source1}) c.Assert(err, IsNil) checkNotExist(source1) diff --git a/dm/pkg/terror/error_list.go b/dm/pkg/terror/error_list.go index c47dcf68278..f054d077c68 100644 --- a/dm/pkg/terror/error_list.go +++ b/dm/pkg/terror/error_list.go @@ -915,7 +915,7 @@ var ( ErrConfigNeedUniqueTaskName = New(codeConfigNeedUniqueTaskName, ClassConfig, ScopeInternal, LevelMedium, "must specify a unique task name", "Please check the `name` config in task configuration file.") ErrConfigInvalidTaskMode = New(codeConfigInvalidTaskMode, ClassConfig, ScopeInternal, LevelMedium, "please specify right task-mode, support `full`, `incremental`, `all`", "Please check the `task-mode` config in task configuration file.") ErrConfigNeedTargetDB = New(codeConfigNeedTargetDB, ClassConfig, ScopeInternal, LevelMedium, "must specify target-database", "Please check the `target-database` config in task configuration file.") - ErrConfigMetadataNotSet = New(codeConfigMetadataNotSet, ClassConfig, ScopeInternal, LevelMedium, "mysql-instance(%d) must set meta for task-mode %s", "Please check the `meta` config in task configuration file.") + ErrConfigMetadataNotSet = New(codeConfigMetadataNotSet, ClassConfig, ScopeInternal, LevelMedium, "mysql-instance(%s) must set meta for task-mode %s", "Please check the `meta` config in task configuration file.") ErrConfigRouteRuleNotFound = New(codeConfigRouteRuleNotFound, ClassConfig, ScopeInternal, LevelMedium, "mysql-instance(%d)'s route-rules %s not exist in routes", "Please check the `route-rules` config in task configuration file.") ErrConfigFilterRuleNotFound = New(codeConfigFilterRuleNotFound, ClassConfig, ScopeInternal, LevelMedium, "mysql-instance(%d)'s filter-rules %s not exist in filters", "Please check the `filter-rules` config in task configuration file.") ErrConfigColumnMappingNotFound = New(codeConfigColumnMappingNotFound, ClassConfig, ScopeInternal, LevelMedium, "mysql-instance(%d)'s column-mapping-rules %s not exist in column-mapping", "Please check the `column-mapping-rules` config in task configuration file.") diff --git a/dm/syncer/syncer.go b/dm/syncer/syncer.go index 0db696b65a7..b48638c2fe6 100644 --- a/dm/syncer/syncer.go +++ b/dm/syncer/syncer.go @@ -1470,6 +1470,10 @@ func (s *Syncer) waitBeforeRunExit(ctx context.Context) { s.tctx.L().Info("received subtask's done, try graceful stop") needToExitTime := time.Now() s.waitTransactionLock.Lock() + + failpoint.Inject("checkWaitDuration", func(_ failpoint.Value) { + s.isTransactionEnd = 
false + }) if s.isTransactionEnd { s.waitXIDJob.Store(int64(waitComplete)) s.waitTransactionLock.Unlock() @@ -1498,6 +1502,17 @@ func (s *Syncer) waitBeforeRunExit(ctx context.Context) { s.runCancel() return } + failpoint.Inject("checkWaitDuration", func(val failpoint.Value) { + if testDuration, testError := time.ParseDuration(val.(string)); testError == nil { + if testDuration.Seconds() == waitDuration.Seconds() { + panic("success check wait_time_on_stop !!!") + } else { + s.tctx.L().Error("checkWaitDuration fail", zap.Duration("testDuration", testDuration), zap.Duration("waitDuration", waitDuration)) + } + } else { + s.tctx.L().Error("checkWaitDuration error", zap.Error(testError)) + } + }) select { case <-s.runCtx.Ctx.Done(): @@ -2196,7 +2211,7 @@ func (s *Syncer) Run(ctx context.Context) (err error) { } // set exitSafeModeTS when meet first binlog - if s.firstMeetBinlogTS == nil && s.cliArgs != nil && s.cliArgs.SafeModeDuration != "" { + if s.firstMeetBinlogTS == nil && s.cliArgs != nil && s.cliArgs.SafeModeDuration != "" && int64(e.Header.Timestamp) != 0 && e.Header.EventType != replication.FORMAT_DESCRIPTION_EVENT { if checkErr := s.initSafeModeExitTS(int64(e.Header.Timestamp)); checkErr != nil { return checkErr } diff --git a/dm/tests/openapi/client/openapi_task_check b/dm/tests/openapi/client/openapi_task_check index 38ea1240482..46f3d2473f7 100755 --- a/dm/tests/openapi/client/openapi_task_check +++ b/dm/tests/openapi/client/openapi_task_check @@ -147,6 +147,66 @@ def create_noshard_task_success(task_name, tartget_table_name=""): print("create_noshard_task_success resp=", resp.json()) assert resp.status_code == 201 +def create_incremental_task_with_gitd_success(task_name,binlog_name1,binlog_pos1,binlog_gtid1,binlog_name2,binlog_pos2,binlog_gtid2): + task = { + "name": task_name, + "task_mode": "incremental", + "meta_schema": "dm_meta", + "enhance_online_schema_change": True, + "on_duplicate": "error", + "target_config": { + "host": "127.0.0.1", + "port": 4000, + "user": "root", + "password": "", + }, + "table_migrate_rule": [ + { + "source": { + "source_name": SOURCE1_NAME, + "schema": "openapi", + "table": "*", + }, + "target": {"schema": "openapi", "table": ""}, + }, + { + "source": { + "source_name": SOURCE2_NAME, + "schema": "openapi", + "table": "*", + }, + "target": {"schema": "openapi", "table": ""}, + }, + ], + "source_config": { + "source_conf": [ + {"source_name": SOURCE1_NAME}, + {"source_name": SOURCE2_NAME}, + ], + }, + } + + if binlog_pos1 != "": + task["source_config"] = { + "source_conf": [ + { + "source_name": SOURCE1_NAME, + "binlog_name": binlog_name1, + "binlog_pos": int(binlog_pos1), + "binlog_gtid": binlog_gtid1, + }, + { + "source_name": SOURCE2_NAME, + "binlog_name": binlog_name2, + "binlog_pos": int(binlog_pos2), + "binlog_gtid": binlog_gtid2, + }, + ], + } + + resp = requests.post(url=API_ENDPOINT, json={"task": task}) + print("create_incremental_task_with_gitd_success resp=", resp.json()) + assert resp.status_code == 201 def create_shard_task_success(): task = { @@ -219,6 +279,43 @@ def start_task_success(task_name, source_name): print("start_task_failed resp=", resp.json()) assert resp.status_code == 200 +def start_task_failed(task_name, source_name, check_result): + url = API_ENDPOINT + "/" + task_name + "/start" + req = {} + if source_name != "": + req = {"source_name_list": [source_name], "remove_meta": True} + resp = requests.post(url=url, json=req) + if resp.status_code == 200: + print("start_task_failed resp should not be 200") + assert 
resp.status_code == 400 + print("start_task_failed resp=", resp.json()) + assert check_result in resp.json()["error_msg"] + +def start_task_with_condition(task_name, start_time, duration, is_success, check_result): + url = API_ENDPOINT + "/" + task_name + "/start" + req = {"start_time": start_time, "safe_mode_time_duration": duration} + resp = requests.post(url=url, json=req) + if is_success == "success": + if resp.status_code != 200: + print("start_task_with_condition_failed resp=", resp.json()) + assert resp.status_code == 200 + print("start_task_with_condition success") + else: + if resp.status_code == 200: + print("start_task_with_condition_failed resp should not be 200") + assert resp.status_code == 400 + print("start_task_with_condition resp=", resp.json()) + assert check_result in resp.json()["error_msg"] + +def stop_task_with_condition(task_name, source_name, timeout_duration): + url = API_ENDPOINT + "/" + task_name + "/stop" + req = {"timeout_duration": timeout_duration} + if source_name != "": + req = {"source_name_list": [source_name], "timeout_duration": timeout_duration} + resp = requests.post(url=url, json=req) + if resp.status_code != 200: + print("stop_task_failed resp=", resp.json()) + assert resp.status_code == 200 def stop_task_success(task_name, source_name): url = API_ENDPOINT + "/" + task_name + "/stop" @@ -230,7 +327,6 @@ def stop_task_success(task_name, source_name): print("stop_task_failed resp=", resp.json()) assert resp.status_code == 200 - def delete_task_success(task_name): resp = requests.delete(url=API_ENDPOINT + "/" + task_name) assert resp.status_code == 204 @@ -607,10 +703,14 @@ if __name__ == "__main__": "create_task_failed": create_task_failed, "create_noshard_task_success": create_noshard_task_success, "create_shard_task_success": create_shard_task_success, + "create_incremental_task_with_gitd_success": create_incremental_task_with_gitd_success, "delete_task_failed": delete_task_failed, "delete_task_success": delete_task_success, "delete_task_with_force_success": delete_task_with_force_success, "start_task_success": start_task_success, + "start_task_failed": start_task_failed, + "start_task_with_condition": start_task_with_condition, + "stop_task_with_condition": stop_task_with_condition, "stop_task_success": stop_task_success, "get_task_list": get_task_list, "get_task_list_with_status": get_task_list_with_status, diff --git a/dm/tests/openapi/run.sh b/dm/tests/openapi/run.sh index f2e95d1caaf..59c4ad0ac5f 100644 --- a/dm/tests/openapi/run.sh +++ b/dm/tests/openapi/run.sh @@ -610,10 +610,12 @@ function test_delete_task_with_stopped_downstream() { # create source successfully openapi_source_check "create_source1_success" + # create source successfully openapi_source_check "create_source2_success" # get source list success openapi_source_check "list_source_success" 2 + # create no shard task success openapi_task_check "create_noshard_task_success" $task_name $target_table_name run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \ @@ -637,6 +639,302 @@ function test_delete_task_with_stopped_downstream() { echo ">>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>TEST OPENAPI: DELETE TASK WITH STOPPED DOWNSTREAM SUCCESS" } +function test_start_task_with_condition() { + echo ">>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>START TEST OPENAPI: START TASK WITH CONDITION" + prepare_database + run_sql_tidb "DROP DATABASE if exists openapi;" + + # create source successfully + openapi_source_check "create_source1_success" + openapi_source_check "list_source_success" 1 + + # get source status 
success + openapi_source_check "get_source_status_success" "mysql-01" + # create source successfully + openapi_source_check "create_source2_success" + # get source list success + openapi_source_check "list_source_success" 2 + + # get source status success + openapi_source_check "get_source_status_success" "mysql-02" + + # incremental task no source meta and start time, still error + task_name="incremental_task_no_source_meta" + run_sql_source1 "CREATE TABLE openapi.t1(i TINYINT, j INT UNIQUE KEY);" + run_sql_source2 "CREATE TABLE openapi.t2(i TINYINT, j INT UNIQUE KEY);" + + openapi_task_check "create_incremental_task_with_gitd_success" $task_name "" "" "" "" "" "" + run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \ + "query-status $task_name" \ + "\"stage\": \"Stopped\"" 2 + + check_result="must set meta for task-mode incremental" + openapi_task_check "start_task_failed" $task_name "" "$check_result" + openapi_task_check "delete_task_with_force_success" "$task_name" + openapi_task_check "get_task_list" 0 + + # incremental task use gtid + prepare_database + run_sql_tidb "DROP DATABASE if exists openapi;" + task_name="incremental_task_use_gtid" + run_sql_source1 "CREATE TABLE openapi.t1(i TINYINT, j INT UNIQUE KEY);" + run_sql_source2 "CREATE TABLE openapi.t2(i TINYINT, j INT UNIQUE KEY);" + + master_status1=($(get_master_status $MYSQL_HOST1 $MYSQL_PORT1)) + master_status2=($(get_master_status $MYSQL_HOST2 $MYSQL_PORT2)) + openapi_task_check "create_incremental_task_with_gitd_success" $task_name ${master_status1[0]} ${master_status1[1]} ${master_status1[2]} ${master_status2[0]} ${master_status2[1]} ${master_status2[2]} + run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \ + "query-status $task_name" \ + "\"stage\": \"Stopped\"" 2 + openapi_task_check "start_task_success" $task_name "" + run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \ + "query-status $task_name" \ + "\"stage\": \"Running\"" 2 + + run_sql_tidb 'CREATE DATABASE openapi;' + run_sql_source1 "CREATE TABLE openapi.t3(i TINYINT, j INT UNIQUE KEY);" + run_sql_source2 "CREATE TABLE openapi.t4(i TINYINT, j INT UNIQUE KEY);" + + run_sql_tidb_with_retry "show tables in openapi;" "t3" + run_sql_tidb_with_retry "show tables in openapi;" "t4" + run_sql_tidb_with_retry "SELECT count(1) FROM information_schema.tables WHERE table_schema = 'openapi';" "count(1): 2" + + openapi_task_check "stop_task_success" "$task_name" "" + openapi_task_check "delete_task_with_force_success" "$task_name" + openapi_task_check "get_task_list" 0 + + # incremental task use start_time + prepare_database + run_sql_tidb "DROP DATABASE if exists openapi;" + task_name="incremental_task_use_start_time" + run_sql_source1 "CREATE TABLE openapi.t1(i TINYINT, j INT UNIQUE KEY);" + run_sql_source2 "CREATE TABLE openapi.t2(i TINYINT, j INT UNIQUE KEY);" + + openapi_task_check "create_incremental_task_with_gitd_success" $task_name "" "" "" "" "" "" + run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \ + "query-status $task_name" \ + "\"stage\": \"Stopped\"" 2 + sleep 2 + start_time=$(date '+%Y-%m-%d %T') + sleep 2 + duration="" + is_success="success" + check_result="" + run_sql_tidb 'CREATE DATABASE openapi;' + run_sql_source1 "CREATE TABLE openapi.t3(i TINYINT, j INT UNIQUE KEY);" + run_sql_source2 "CREATE TABLE openapi.t4(i TINYINT, j INT UNIQUE KEY);" + openapi_task_check "start_task_with_condition" $task_name "$start_time" "$duration" "$is_success" "$check_result" + run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \ + 
"query-status $task_name" \ + "\"stage\": \"Running\"" 2 + + run_sql_tidb_with_retry "show tables in openapi;" "t3" + run_sql_tidb_with_retry "show tables in openapi;" "t4" + run_sql_tidb_with_retry "SELECT count(1) FROM information_schema.tables WHERE table_schema = 'openapi';" "count(1): 2" + + openapi_task_check "stop_task_success" "$task_name" "" + openapi_task_check "delete_task_with_force_success" "$task_name" + openapi_task_check "get_task_list" 0 + + # incremental task use start_time, but time is after create table + prepare_database + run_sql_tidb "DROP DATABASE if exists openapi;" + task_name="incremental_task_use_start_time_after_create" + run_sql_source1 "CREATE TABLE openapi.t1(i TINYINT, j INT UNIQUE KEY);" + run_sql_source2 "CREATE TABLE openapi.t2(i TINYINT, j INT UNIQUE KEY);" + + openapi_task_check "create_incremental_task_with_gitd_success" $task_name "" "" "" "" "" "" + run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \ + "query-status $task_name" \ + "\"stage\": \"Stopped\"" 2 + sleep 2 + start_time=$(date '+%Y-%m-%d %T') + sleep 2 + duration="" + is_success="success" + check_result="" + run_sql_tidb 'CREATE DATABASE openapi;' + run_sql_source1 "INSERT INTO openapi.t1(i,j) VALUES (1, 2);" + run_sql_source2 "INSERT INTO openapi.t2(i,j) VALUES (3, 4);" + openapi_task_check "start_task_with_condition" $task_name "$start_time" "$duration" "$is_success" "$check_result" + run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \ + "query-status $task_name" \ + "Table 'openapi.*' doesn't exist" 2 + + openapi_task_check "stop_task_success" "$task_name" "" + openapi_task_check "delete_task_with_force_success" "$task_name" + openapi_task_check "get_task_list" 0 + + # incremental task both gtid and start_time, start_time first + prepare_database + run_sql_tidb "DROP DATABASE if exists openapi;" + task_name="incremental_task_both_gtid_start_time" + run_sql_source1 "CREATE TABLE openapi.t1(i TINYINT, j INT UNIQUE KEY);" + run_sql_source2 "CREATE TABLE openapi.t2(i TINYINT, j INT UNIQUE KEY);" + master_status1=($(get_master_status $MYSQL_HOST1 $MYSQL_PORT1)) + master_status2=($(get_master_status $MYSQL_HOST2 $MYSQL_PORT2)) + openapi_task_check "create_incremental_task_with_gitd_success" $task_name ${master_status1[0]} ${master_status1[1]} ${master_status1[2]} ${master_status2[0]} ${master_status2[1]} ${master_status2[2]} + run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \ + "query-status $task_name" \ + "\"stage\": \"Stopped\"" 2 + run_sql_source1 "CREATE TABLE openapi.t3(i TINYINT, j INT UNIQUE KEY);" + run_sql_source2 "CREATE TABLE openapi.t4(i TINYINT, j INT UNIQUE KEY);" + sleep 2 + start_time=$(date '+%Y-%m-%d %T') + sleep 2 + duration="" + is_success="success" + check_result="" + run_sql_tidb 'CREATE DATABASE openapi;' + run_sql_source1 "CREATE TABLE openapi.t5(i TINYINT, j INT UNIQUE KEY);" + run_sql_source2 "CREATE TABLE openapi.t6(i TINYINT, j INT UNIQUE KEY);" + openapi_task_check "start_task_with_condition" $task_name "$start_time" "$duration" "$is_success" "$check_result" + run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \ + "query-status $task_name" \ + "\"stage\": \"Running\"" 2 + + run_sql_tidb_with_retry "show tables in openapi;" "t5" + run_sql_tidb_with_retry "show tables in openapi;" "t6" + run_sql_tidb_with_retry "SELECT count(1) FROM information_schema.tables WHERE table_schema = 'openapi';" "count(1): 2" + + openapi_task_check "stop_task_success" "$task_name" "" + openapi_task_check "delete_task_with_force_success" "$task_name" + 
openapi_task_check "get_task_list" 0 + + # incremental task no duration has error + export GO_FAILPOINTS='github.com/pingcap/tiflow/dm/syncer/SafeModeInitPhaseSeconds=return(0)' + kill_dm_worker + check_port_offline $WORKER1_PORT 20 + check_port_offline $WORKER2_PORT 20 + + # run dm-worker1 + run_dm_worker $WORK_DIR/worker1 $WORKER1_PORT $cur/conf/dm-worker1.toml + check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER1_PORT + # run dm-worker2 + run_dm_worker $WORK_DIR/worker2 $WORKER2_PORT $cur/conf/dm-worker2.toml + check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER2_PORT + openapi_source_check "list_source_success" 2 + + prepare_database + run_sql_tidb "DROP DATABASE if exists openapi;" + task_name="incremental_task_no_duration_but_error" + run_sql_source1 "CREATE TABLE openapi.t1(i TINYINT, j INT UNIQUE KEY);" + run_sql_source2 "CREATE TABLE openapi.t2(i TINYINT, j INT UNIQUE KEY);" + + sleep 2 + start_time=$(date '+%Y-%m-%d %T') + sleep 2 + duration="" + is_success="success" + check_result="" + + run_sql_source1 "INSERT INTO openapi.t1(i,j) VALUES (1, 2);" + run_sql_source2 "INSERT INTO openapi.t2(i,j) VALUES (1, 2);" + run_sql_source1 "INSERT INTO openapi.t1(i,j) VALUES (3, 4);" + run_sql_source2 "INSERT INTO openapi.t2(i,j) VALUES (3, 4);" + # mock already sync data to downstream + run_sql_tidb 'CREATE DATABASE openapi;' + run_sql_tidb "CREATE TABLE openapi.t1(i TINYINT, j INT UNIQUE KEY);" + run_sql_tidb "CREATE TABLE openapi.t2(i TINYINT, j INT UNIQUE KEY);" + run_sql_tidb "INSERT INTO openapi.t1(i,j) VALUES (1, 2);" + run_sql_tidb "INSERT INTO openapi.t2(i,j) VALUES (1, 2);" + + openapi_task_check "create_incremental_task_with_gitd_success" $task_name "" "" "" "" "" "" + run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \ + "query-status $task_name" \ + "\"stage\": \"Stopped\"" 2 + openapi_task_check "start_task_with_condition" $task_name "$start_time" "$duration" "$is_success" "$check_result" + run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \ + "query-status $task_name" \ + "Duplicate entry" 2 + + # set duration and start again + openapi_task_check "stop_task_success" "$task_name" "" + duration="600s" + openapi_task_check "start_task_with_condition" $task_name "$start_time" "$duration" "$is_success" "$check_result" + + run_sql_tidb_with_retry "SELECT count(1) FROM openapi.t1;" "count(1): 2" + run_sql_tidb_with_retry "SELECT count(1) FROM openapi.t2;" "count(1): 2" + + openapi_task_check "stop_task_success" "$task_name" "" + openapi_task_check "delete_task_with_force_success" "$task_name" + openapi_task_check "get_task_list" 0 + + clean_cluster_sources_and_tasks + echo ">>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>TEST OPENAPI: START TASK WITH CONDITION SUCCESS" +} + +function test_stop_task_with_condition() { + echo ">>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>START TEST OPENAPI: STOP TASK WITH CONDITION" + prepare_database + run_sql_tidb "DROP DATABASE if exists openapi;" + + # create source successfully + openapi_source_check "create_source1_success" + openapi_source_check "list_source_success" 1 + + # get source status success + openapi_source_check "get_source_status_success" "mysql-01" + # create source successfully + openapi_source_check "create_source2_success" + # get source list success + openapi_source_check "list_source_success" 2 + + # get source status success + openapi_source_check "get_source_status_success" "mysql-02" + + # test wait_time_on_stop + export 
GO_FAILPOINTS='github.com/pingcap/tiflow/dm/syncer/recordAndIgnorePrepareTime=return();github.com/pingcap/tiflow/dm/syncer/checkWaitDuration=return("200s")'
+	kill_dm_worker
+	check_port_offline $WORKER1_PORT 20
+	check_port_offline $WORKER2_PORT 20
+
+	# run dm-worker1
+	run_dm_worker $WORK_DIR/worker1 $WORKER1_PORT $cur/conf/dm-worker1.toml
+	check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER1_PORT
+	# run dm-worker2
+	run_dm_worker $WORK_DIR/worker2 $WORKER2_PORT $cur/conf/dm-worker2.toml
+	check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER2_PORT
+
+	task_name="test_wait_time_on_stop"
+	# create no shard task success
+	openapi_task_check "create_noshard_task_success" $task_name ""
+	run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \
+		"query-status $task_name" \
+		"\"stage\": \"Stopped\"" 2
+
+	timeout_duration="200s"
+
+	openapi_task_check "start_task_success" $task_name ""
+	run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \
+		"query-status $task_name" \
+		"\"stage\": \"Running\"" 2
+	init_noshard_data
+	check_sync_diff $WORK_DIR $cur/conf/diff_config_no_shard.toml
+	openapi_task_check "stop_task_with_condition" "$task_name" "" "$timeout_duration"
+	echo "error check"
+	check_log_contain_with_retry 'panic: success check wait_time_on_stop !!!' $WORK_DIR/worker1/log/stdout.log
+	check_log_contain_with_retry 'panic: success check wait_time_on_stop !!!' $WORK_DIR/worker2/log/stdout.log
+
+	# clean
+	export GO_FAILPOINTS=''
+	kill_dm_worker
+	check_port_offline $WORKER1_PORT 20
+	check_port_offline $WORKER2_PORT 20
+
+	# run dm-worker1
+	run_dm_worker $WORK_DIR/worker1 $WORKER1_PORT $cur/conf/dm-worker1.toml
+	check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER1_PORT
+	# run dm-worker2
+	run_dm_worker $WORK_DIR/worker2 $WORKER2_PORT $cur/conf/dm-worker2.toml
+	check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER2_PORT
+	openapi_task_check "delete_task_with_force_success" "$task_name"
+	openapi_task_check "get_task_list" 0
+
+	clean_cluster_sources_and_tasks
+	echo ">>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>TEST OPENAPI: STOP TASK WITH CONDITION SUCCESS"
+}
+
 function test_cluster() {
 	# list master and worker node
 	openapi_cluster_check "list_master_success" 2
@@ -682,6 +980,8 @@ function run() {
 	test_complex_operations_of_source_and_task
 	test_task_with_ignore_check_items
 	test_delete_task_with_stopped_downstream
+	test_start_task_with_condition
+	test_stop_task_with_condition
 
 	# NOTE: this test case MUST running at last, because it will offline some members of cluster
 	test_cluster

From ae3cefd6cdfe3e41f8a2c289f7828a249b5af65a Mon Sep 17 00:00:00 2001
From: Jianyuan Jiang
Date: Tue, 17 May 2022 11:02:36 +0800
Subject: [PATCH 13/14] test(ticdc): add processor etcd worker delay fail point (#5426)

ref pingcap/tiflow#5326
---
 cdc/capture/capture.go          |  5 +++--
 pkg/chdelay/channel_delayer.go  |  2 +-
 pkg/orchestrator/etcd_worker.go | 17 +++++++++++++----
 3 files changed, 17 insertions(+), 7 deletions(-)

diff --git a/cdc/capture/capture.go b/cdc/capture/capture.go
index c3ef03fee6a..872f0d6c3f1 100644
--- a/cdc/capture/capture.go
+++ b/cdc/capture/capture.go
@@ -24,6 +24,7 @@ import (
 	"github.com/pingcap/errors"
 	"github.com/pingcap/failpoint"
 	"github.com/pingcap/log"
+	"github.com/pingcap/tiflow/pkg/util"
 	"go.etcd.io/etcd/client/v3/concurrency"
 	"go.etcd.io/etcd/server/v3/mvcc"
 	"go.uber.org/zap"
@@ -290,7 +291,7 @@ func (c *Capture) run(stdCtx context.Context) error {
 		// when the etcd worker of processor returns an error, it means that the
processor throws an unrecoverable serious errors // (recoverable errors are intercepted in the processor tick) // so we should also stop the processor and let capture restart or exit - processorErr = c.runEtcdWorker(ctx, c.processorManager, globalState, processorFlushInterval, "processor") + processorErr = c.runEtcdWorker(ctx, c.processorManager, globalState, processorFlushInterval, util.RoleProcessor.String()) log.Info("the processor routine has exited", zap.Error(processorErr)) }() wg.Add(1) @@ -388,7 +389,7 @@ func (c *Capture) campaignOwner(ctx cdcContext.Context) error { c.MessageRouter.RemovePeer(captureID) }) - err = c.runEtcdWorker(ownerCtx, owner, orchestrator.NewGlobalState(), ownerFlushInterval, "owner") + err = c.runEtcdWorker(ownerCtx, owner, orchestrator.NewGlobalState(), ownerFlushInterval, util.RoleOwner.String()) c.setOwner(nil) log.Info("run owner exited", zap.Error(err)) // if owner exits, resign the owner key diff --git a/pkg/chdelay/channel_delayer.go b/pkg/chdelay/channel_delayer.go index 0ed34ea181b..8a29d6ac6c9 100644 --- a/pkg/chdelay/channel_delayer.go +++ b/pkg/chdelay/channel_delayer.go @@ -47,7 +47,7 @@ type entry[T any] struct { // NewChannelDelayer creates a new ChannelDelayer. func NewChannelDelayer[T any]( delayBy time.Duration, - in chan T, + in <-chan T, queueSize int, outChSize int, ) *ChannelDelayer[T] { diff --git a/pkg/orchestrator/etcd_worker.go b/pkg/orchestrator/etcd_worker.go index 263b861b778..64acf720c79 100644 --- a/pkg/orchestrator/etcd_worker.go +++ b/pkg/orchestrator/etcd_worker.go @@ -22,6 +22,11 @@ import ( "github.com/pingcap/errors" "github.com/pingcap/failpoint" "github.com/pingcap/log" + "github.com/pingcap/tiflow/pkg/chdelay" + cerrors "github.com/pingcap/tiflow/pkg/errors" + "github.com/pingcap/tiflow/pkg/etcd" + "github.com/pingcap/tiflow/pkg/orchestrator/util" + pkgutil "github.com/pingcap/tiflow/pkg/util" "github.com/prometheus/client_golang/prometheus" "go.etcd.io/etcd/api/v3/etcdserverpb" "go.etcd.io/etcd/api/v3/mvccpb" @@ -31,10 +36,6 @@ import ( "go.uber.org/zap" "go.uber.org/zap/zapcore" "golang.org/x/time/rate" - - cerrors "github.com/pingcap/tiflow/pkg/errors" - "github.com/pingcap/tiflow/pkg/etcd" - "github.com/pingcap/tiflow/pkg/orchestrator/util" ) const ( @@ -134,6 +135,14 @@ func (worker *EtcdWorker) Run(ctx context.Context, session *concurrency.Session, defer cancel() watchCh := worker.client.Watch(watchCtx, worker.prefix.String(), role, clientv3.WithPrefix(), clientv3.WithRev(worker.revision+1)) + if role == pkgutil.RoleProcessor.String() { + failpoint.Inject("ProcessorEtcdDelay", func() { + delayer := chdelay.NewChannelDelayer(time.Second*3, watchCh, 1024, 16) + defer delayer.Close() + watchCh = delayer.Out() + }) + } + var ( pendingPatches [][]DataPatch exiting bool From 5e858a4766cf0ead2995581e141a43aa0a3c74ab Mon Sep 17 00:00:00 2001 From: CharlesCheung <61726649+CharlesCheung96@users.noreply.github.com> Date: Tue, 17 May 2022 12:20:37 +0800 Subject: [PATCH 14/14] sink(ticdc): make mysql sink support split transactions (#5281) close pingcap/tiflow#5280 --- cdc/model/mounter.go | 13 ++- cdc/model/sink.go | 3 + cdc/processor/pipeline/sink_test.go | 6 +- cdc/processor/pipeline/sorter.go | 6 +- cdc/processor/pipeline/table.go | 2 +- cdc/sink/flowcontrol/flow_control.go | 92 +++++++++++++++++++--- cdc/sink/flowcontrol/flow_control_test.go | 66 ++++++++-------- cdc/sink/flowcontrol/table_memory_quota.go | 6 +- cdc/sink/mysql/txn_cache.go | 2 +- 9 files changed, 144 insertions(+), 52 deletions(-) diff --git 
a/cdc/model/mounter.go b/cdc/model/mounter.go index ab2a3d3bec4..9998d6eb5d2 100644 --- a/cdc/model/mounter.go +++ b/cdc/model/mounter.go @@ -13,7 +13,7 @@ package model -// PolymorphicEvent describes an event can be in multiple states +// PolymorphicEvent describes an event can be in multiple states. type PolymorphicEvent struct { StartTs uint64 // Commit or resolved TS @@ -23,7 +23,16 @@ type PolymorphicEvent struct { Row *RowChangedEvent } -// NewPolymorphicEvent creates a new PolymorphicEvent with a raw KV +// NewEmptyPolymorphicEvent creates a new empty PolymorphicEvent. +func NewEmptyPolymorphicEvent(ts uint64) *PolymorphicEvent { + return &PolymorphicEvent{ + CRTs: ts, + RawKV: &RawKVEntry{}, + Row: &RowChangedEvent{}, + } +} + +// NewPolymorphicEvent creates a new PolymorphicEvent with a raw KV. func NewPolymorphicEvent(rawKV *RawKVEntry) *PolymorphicEvent { if rawKV.OpType == OpTypeResolved { return NewResolvedPolymorphicEvent(rawKV.RegionID, rawKV.CRTs) diff --git a/cdc/model/sink.go b/cdc/model/sink.go index e68e4453cce..eaa3740e505 100644 --- a/cdc/model/sink.go +++ b/cdc/model/sink.go @@ -267,6 +267,9 @@ type RowChangedEvent struct { // ApproximateDataSize is the approximate size of protobuf binary // representation of this event. ApproximateDataSize int64 `json:"-" msg:"-"` + + // SplitTxn marks this RowChangedEvent as the first line of a new txn. + SplitTxn bool `json:"-" msg:"-"` } // IsDelete returns true if the row is a delete event diff --git a/cdc/processor/pipeline/sink_test.go b/cdc/processor/pipeline/sink_test.go index 343f4c8ed44..87766150120 100644 --- a/cdc/processor/pipeline/sink_test.go +++ b/cdc/processor/pipeline/sink_test.go @@ -40,7 +40,11 @@ type mockSink struct { // we are testing sinkNode by itself. type mockFlowController struct{} -func (c *mockFlowController) Consume(commitTs uint64, size uint64, blockCallBack func() error) error { +func (c *mockFlowController) Consume( + msg *model.PolymorphicEvent, + size uint64, + blockCallBack func(bool) error, +) error { return nil } diff --git a/cdc/processor/pipeline/sorter.go b/cdc/processor/pipeline/sorter.go index a503338f094..f9cb796bcdb 100644 --- a/cdc/processor/pipeline/sorter.go +++ b/cdc/processor/pipeline/sorter.go @@ -206,8 +206,10 @@ func (n *sorterNode) start( size := uint64(msg.Row.ApproximateBytes()) // NOTE we allow the quota to be exceeded if blocking means interrupting a transaction. // Otherwise the pipeline would deadlock. - err = n.flowController.Consume(commitTs, size, func() error { - if lastCRTs > lastSentResolvedTs { + err = n.flowController.Consume(msg, size, func(batch bool) error { + if batch { + log.Panic("cdc does not support the batch resolve mechanism at this time") + } else if lastCRTs > lastSentResolvedTs { // If we are blocking, we send a Resolved Event here to elicit a sink-flush. // Not sending a Resolved Event here will very likely deadlock the pipeline. lastSentResolvedTs = lastCRTs diff --git a/cdc/processor/pipeline/table.go b/cdc/processor/pipeline/table.go index 8c015656d7a..556c08ee1b3 100644 --- a/cdc/processor/pipeline/table.go +++ b/cdc/processor/pipeline/table.go @@ -78,7 +78,7 @@ type tablePipelineImpl struct { // TODO find a better name or avoid using an interface // We use an interface here for ease in unit testing. 
type tableFlowController interface { - Consume(commitTs uint64, size uint64, blockCallBack func() error) error + Consume(msg *model.PolymorphicEvent, size uint64, blockCallBack func(batch bool) error) error Release(resolvedTs uint64) Abort() GetConsumption() uint64 diff --git a/cdc/sink/flowcontrol/flow_control.go b/cdc/sink/flowcontrol/flow_control.go index c6099fa6557..1e97ed6390e 100644 --- a/cdc/sink/flowcontrol/flow_control.go +++ b/cdc/sink/flowcontrol/flow_control.go @@ -20,9 +20,16 @@ import ( "github.com/edwingeng/deque" "github.com/pingcap/errors" "github.com/pingcap/log" + "github.com/pingcap/tiflow/cdc/model" "go.uber.org/zap" ) +const ( + maxRowsPerTxn = 1024 + maxSizePerTxn = 1024 * 1024 /* 1MB */ + batchSize = 100 +) + // TableFlowController provides a convenient interface to control the memory consumption of a per table event stream type TableFlowController struct { memoryQuota *tableMemoryQuota @@ -31,13 +38,20 @@ type TableFlowController struct { sync.Mutex queue deque.Deque } + // batchGroupCount is the number of txnSizeEntries with same commitTs, which could be: + // 1. Different txns with same commitTs but different startTs + // 2. TxnSizeEntry split from the same txns which exceeds max rows or max size + batchGroupCount uint lastCommitTs uint64 } -type commitTsSizeEntry struct { +type txnSizeEntry struct { + // txn id + startTs uint64 commitTs uint64 size uint64 + rowCount uint64 } // NewTableFlowController creates a new TableFlowController @@ -55,7 +69,12 @@ func NewTableFlowController(quota uint64) *TableFlowController { // Consume is called when an event has arrived for being processed by the sink. // It will handle transaction boundaries automatically, and will not block intra-transaction. -func (c *TableFlowController) Consume(commitTs uint64, size uint64, blockCallBack func() error) error { +func (c *TableFlowController) Consume( + msg *model.PolymorphicEvent, + size uint64, + callBack func(batch bool) error, +) error { + commitTs := msg.CRTs lastCommitTs := atomic.LoadUint64(&c.lastCommitTs) if commitTs < lastCommitTs { @@ -65,8 +84,7 @@ func (c *TableFlowController) Consume(commitTs uint64, size uint64, blockCallBac } if commitTs > lastCommitTs { - atomic.StoreUint64(&c.lastCommitTs, commitTs) - err := c.memoryQuota.consumeWithBlocking(size, blockCallBack) + err := c.memoryQuota.consumeWithBlocking(size, callBack) if err != nil { return errors.Trace(err) } @@ -82,13 +100,7 @@ func (c *TableFlowController) Consume(commitTs uint64, size uint64, blockCallBac } } - c.queueMu.Lock() - defer c.queueMu.Unlock() - c.queueMu.queue.PushBack(&commitTsSizeEntry{ - commitTs: commitTs, - size: size, - }) - + c.enqueueSingleMsg(msg, size) return nil } @@ -98,7 +110,7 @@ func (c *TableFlowController) Release(resolvedTs uint64) { c.queueMu.Lock() for c.queueMu.queue.Len() > 0 { - if peeked := c.queueMu.queue.Front().(*commitTsSizeEntry); peeked.commitTs <= resolvedTs { + if peeked := c.queueMu.queue.Front().(*txnSizeEntry); peeked.commitTs <= resolvedTs { nBytesToRelease += peeked.size c.queueMu.queue.PopFront() } else { @@ -110,6 +122,62 @@ func (c *TableFlowController) Release(resolvedTs uint64) { c.memoryQuota.release(nBytesToRelease) } +// Note that msgs received by enqueueSingleMsg must be sorted by commitTs_startTs order. +func (c *TableFlowController) enqueueSingleMsg(msg *model.PolymorphicEvent, size uint64) { + commitTs := msg.CRTs + lastCommitTs := atomic.LoadUint64(&c.lastCommitTs) + + c.queueMu.Lock() + defer c.queueMu.Unlock() + + var e deque.Elem + // 1. 
Processing a new txn with different commitTs. + if e = c.queueMu.queue.Back(); e == nil || lastCommitTs < commitTs { + atomic.StoreUint64(&c.lastCommitTs, commitTs) + c.queueMu.queue.PushBack(&txnSizeEntry{ + startTs: msg.StartTs, + commitTs: commitTs, + size: size, + rowCount: 1, + }) + c.batchGroupCount = 1 + msg.Row.SplitTxn = true + return + } + + // Processing txns with the same commitTs. + txnEntry := e.(*txnSizeEntry) + if txnEntry.commitTs != lastCommitTs { + log.Panic("got wrong commitTs from deque, report a bug", + zap.Uint64("lastCommitTs", c.lastCommitTs), + zap.Uint64("commitTsInDeque", txnEntry.commitTs)) + } + + // 2. Append row to current txn entry. + if txnEntry.startTs == msg.Row.StartTs && + txnEntry.rowCount < maxRowsPerTxn && txnEntry.size < maxSizePerTxn { + txnEntry.size += size + txnEntry.rowCount++ + return + } + + // 3. Split the txn or handle a new txn with the same commitTs. + c.queueMu.queue.PushBack(&txnSizeEntry{ + startTs: msg.StartTs, + commitTs: commitTs, + size: size, + rowCount: 1, + }) + c.batchGroupCount++ + msg.Row.SplitTxn = true + + if c.batchGroupCount >= batchSize { + c.batchGroupCount = 0 + // TODO(CharlesCheung): add batch resolve mechanism to mitigate oom problem + log.Debug("emit batch resolve event throw callback") + } +} + // Abort interrupts any ongoing Consume call func (c *TableFlowController) Abort() { c.memoryQuota.abort() diff --git a/cdc/sink/flowcontrol/flow_control_test.go b/cdc/sink/flowcontrol/flow_control_test.go index 24f639fdf8a..6836299e4a4 100644 --- a/cdc/sink/flowcontrol/flow_control_test.go +++ b/cdc/sink/flowcontrol/flow_control_test.go @@ -21,11 +21,12 @@ import ( "testing" "time" + "github.com/pingcap/tiflow/cdc/model" "github.com/stretchr/testify/require" "golang.org/x/sync/errgroup" ) -func dummyCallBack() error { +func dummyCallBack(_ bool) error { return nil } @@ -34,7 +35,7 @@ type mockCallBacker struct { injectedErr error } -func (c *mockCallBacker) cb() error { +func (c *mockCallBacker) cb(_ bool) error { c.timesCalled += 1 return c.injectedErr } @@ -173,7 +174,7 @@ func TestFlowControlBasic(t *testing.T) { ctx, cancel := context.WithTimeout(context.TODO(), time.Second*5) defer cancel() errg, ctx := errgroup.WithContext(ctx) - mockedRowsCh := make(chan *commitTsSizeEntry, 1024) + mockedRowsCh := make(chan *txnSizeEntry, 1024) flowController := NewTableFlowController(2048) errg.Go(func() error { @@ -186,7 +187,7 @@ func TestFlowControlBasic(t *testing.T) { select { case <-ctx.Done(): return ctx.Err() - case mockedRowsCh <- &commitTsSizeEntry{ + case mockedRowsCh <- &txnSizeEntry{ commitTs: lastCommitTs, size: size, }: @@ -202,7 +203,7 @@ func TestFlowControlBasic(t *testing.T) { defer close(eventCh) resolvedTs := uint64(0) for { - var mockedRow *commitTsSizeEntry + var mockedRow *txnSizeEntry select { case <-ctx.Done(): return ctx.Err() @@ -227,7 +228,8 @@ func TestFlowControlBasic(t *testing.T) { resolvedTs = mockedRow.commitTs updatedResolvedTs = true } - err := flowController.Consume(mockedRow.commitTs, mockedRow.size, dummyCallBack) + err := flowController.Consume(model.NewEmptyPolymorphicEvent(mockedRow.commitTs), + mockedRow.size, dummyCallBack) require.Nil(t, err) select { case <-ctx.Done(): @@ -290,13 +292,13 @@ func TestFlowControlAbort(t *testing.T) { go func() { defer wg.Done() - err := controller.Consume(1, 1000, callBacker.cb) + err := controller.Consume(model.NewEmptyPolymorphicEvent(1), 1000, callBacker.cb) require.Nil(t, err) require.Equal(t, 0, callBacker.timesCalled) - err = controller.Consume(2, 
1000, callBacker.cb) + err = controller.Consume(model.NewEmptyPolymorphicEvent(2), 1000, callBacker.cb) require.Regexp(t, ".*ErrFlowControllerAborted.*", err) require.Equal(t, 1, callBacker.timesCalled) - err = controller.Consume(2, 10, callBacker.cb) + err = controller.Consume(model.NewEmptyPolymorphicEvent(2), 10, callBacker.cb) require.Regexp(t, ".*ErrFlowControllerAborted.*", err) require.Equal(t, 1, callBacker.timesCalled) }() @@ -314,7 +316,7 @@ func TestFlowControlCallBack(t *testing.T) { ctx, cancel := context.WithTimeout(context.TODO(), time.Second*5) defer cancel() errg, ctx := errgroup.WithContext(ctx) - mockedRowsCh := make(chan *commitTsSizeEntry, 1024) + mockedRowsCh := make(chan *txnSizeEntry, 1024) flowController := NewTableFlowController(512) errg.Go(func() error { @@ -327,7 +329,7 @@ func TestFlowControlCallBack(t *testing.T) { select { case <-ctx.Done(): return ctx.Err() - case mockedRowsCh <- &commitTsSizeEntry{ + case mockedRowsCh <- &txnSizeEntry{ commitTs: lastCommitTs, size: size, }: @@ -343,7 +345,7 @@ func TestFlowControlCallBack(t *testing.T) { defer close(eventCh) lastCRTs := uint64(0) for { - var mockedRow *commitTsSizeEntry + var mockedRow *txnSizeEntry select { case <-ctx.Done(): return ctx.Err() @@ -355,16 +357,17 @@ func TestFlowControlCallBack(t *testing.T) { } atomic.AddUint64(&consumedBytes, mockedRow.size) - err := flowController.Consume(mockedRow.commitTs, mockedRow.size, func() error { - select { - case <-ctx.Done(): - return ctx.Err() - case eventCh <- &mockedEvent{ - resolvedTs: lastCRTs, - }: - } - return nil - }) + err := flowController.Consume(model.NewEmptyPolymorphicEvent(mockedRow.commitTs), + mockedRow.size, func(bool) error { + select { + case <-ctx.Done(): + return ctx.Err() + case eventCh <- &mockedEvent{ + resolvedTs: lastCRTs, + }: + } + return nil + }) require.Nil(t, err) lastCRTs = mockedRow.commitTs @@ -426,7 +429,7 @@ func TestFlowControlCallBackNotBlockingRelease(t *testing.T) { go func() { defer wg.Done() - err := controller.Consume(1, 511, func() error { + err := controller.Consume(model.NewEmptyPolymorphicEvent(1), 511, func(bool) error { t.Error("unreachable") return nil }) @@ -443,7 +446,7 @@ func TestFlowControlCallBackNotBlockingRelease(t *testing.T) { cancel() }() - err = controller.Consume(2, 511, func() error { + err = controller.Consume(model.NewEmptyPolymorphicEvent(2), 511, func(bool) error { atomic.StoreInt32(&isBlocked, 1) <-ctx.Done() atomic.StoreInt32(&isBlocked, 0) @@ -468,12 +471,12 @@ func TestFlowControlCallBackError(t *testing.T) { go func() { defer wg.Done() - err := controller.Consume(1, 511, func() error { + err := controller.Consume(model.NewEmptyPolymorphicEvent(1), 511, func(bool) error { t.Error("unreachable") return nil }) require.Nil(t, err) - err = controller.Consume(2, 511, func() error { + err = controller.Consume(model.NewEmptyPolymorphicEvent(2), 511, func(bool) error { <-ctx.Done() return ctx.Err() }) @@ -490,7 +493,7 @@ func TestFlowControlConsumeLargerThanQuota(t *testing.T) { t.Parallel() controller := NewTableFlowController(1024) - err := controller.Consume(1, 2048, func() error { + err := controller.Consume(model.NewEmptyPolymorphicEvent(1), 2048, func(bool) error { t.Error("unreachable") return nil }) @@ -501,7 +504,7 @@ func BenchmarkTableFlowController(B *testing.B) { ctx, cancel := context.WithTimeout(context.TODO(), time.Second*5) defer cancel() errg, ctx := errgroup.WithContext(ctx) - mockedRowsCh := make(chan *commitTsSizeEntry, 102400) + mockedRowsCh := make(chan *txnSizeEntry, 
102400) flowController := NewTableFlowController(20 * 1024 * 1024) // 20M errg.Go(func() error { @@ -514,7 +517,7 @@ func BenchmarkTableFlowController(B *testing.B) { select { case <-ctx.Done(): return ctx.Err() - case mockedRowsCh <- &commitTsSizeEntry{ + case mockedRowsCh <- &txnSizeEntry{ commitTs: lastCommitTs, size: size, }: @@ -530,7 +533,7 @@ func BenchmarkTableFlowController(B *testing.B) { defer close(eventCh) resolvedTs := uint64(0) for { - var mockedRow *commitTsSizeEntry + var mockedRow *txnSizeEntry select { case <-ctx.Done(): return ctx.Err() @@ -551,7 +554,8 @@ func BenchmarkTableFlowController(B *testing.B) { } resolvedTs = mockedRow.commitTs } - err := flowController.Consume(mockedRow.commitTs, mockedRow.size, dummyCallBack) + err := flowController.Consume(model.NewEmptyPolymorphicEvent(mockedRow.commitTs), + mockedRow.size, dummyCallBack) if err != nil { B.Fatal(err) } diff --git a/cdc/sink/flowcontrol/table_memory_quota.go b/cdc/sink/flowcontrol/table_memory_quota.go index 7ca15e7857f..c563ba4f333 100644 --- a/cdc/sink/flowcontrol/table_memory_quota.go +++ b/cdc/sink/flowcontrol/table_memory_quota.go @@ -54,7 +54,9 @@ func newTableMemoryQuota(quota uint64) *tableMemoryQuota { // block until enough memory has been freed up by release. // blockCallBack will be called if the function will block. // Should be used with care to prevent deadlock. -func (c *tableMemoryQuota) consumeWithBlocking(nBytes uint64, blockCallBack func() error) error { +func (c *tableMemoryQuota) consumeWithBlocking( + nBytes uint64, blockCallBack func(bool) error, +) error { if nBytes >= c.quota { return cerrors.ErrFlowControllerEventLargerThanQuota.GenWithStackByArgs(nBytes, c.quota) } @@ -62,7 +64,7 @@ func (c *tableMemoryQuota) consumeWithBlocking(nBytes uint64, blockCallBack func c.consumed.Lock() if c.consumed.bytes+nBytes >= c.quota { c.consumed.Unlock() - err := blockCallBack() + err := blockCallBack(false) if err != nil { return errors.Trace(err) } diff --git a/cdc/sink/mysql/txn_cache.go b/cdc/sink/mysql/txn_cache.go index 84c334ba300..8fa1a2227b6 100644 --- a/cdc/sink/mysql/txn_cache.go +++ b/cdc/sink/mysql/txn_cache.go @@ -36,7 +36,7 @@ func (t *txnsWithTheSameCommitTs) Append(row *model.RowChangedEvent) { } var txn *model.SingleTableTxn - if len(t.txns) == 0 || t.txns[len(t.txns)-1].StartTs < row.StartTs { + if len(t.txns) == 0 || row.SplitTxn || t.txns[len(t.txns)-1].StartTs < row.StartTs { txn = &model.SingleTableTxn{ StartTs: row.StartTs, CommitTs: row.CommitTs,