From c56328d234cd10d6cfe11cae13e76ed8ad02535b Mon Sep 17 00:00:00 2001 From: hi-rustin Date: Tue, 29 Nov 2022 11:24:03 +0800 Subject: [PATCH 1/6] processor/sinkmanager(ticdc): background GC sort engine data --- cdc/processor/processor.go | 67 +++------------------ cdc/processor/sinkmanager/manager.go | 71 +++++++++++++++++++++-- cdc/processor/sinkmanager/manager_test.go | 3 +- 3 files changed, 74 insertions(+), 67 deletions(-) diff --git a/cdc/processor/processor.go b/cdc/processor/processor.go index 8a0056674a6..9b0e9f9c3a4 100644 --- a/cdc/processor/processor.go +++ b/cdc/processor/processor.go @@ -300,16 +300,7 @@ func (p *processor) IsAddTableFinished(tableID model.TableID, isPrepare bool) bo if p.pullBasedSinking { state, alreadyExist = p.sinkManager.GetTableState(tableID) if alreadyExist { - stats, err := p.sinkManager.GetTableStats(tableID) - if err != nil { - log.Warn("Failed to get table stats", - zap.String("captureID", p.captureInfo.ID), - zap.String("namespace", p.changefeedID.Namespace), - zap.String("changefeed", p.changefeedID.ID), - zap.Int64("tableID", tableID), - zap.Error(err)) - return false - } + stats := p.sinkManager.GetTableStats(tableID) tableResolvedTs = stats.ResolvedTs tableCheckpointTs = stats.CheckpointTs } @@ -384,16 +375,7 @@ func (p *processor) IsRemoveTableFinished(tableID model.TableID) (model.Ts, bool if p.pullBasedSinking { state, alreadyExist = p.sinkManager.GetTableState(tableID) if alreadyExist { - stats, err := p.sinkManager.GetTableStats(tableID) - if err != nil { - log.Warn("Failed to get table stats", - zap.String("captureID", p.captureInfo.ID), - zap.String("namespace", p.changefeedID.Namespace), - zap.String("changefeed", p.changefeedID.ID), - zap.Int64("tableID", tableID), - zap.Error(err)) - return 0, false - } + stats := p.sinkManager.GetTableStats(tableID) tableCheckpointTs = stats.CheckpointTs } } else { @@ -426,17 +408,7 @@ func (p *processor) IsRemoveTableFinished(tableID model.TableID) (model.Ts, bool } if p.pullBasedSinking { - stats, err := p.sinkManager.GetTableStats(tableID) - // TODO: handle error - if err != nil { - log.Warn("Failed to get table stats", - zap.String("captureID", p.captureInfo.ID), - zap.String("namespace", p.changefeedID.Namespace), - zap.String("changefeed", p.changefeedID.ID), - zap.Int64("tableID", tableID), - zap.Error(err)) - return 0, false - } + stats := p.sinkManager.GetTableStats(tableID) p.sourceManager.RemoveTable(tableID) p.sinkManager.RemoveTable(tableID) if p.redoManager.Enabled() { @@ -493,20 +465,7 @@ func (p *processor) GetTableStatus(tableID model.TableID) tablepb.TableStatus { State: tablepb.TableStateAbsent, } } - sinkStats, err := p.sinkManager.GetTableStats(tableID) - // TODO: handle the error - if err != nil { - log.Warn("Failed to get table sinkStats", - zap.String("captureID", p.captureInfo.ID), - zap.String("namespace", p.changefeedID.Namespace), - zap.String("changefeed", p.changefeedID.ID), - zap.Int64("tableID", tableID), - zap.Error(err)) - return tablepb.TableStatus{ - TableID: tableID, - State: tablepb.TableStateAbsent, - } - } + sinkStats := p.sinkManager.GetTableStats(tableID) return tablepb.TableStatus{ TableID: tableID, Checkpoint: tablepb.Checkpoint{ @@ -749,9 +708,7 @@ func (p *processor) tick(ctx cdcContext.Context) error { // it is no need to check the error here, because we will use // local time when an error return, which is acceptable pdTime, _ := p.upstream.PDClock.CurrentTime() - if err := p.handlePosition(oracle.GetPhysical(pdTime)); err != nil { - return errors.Trace(err) - } + p.handlePosition(oracle.GetPhysical(pdTime)) p.doGCSchemaStorage() @@ -1080,7 +1037,7 @@ func (p *processor) sendError(err error) { // resolvedTs = min(schemaStorage's resolvedTs, all table's resolvedTs). // table's resolvedTs = redo's resolvedTs if redo enable, else sorter's resolvedTs. // checkpointTs = min(resolvedTs, all table's checkpointTs). -func (p *processor) handlePosition(currentTs int64) error { +func (p *processor) handlePosition(currentTs int64) { minResolvedTs := uint64(math.MaxUint64) minResolvedTableID := int64(0) if p.schemaStorage != nil { @@ -1091,10 +1048,7 @@ func (p *processor) handlePosition(currentTs int64) error { if p.pullBasedSinking { tableIDs := p.sinkManager.GetAllCurrentTableIDs() for _, tableID := range tableIDs { - stats, err := p.sinkManager.GetTableStats(tableID) - if err != nil { - return errors.Trace(err) - } + stats := p.sinkManager.GetTableStats(tableID) log.Debug("sink manager gets table stats", zap.String("namespace", p.changefeedID.Namespace), zap.String("changefeed", p.changefeedID.ID), @@ -1139,8 +1093,6 @@ func (p *processor) handlePosition(currentTs int64) error { p.checkpointTs = minCheckpointTs p.resolvedTs = minResolvedTs - - return nil } // pushResolvedTs2Table sends global resolved ts to all the table pipelines. @@ -1442,10 +1394,7 @@ func (p *processor) WriteDebugInfo(w io.Writer) error { tables := p.sinkManager.GetAllCurrentTableIDs() for _, tableID := range tables { state, _ := p.sinkManager.GetTableState(tableID) - stats, err := p.sinkManager.GetTableStats(tableID) - if err != nil { - return err - } + stats := p.sinkManager.GetTableStats(tableID) // TODO: add table name. fmt.Fprintf(w, "tableID: %d, resolvedTs: %d, checkpointTs: %d, state: %s\n", tableID, stats.ResolvedTs, stats.CheckpointTs, state) diff --git a/cdc/processor/sinkmanager/manager.go b/cdc/processor/sinkmanager/manager.go index 0b06e77b364..24874a6f65f 100644 --- a/cdc/processor/sinkmanager/manager.go +++ b/cdc/processor/sinkmanager/manager.go @@ -37,8 +37,14 @@ const ( sinkWorkerNum = 8 redoWorkerNum = 4 defaultGenerateTaskInterval = 100 * time.Millisecond + defaultEngineGCChanSize = 128 ) +type gcEvent struct { + tableID model.TableID + cleanPos engine.Position +} + // SinkManager is the implementation of SinkManager. type SinkManager struct { changefeedID model.ChangeFeedID @@ -68,6 +74,9 @@ type SinkManager struct { // lastBarrierTs is the last barrier ts. lastBarrierTs atomic.Uint64 + // engineGCChan is used to GC engine when the table is advanced. + engineGCChan chan *gcEvent + // sinkWorkers used to pull data from source manager. sinkWorkers []*sinkWorker // sinkTaskChan is used to send tasks to sinkWorkers. @@ -134,6 +143,7 @@ func New( m.startWorkers(mg, changefeedInfo.Config.Sink.TxnAtomicity.ShouldSplitTxn(), changefeedInfo.Config.EnableOldValue) m.startGenerateTasks() + m.backgroundGC() log.Info("Sink manager is created", zap.String("namespace", changefeedID.Namespace), @@ -247,6 +257,39 @@ func (m *SinkManager) startGenerateTasks() { }() } +// backgroundGC is used to clean up the old data in the sorter. +func (m *SinkManager) backgroundGC() { + m.wg.Add(1) + go func() { + defer m.wg.Done() + for { + select { + case <-m.ctx.Done(): + log.Info("Background GC is stooped because context is canceled", + zap.String("namespace", m.changefeedID.Namespace), + zap.String("changefeed", m.changefeedID.ID)) + return + case gcEvent := <-m.engineGCChan: + if err := m.sortEngine.CleanByTable(gcEvent.tableID, gcEvent.cleanPos); err != nil { + log.Error("Failed to clean table in sort engine", + zap.String("namespace", m.changefeedID.Namespace), + zap.String("changefeed", m.changefeedID.ID), + zap.Int64("tableID", gcEvent.tableID), + zap.Error(err)) + select { + case m.errChan <- err: + default: + log.Error("Failed to send error to error channel, error channel is full", + zap.String("namespace", m.changefeedID.Namespace), + zap.String("changefeed", m.changefeedID.ID), + zap.Error(err)) + } + } + } + } + }() +} + // generateSinkTasks generates tasks to fetch data from the source manager. func (m *SinkManager) generateSinkTasks() error { taskTicker := time.NewTicker(defaultGenerateTaskInterval) @@ -508,7 +551,14 @@ func (m *SinkManager) AsyncStopTable(tableID model.TableID) { zap.String("changefeed", m.changefeedID.ID), zap.Int64("tableID", tableID), zap.Error(err)) - m.errChan <- err + select { + case m.errChan <- err: + default: + log.Error("Failed to send error to error channel, error channel is full", + zap.String("namespace", m.changefeedID.Namespace), + zap.String("changefeed", m.changefeedID.ID), + zap.Error(err)) + } } cleanedBytes := m.memQuota.clean(tableID) log.Debug("MemoryQuotaTracing: Clean up memory quota for table sink task when removing table", @@ -555,7 +605,7 @@ func (m *SinkManager) GetTableState(tableID model.TableID) (tablepb.TableState, } // GetTableStats returns the state of the table. -func (m *SinkManager) GetTableStats(tableID model.TableID) (pipeline.Stats, error) { +func (m *SinkManager) GetTableStats(tableID model.TableID) pipeline.Stats { tableSink, ok := m.tableSinks.Load(tableID) if !ok { log.Panic("Table sink not found when getting table stats", @@ -570,9 +620,18 @@ func (m *SinkManager) GetTableStats(tableID model.TableID) (pipeline.Stats, erro StartTs: resolvedMark - 1, CommitTs: resolvedMark, } - err := m.sortEngine.CleanByTable(tableID, cleanPos) - if err != nil { - return pipeline.Stats{}, errors.Trace(err) + gcEvent := &gcEvent{ + tableID: tableID, + cleanPos: cleanPos, + } + select { + case m.engineGCChan <- gcEvent: + default: + log.Debug("Failed to send GC event to engine GC channel, engine GC channel is full", + zap.String("namespace", m.changefeedID.Namespace), + zap.String("changefeed", m.changefeedID.ID), + zap.Int64("tableID", tableID), + zap.Any("cleanPos", cleanPos)) } var resolvedTs model.Ts // If redo log is enabled, we have to use redo log's resolved ts to calculate processor's min resolved ts. @@ -585,7 +644,7 @@ func (m *SinkManager) GetTableStats(tableID model.TableID) (pipeline.Stats, erro CheckpointTs: resolvedMark, ResolvedTs: resolvedTs, BarrierTs: m.lastBarrierTs.Load(), - }, nil + } } // Close closes all workers. diff --git a/cdc/processor/sinkmanager/manager_test.go b/cdc/processor/sinkmanager/manager_test.go index 3b87a322c12..5b29b3b2cf6 100644 --- a/cdc/processor/sinkmanager/manager_test.go +++ b/cdc/processor/sinkmanager/manager_test.go @@ -281,8 +281,7 @@ func TestGetTableStatsToReleaseMemQuota(t *testing.T) { manager.StartTable(tableID, 0) require.Eventually(t, func() bool { - s, err := manager.GetTableStats(tableID) - require.NoError(t, err) + s := manager.GetTableStats(tableID) return manager.memQuota.getUsedBytes() == 0 && s.CheckpointTs == 4 }, 5*time.Second, 10*time.Millisecond) } From 8d35fc2520b242821c1e46c179b71ef34b63fbd3 Mon Sep 17 00:00:00 2001 From: hi-rustin Date: Tue, 29 Nov 2022 11:26:10 +0800 Subject: [PATCH 2/6] processor/sinkmanager(ticdc): init chan --- cdc/processor/sinkmanager/manager.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/cdc/processor/sinkmanager/manager.go b/cdc/processor/sinkmanager/manager.go index 24874a6f65f..3229c777f8b 100644 --- a/cdc/processor/sinkmanager/manager.go +++ b/cdc/processor/sinkmanager/manager.go @@ -125,6 +125,8 @@ func New( sinkFactory: tableSinkFactory, sortEngine: sortEngine, + engineGCChan: make(chan *gcEvent, defaultEngineGCChanSize), + sinkProgressHeap: newTableProgresses(), sinkWorkers: make([]*sinkWorker, 0, sinkWorkerNum), sinkTaskChan: make(chan *sinkTask), From f3a9fa843d8f1664559cef3a51cf3d2b9856563c Mon Sep 17 00:00:00 2001 From: hi-rustin Date: Tue, 29 Nov 2022 16:37:57 +0800 Subject: [PATCH 3/6] processor/sinkmanager(ticdc): add tests --- cdc/processor/sinkmanager/manager_test.go | 82 +++++- .../sourcemanager/engine/mock/engine_mock.go | 242 ++++++++++++++++++ scripts/generate-mock.sh | 1 + 3 files changed, 316 insertions(+), 9 deletions(-) create mode 100644 cdc/processor/sourcemanager/engine/mock/engine_mock.go diff --git a/cdc/processor/sinkmanager/manager_test.go b/cdc/processor/sinkmanager/manager_test.go index 5b29b3b2cf6..161b1ddb5fc 100644 --- a/cdc/processor/sinkmanager/manager_test.go +++ b/cdc/processor/sinkmanager/manager_test.go @@ -18,10 +18,12 @@ import ( "testing" "time" + "github.com/golang/mock/gomock" "github.com/pingcap/tiflow/cdc/entry" "github.com/pingcap/tiflow/cdc/model" "github.com/pingcap/tiflow/cdc/processor/sourcemanager/engine" "github.com/pingcap/tiflow/cdc/processor/sourcemanager/engine/memory" + mock_engine "github.com/pingcap/tiflow/cdc/processor/sourcemanager/engine/mock" "github.com/pingcap/tiflow/cdc/processor/tablepb" "github.com/pingcap/tiflow/pkg/config" "github.com/prometheus/client_golang/prometheus" @@ -30,7 +32,7 @@ import ( // nolint:revive // In test it is ok move the ctx to the second parameter. -func createManager( +func createManagerWithMemEngine( t *testing.T, ctx context.Context, changefeedID model.ChangeFeedID, @@ -46,6 +48,25 @@ func createManager( return manager } +// nolint:revive +// In test it is ok move the ctx to the second parameter. +func createManagerWithMockEngine( + t *testing.T, + ctx context.Context, + changefeedID model.ChangeFeedID, + changefeedInfo *model.ChangeFeedInfo, + errChan chan error, +) (*SinkManager, *mock_engine.MockSortEngine) { + ctrl := gomock.NewController(t) + sortEngine := mock_engine.NewMockSortEngine(ctrl) + manager, err := New( + ctx, changefeedID, changefeedInfo, + nil, sortEngine, &entry.MockMountGroup{}, + errChan, prometheus.NewCounter(prometheus.CounterOpts{})) + require.NoError(t, err) + return manager, sortEngine +} + func getChangefeedInfo() *model.ChangeFeedInfo { return &model.ChangeFeedInfo{ Error: nil, @@ -124,7 +145,7 @@ func TestAddTable(t *testing.T) { defer cancel() changefeedInfo := getChangefeedInfo() - manager := createManager(t, ctx, model.DefaultChangeFeedID("1"), changefeedInfo, make(chan error, 1)) + manager := createManagerWithMemEngine(t, ctx, model.DefaultChangeFeedID("1"), changefeedInfo, make(chan error, 1)) defer func() { err := manager.Close() require.NoError(t, err) @@ -152,7 +173,7 @@ func TestRemoveTable(t *testing.T) { defer cancel() changefeedInfo := getChangefeedInfo() - manager := createManager(t, ctx, model.DefaultChangeFeedID("1"), changefeedInfo, make(chan error, 1)) + manager := createManagerWithMemEngine(t, ctx, model.DefaultChangeFeedID("1"), changefeedInfo, make(chan error, 1)) defer func() { err := manager.Close() require.NoError(t, err) @@ -193,7 +214,7 @@ func TestUpdateBarrierTs(t *testing.T) { defer cancel() changefeedInfo := getChangefeedInfo() - manager := createManager(t, ctx, model.DefaultChangeFeedID("1"), changefeedInfo, make(chan error, 1)) + manager := createManagerWithMemEngine(t, ctx, model.DefaultChangeFeedID("1"), changefeedInfo, make(chan error, 1)) defer func() { err := manager.Close() require.NoError(t, err) @@ -211,7 +232,7 @@ func TestGenerateTableSinkTaskWithBarrierTs(t *testing.T) { defer cancel() changefeedInfo := getChangefeedInfo() - manager := createManager(t, ctx, model.DefaultChangeFeedID("1"), changefeedInfo, make(chan error, 1)) + manager := createManagerWithMemEngine(t, ctx, model.DefaultChangeFeedID("1"), changefeedInfo, make(chan error, 1)) defer func() { err := manager.Close() require.NoError(t, err) @@ -238,7 +259,7 @@ func TestGenerateTableSinkTaskWithResolvedTs(t *testing.T) { defer cancel() changefeedInfo := getChangefeedInfo() - manager := createManager(t, ctx, model.DefaultChangeFeedID("1"), changefeedInfo, make(chan error, 1)) + manager := createManagerWithMemEngine(t, ctx, model.DefaultChangeFeedID("1"), changefeedInfo, make(chan error, 1)) defer func() { err := manager.Close() require.NoError(t, err) @@ -267,7 +288,7 @@ func TestGetTableStatsToReleaseMemQuota(t *testing.T) { defer cancel() changefeedInfo := getChangefeedInfo() - manager := createManager(t, ctx, model.DefaultChangeFeedID("1"), changefeedInfo, make(chan error, 1)) + manager := createManagerWithMemEngine(t, ctx, model.DefaultChangeFeedID("1"), changefeedInfo, make(chan error, 1)) defer func() { err := manager.Close() require.NoError(t, err) @@ -293,7 +314,7 @@ func TestDoNotGenerateTableSinkTaskWhenTableIsNotReplicating(t *testing.T) { defer cancel() changefeedInfo := getChangefeedInfo() - manager := createManager(t, ctx, model.DefaultChangeFeedID("1"), changefeedInfo, make(chan error, 1)) + manager := createManagerWithMemEngine(t, ctx, model.DefaultChangeFeedID("1"), changefeedInfo, make(chan error, 1)) tableID := model.TableID(1) manager.AddTable(tableID, 1, 100) addTableAndAddEventsToSortEngine(t, manager.sortEngine, tableID) @@ -314,8 +335,51 @@ func TestClose(t *testing.T) { defer cancel() changefeedInfo := getChangefeedInfo() - manager := createManager(t, ctx, model.DefaultChangeFeedID("1"), changefeedInfo, make(chan error, 1)) + manager := createManagerWithMemEngine(t, ctx, model.DefaultChangeFeedID("1"), changefeedInfo, make(chan error, 1)) err := manager.Close() require.NoError(t, err) } + +func TestGetTableStats(t *testing.T) { + t.Parallel() + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + changefeedInfo := getChangefeedInfo() + manager, mockEngine := createManagerWithMockEngine(t, ctx, model.DefaultChangeFeedID("1"), changefeedInfo, make(chan error, 1)) + + defer func() { + err := manager.Close() + require.NoError(t, err) + }() + tableID := model.TableID(1) + mockEngine.EXPECT().AddTable(tableID) + mockEngine.EXPECT().Add(gomock.Any(), gomock.Any()).AnyTimes() + mockEngine.EXPECT().FetchByTable(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes() + mockEngine.EXPECT().GetResolvedTs(tableID).AnyTimes() + + manager.AddTable(tableID, 1, 100) + addTableAndAddEventsToSortEngine(t, manager.sortEngine, tableID) + // This would happen when the table just added to this node and redo log is enabled. + // So there is possibility that the resolved ts is smaller than the global barrier ts. + manager.UpdateBarrierTs(4) + manager.UpdateReceivedSorterResolvedTs(tableID, 3) + manager.StartTable(tableID, 0) + + require.Eventually(t, func() bool { + tableSink, ok := manager.tableSinks.Load(tableID) + require.True(t, ok) + checkpointTS := tableSink.(*tableSinkWrapper).getCheckpointTs() + return checkpointTS.ResolvedMark() == 3 + }, 5*time.Second, 10*time.Millisecond) + + cleanPos := engine.Position{ + StartTs: 2, + CommitTs: 3, + } + mockEngine.EXPECT().CleanByTable(tableID, cleanPos).Times(1) + stats := manager.GetTableStats(tableID) + require.Equal(t, uint64(3), stats.CheckpointTs) +} diff --git a/cdc/processor/sourcemanager/engine/mock/engine_mock.go b/cdc/processor/sourcemanager/engine/mock/engine_mock.go new file mode 100644 index 00000000000..5995b1aabfd --- /dev/null +++ b/cdc/processor/sourcemanager/engine/mock/engine_mock.go @@ -0,0 +1,242 @@ +// Code generated by MockGen. DO NOT EDIT. +// Source: cdc/processor/sourcemanager/engine/engine.go + +// Package mock_engine is a generated GoMock package. +package mock_engine + +import ( + reflect "reflect" + + gomock "github.com/golang/mock/gomock" + model "github.com/pingcap/tiflow/cdc/model" + engine "github.com/pingcap/tiflow/cdc/processor/sourcemanager/engine" +) + +// MockSortEngine is a mock of SortEngine interface. +type MockSortEngine struct { + ctrl *gomock.Controller + recorder *MockSortEngineMockRecorder +} + +// MockSortEngineMockRecorder is the mock recorder for MockSortEngine. +type MockSortEngineMockRecorder struct { + mock *MockSortEngine +} + +// NewMockSortEngine creates a new mock instance. +func NewMockSortEngine(ctrl *gomock.Controller) *MockSortEngine { + mock := &MockSortEngine{ctrl: ctrl} + mock.recorder = &MockSortEngineMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockSortEngine) EXPECT() *MockSortEngineMockRecorder { + return m.recorder +} + +// Add mocks base method. +func (m *MockSortEngine) Add(tableID model.TableID, events ...*model.PolymorphicEvent) error { + m.ctrl.T.Helper() + varargs := []interface{}{tableID} + for _, a := range events { + varargs = append(varargs, a) + } + ret := m.ctrl.Call(m, "Add", varargs...) + ret0, _ := ret[0].(error) + return ret0 +} + +// Add indicates an expected call of Add. +func (mr *MockSortEngineMockRecorder) Add(tableID interface{}, events ...interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + varargs := append([]interface{}{tableID}, events...) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Add", reflect.TypeOf((*MockSortEngine)(nil).Add), varargs...) +} + +// AddTable mocks base method. +func (m *MockSortEngine) AddTable(tableID model.TableID) { + m.ctrl.T.Helper() + m.ctrl.Call(m, "AddTable", tableID) +} + +// AddTable indicates an expected call of AddTable. +func (mr *MockSortEngineMockRecorder) AddTable(tableID interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AddTable", reflect.TypeOf((*MockSortEngine)(nil).AddTable), tableID) +} + +// CleanAllTables mocks base method. +func (m *MockSortEngine) CleanAllTables(upperBound engine.Position) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "CleanAllTables", upperBound) + ret0, _ := ret[0].(error) + return ret0 +} + +// CleanAllTables indicates an expected call of CleanAllTables. +func (mr *MockSortEngineMockRecorder) CleanAllTables(upperBound interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CleanAllTables", reflect.TypeOf((*MockSortEngine)(nil).CleanAllTables), upperBound) +} + +// CleanByTable mocks base method. +func (m *MockSortEngine) CleanByTable(tableID model.TableID, upperBound engine.Position) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "CleanByTable", tableID, upperBound) + ret0, _ := ret[0].(error) + return ret0 +} + +// CleanByTable indicates an expected call of CleanByTable. +func (mr *MockSortEngineMockRecorder) CleanByTable(tableID, upperBound interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CleanByTable", reflect.TypeOf((*MockSortEngine)(nil).CleanByTable), tableID, upperBound) +} + +// Close mocks base method. +func (m *MockSortEngine) Close() error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Close") + ret0, _ := ret[0].(error) + return ret0 +} + +// Close indicates an expected call of Close. +func (mr *MockSortEngineMockRecorder) Close() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Close", reflect.TypeOf((*MockSortEngine)(nil).Close)) +} + +// FetchAllTables mocks base method. +func (m *MockSortEngine) FetchAllTables(lowerBound engine.Position) engine.EventIterator { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "FetchAllTables", lowerBound) + ret0, _ := ret[0].(engine.EventIterator) + return ret0 +} + +// FetchAllTables indicates an expected call of FetchAllTables. +func (mr *MockSortEngineMockRecorder) FetchAllTables(lowerBound interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "FetchAllTables", reflect.TypeOf((*MockSortEngine)(nil).FetchAllTables), lowerBound) +} + +// FetchByTable mocks base method. +func (m *MockSortEngine) FetchByTable(tableID model.TableID, lowerBound, upperBound engine.Position) engine.EventIterator { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "FetchByTable", tableID, lowerBound, upperBound) + ret0, _ := ret[0].(engine.EventIterator) + return ret0 +} + +// FetchByTable indicates an expected call of FetchByTable. +func (mr *MockSortEngineMockRecorder) FetchByTable(tableID, lowerBound, upperBound interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "FetchByTable", reflect.TypeOf((*MockSortEngine)(nil).FetchByTable), tableID, lowerBound, upperBound) +} + +// GetResolvedTs mocks base method. +func (m *MockSortEngine) GetResolvedTs(tableID model.TableID) model.Ts { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetResolvedTs", tableID) + ret0, _ := ret[0].(model.Ts) + return ret0 +} + +// GetResolvedTs indicates an expected call of GetResolvedTs. +func (mr *MockSortEngineMockRecorder) GetResolvedTs(tableID interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetResolvedTs", reflect.TypeOf((*MockSortEngine)(nil).GetResolvedTs), tableID) +} + +// IsTableBased mocks base method. +func (m *MockSortEngine) IsTableBased() bool { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "IsTableBased") + ret0, _ := ret[0].(bool) + return ret0 +} + +// IsTableBased indicates an expected call of IsTableBased. +func (mr *MockSortEngineMockRecorder) IsTableBased() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "IsTableBased", reflect.TypeOf((*MockSortEngine)(nil).IsTableBased)) +} + +// OnResolve mocks base method. +func (m *MockSortEngine) OnResolve(action func(model.TableID, model.Ts)) { + m.ctrl.T.Helper() + m.ctrl.Call(m, "OnResolve", action) +} + +// OnResolve indicates an expected call of OnResolve. +func (mr *MockSortEngineMockRecorder) OnResolve(action interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "OnResolve", reflect.TypeOf((*MockSortEngine)(nil).OnResolve), action) +} + +// RemoveTable mocks base method. +func (m *MockSortEngine) RemoveTable(tableID model.TableID) { + m.ctrl.T.Helper() + m.ctrl.Call(m, "RemoveTable", tableID) +} + +// RemoveTable indicates an expected call of RemoveTable. +func (mr *MockSortEngineMockRecorder) RemoveTable(tableID interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "RemoveTable", reflect.TypeOf((*MockSortEngine)(nil).RemoveTable), tableID) +} + +// MockEventIterator is a mock of EventIterator interface. +type MockEventIterator struct { + ctrl *gomock.Controller + recorder *MockEventIteratorMockRecorder +} + +// MockEventIteratorMockRecorder is the mock recorder for MockEventIterator. +type MockEventIteratorMockRecorder struct { + mock *MockEventIterator +} + +// NewMockEventIterator creates a new mock instance. +func NewMockEventIterator(ctrl *gomock.Controller) *MockEventIterator { + mock := &MockEventIterator{ctrl: ctrl} + mock.recorder = &MockEventIteratorMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockEventIterator) EXPECT() *MockEventIteratorMockRecorder { + return m.recorder +} + +// Close mocks base method. +func (m *MockEventIterator) Close() error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Close") + ret0, _ := ret[0].(error) + return ret0 +} + +// Close indicates an expected call of Close. +func (mr *MockEventIteratorMockRecorder) Close() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Close", reflect.TypeOf((*MockEventIterator)(nil).Close)) +} + +// Next mocks base method. +func (m *MockEventIterator) Next() (*model.PolymorphicEvent, engine.Position, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Next") + ret0, _ := ret[0].(*model.PolymorphicEvent) + ret1, _ := ret[1].(engine.Position) + ret2, _ := ret[2].(error) + return ret0, ret1, ret2 +} + +// Next indicates an expected call of Next. +func (mr *MockEventIteratorMockRecorder) Next() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Next", reflect.TypeOf((*MockEventIterator)(nil).Next)) +} diff --git a/scripts/generate-mock.sh b/scripts/generate-mock.sh index 763e3e4e6fc..8072c4c14a5 100755 --- a/scripts/generate-mock.sh +++ b/scripts/generate-mock.sh @@ -31,6 +31,7 @@ fi "$MOCKGEN" -source cdc/processor/manager.go -destination cdc/processor/mock/manager_mock.go "$MOCKGEN" -source cdc/capture/capture.go -destination cdc/capture/mock/capture_mock.go "$MOCKGEN" -source pkg/cmd/factory/factory.go -destination pkg/cmd/factory/mock/factory_mock.go -package mock_factory +"$MOCKGEN" -source cdc/processor/sourcemanager/engine/engine.go -destination cdc/processor/sourcemanager/engine/mock/engine_mock.go # DM mock "$MOCKGEN" -package pbmock -destination dm/pbmock/dmmaster.go github.com/pingcap/tiflow/dm/pb MasterClient,MasterServer From 53dbc7432b96d1e96d07200110ee98eb7fb6d531 Mon Sep 17 00:00:00 2001 From: hi-rustin Date: Tue, 29 Nov 2022 16:51:13 +0800 Subject: [PATCH 4/6] processor/sinkmanager(ticdc): refine import --- cdc/processor/sinkmanager/manager_test.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/cdc/processor/sinkmanager/manager_test.go b/cdc/processor/sinkmanager/manager_test.go index 161b1ddb5fc..682031da53f 100644 --- a/cdc/processor/sinkmanager/manager_test.go +++ b/cdc/processor/sinkmanager/manager_test.go @@ -23,7 +23,7 @@ import ( "github.com/pingcap/tiflow/cdc/model" "github.com/pingcap/tiflow/cdc/processor/sourcemanager/engine" "github.com/pingcap/tiflow/cdc/processor/sourcemanager/engine/memory" - mock_engine "github.com/pingcap/tiflow/cdc/processor/sourcemanager/engine/mock" + mockengine "github.com/pingcap/tiflow/cdc/processor/sourcemanager/engine/mock" "github.com/pingcap/tiflow/cdc/processor/tablepb" "github.com/pingcap/tiflow/pkg/config" "github.com/prometheus/client_golang/prometheus" @@ -56,9 +56,9 @@ func createManagerWithMockEngine( changefeedID model.ChangeFeedID, changefeedInfo *model.ChangeFeedInfo, errChan chan error, -) (*SinkManager, *mock_engine.MockSortEngine) { +) (*SinkManager, *mockengine.MockSortEngine) { ctrl := gomock.NewController(t) - sortEngine := mock_engine.NewMockSortEngine(ctrl) + sortEngine := mockengine.NewMockSortEngine(ctrl) manager, err := New( ctx, changefeedID, changefeedInfo, nil, sortEngine, &entry.MockMountGroup{}, From e09c07eef64727334cbb66746e292553235e1f40 Mon Sep 17 00:00:00 2001 From: hi-rustin Date: Wed, 30 Nov 2022 10:49:51 +0800 Subject: [PATCH 5/6] processor/sinkmanager(ticdc): fix test --- cdc/processor/sinkmanager/manager_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cdc/processor/sinkmanager/manager_test.go b/cdc/processor/sinkmanager/manager_test.go index 2a9627938c2..078198be4a4 100644 --- a/cdc/processor/sinkmanager/manager_test.go +++ b/cdc/processor/sinkmanager/manager_test.go @@ -75,7 +75,7 @@ func createManagerWithMockEngine( ctrl := gomock.NewController(t) sortEngine := mockengine.NewMockSortEngine(ctrl) manager, err := New( - ctx, changefeedID, changefeedInfo, + ctx, changefeedID, changefeedInfo, upstream.NewUpstream4Test(&mockPD{}), nil, sortEngine, &entry.MockMountGroup{}, errChan, prometheus.NewCounter(prometheus.CounterOpts{})) require.NoError(t, err) From d2757cccaf5e9bf773b3744f8c4eada7d93e7af1 Mon Sep 17 00:00:00 2001 From: hi-rustin Date: Wed, 30 Nov 2022 16:00:11 +0800 Subject: [PATCH 6/6] processor/sourcemanager(ticdc): address comment --- cdc/processor/sinkmanager/manager.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cdc/processor/sinkmanager/manager.go b/cdc/processor/sinkmanager/manager.go index 4c79336c1a8..97176c829fb 100644 --- a/cdc/processor/sinkmanager/manager.go +++ b/cdc/processor/sinkmanager/manager.go @@ -648,7 +648,7 @@ func (m *SinkManager) GetTableStats(tableID model.TableID) pipeline.Stats { select { case m.engineGCChan <- gcEvent: default: - log.Debug("Failed to send GC event to engine GC channel, engine GC channel is full", + log.Warn("Failed to send GC event to engine GC channel, engine GC channel is full", zap.String("namespace", m.changefeedID.Namespace), zap.String("changefeed", m.changefeedID.ID), zap.Int64("tableID", tableID),