diff --git a/cdc/processor/processor.go b/cdc/processor/processor.go index cee6550d8a7..a58c2763553 100644 --- a/cdc/processor/processor.go +++ b/cdc/processor/processor.go @@ -305,16 +305,7 @@ func (p *processor) IsAddTableFinished(tableID model.TableID, isPrepare bool) bo if p.pullBasedSinking { state, alreadyExist = p.sinkManager.GetTableState(tableID) if alreadyExist { - stats, err := p.sinkManager.GetTableStats(tableID) - if err != nil { - log.Warn("Failed to get table stats", - zap.String("captureID", p.captureInfo.ID), - zap.String("namespace", p.changefeedID.Namespace), - zap.String("changefeed", p.changefeedID.ID), - zap.Int64("tableID", tableID), - zap.Error(err)) - return false - } + stats := p.sinkManager.GetTableStats(tableID) tableResolvedTs = stats.ResolvedTs tableCheckpointTs = stats.CheckpointTs } @@ -389,16 +380,7 @@ func (p *processor) IsRemoveTableFinished(tableID model.TableID) (model.Ts, bool if p.pullBasedSinking { state, alreadyExist = p.sinkManager.GetTableState(tableID) if alreadyExist { - stats, err := p.sinkManager.GetTableStats(tableID) - if err != nil { - log.Warn("Failed to get table stats", - zap.String("captureID", p.captureInfo.ID), - zap.String("namespace", p.changefeedID.Namespace), - zap.String("changefeed", p.changefeedID.ID), - zap.Int64("tableID", tableID), - zap.Error(err)) - return 0, false - } + stats := p.sinkManager.GetTableStats(tableID) tableCheckpointTs = stats.CheckpointTs } } else { @@ -431,17 +413,7 @@ func (p *processor) IsRemoveTableFinished(tableID model.TableID) (model.Ts, bool } if p.pullBasedSinking { - stats, err := p.sinkManager.GetTableStats(tableID) - // TODO: handle error - if err != nil { - log.Warn("Failed to get table stats", - zap.String("captureID", p.captureInfo.ID), - zap.String("namespace", p.changefeedID.Namespace), - zap.String("changefeed", p.changefeedID.ID), - zap.Int64("tableID", tableID), - zap.Error(err)) - return 0, false - } + stats := p.sinkManager.GetTableStats(tableID) p.sourceManager.RemoveTable(tableID) p.sinkManager.RemoveTable(tableID) if p.redoManager.Enabled() { @@ -499,20 +471,7 @@ func (p *processor) GetTableStatus(tableID model.TableID) tablepb.TableStatus { State: tablepb.TableStateAbsent, } } - sinkStats, err := p.sinkManager.GetTableStats(tableID) - // TODO: handle the error - if err != nil { - log.Warn("Failed to get table sinkStats", - zap.String("captureID", p.captureInfo.ID), - zap.String("namespace", p.changefeedID.Namespace), - zap.String("changefeed", p.changefeedID.ID), - zap.Int64("tableID", tableID), - zap.Error(err)) - return tablepb.TableStatus{ - TableID: tableID, - State: tablepb.TableStateAbsent, - } - } + sinkStats := p.sinkManager.GetTableStats(tableID) return tablepb.TableStatus{ TableID: tableID, Checkpoint: tablepb.Checkpoint{ @@ -755,9 +714,7 @@ func (p *processor) tick(ctx cdcContext.Context) error { // it is no need to check the error here, because we will use // local time when an error return, which is acceptable pdTime, _ := p.upstream.PDClock.CurrentTime() - if err := p.handlePosition(oracle.GetPhysical(pdTime)); err != nil { - return errors.Trace(err) - } + p.handlePosition(oracle.GetPhysical(pdTime)) p.doGCSchemaStorage() @@ -1092,7 +1049,7 @@ func (p *processor) sendError(err error) { // resolvedTs = min(schemaStorage's resolvedTs, all table's resolvedTs). // table's resolvedTs = redo's resolvedTs if redo enable, else sorter's resolvedTs. // checkpointTs = min(resolvedTs, all table's checkpointTs). -func (p *processor) handlePosition(currentTs int64) error { +func (p *processor) handlePosition(currentTs int64) { minResolvedTs := uint64(math.MaxUint64) minResolvedTableID := int64(0) if p.schemaStorage != nil { @@ -1103,10 +1060,7 @@ func (p *processor) handlePosition(currentTs int64) error { if p.pullBasedSinking { tableIDs := p.sinkManager.GetAllCurrentTableIDs() for _, tableID := range tableIDs { - stats, err := p.sinkManager.GetTableStats(tableID) - if err != nil { - return errors.Trace(err) - } + stats := p.sinkManager.GetTableStats(tableID) log.Debug("sink manager gets table stats", zap.String("namespace", p.changefeedID.Namespace), zap.String("changefeed", p.changefeedID.ID), @@ -1151,8 +1105,6 @@ func (p *processor) handlePosition(currentTs int64) error { p.checkpointTs = minCheckpointTs p.resolvedTs = minResolvedTs - - return nil } // pushResolvedTs2Table sends global resolved ts to all the table pipelines. @@ -1457,10 +1409,7 @@ func (p *processor) WriteDebugInfo(w io.Writer) error { tables := p.sinkManager.GetAllCurrentTableIDs() for _, tableID := range tables { state, _ := p.sinkManager.GetTableState(tableID) - stats, err := p.sinkManager.GetTableStats(tableID) - if err != nil { - return err - } + stats := p.sinkManager.GetTableStats(tableID) // TODO: add table name. fmt.Fprintf(w, "tableID: %d, resolvedTs: %d, checkpointTs: %d, state: %s\n", tableID, stats.ResolvedTs, stats.CheckpointTs, state) diff --git a/cdc/processor/sinkmanager/manager.go b/cdc/processor/sinkmanager/manager.go index 5be712654d4..97176c829fb 100644 --- a/cdc/processor/sinkmanager/manager.go +++ b/cdc/processor/sinkmanager/manager.go @@ -40,8 +40,14 @@ const ( sinkWorkerNum = 8 redoWorkerNum = 4 defaultGenerateTaskInterval = 100 * time.Millisecond + defaultEngineGCChanSize = 128 ) +type gcEvent struct { + tableID model.TableID + cleanPos engine.Position +} + // SinkManager is the implementation of SinkManager. type SinkManager struct { changefeedID model.ChangeFeedID @@ -74,6 +80,9 @@ type SinkManager struct { // lastBarrierTs is the last barrier ts. lastBarrierTs atomic.Uint64 + // engineGCChan is used to GC engine when the table is advanced. + engineGCChan chan *gcEvent + // sinkWorkers used to pull data from source manager. sinkWorkers []*sinkWorker // sinkTaskChan is used to send tasks to sinkWorkers. @@ -124,6 +133,8 @@ func New( sinkFactory: tableSinkFactory, sortEngine: sortEngine, + engineGCChan: make(chan *gcEvent, defaultEngineGCChanSize), + sinkProgressHeap: newTableProgresses(), sinkWorkers: make([]*sinkWorker, 0, sinkWorkerNum), sinkTaskChan: make(chan *sinkTask), @@ -142,6 +153,7 @@ func New( m.startWorkers(mg, changefeedInfo.Config.Sink.TxnAtomicity.ShouldSplitTxn(), changefeedInfo.Config.EnableOldValue) m.startGenerateTasks() + m.backgroundGC() log.Info("Sink manager is created", zap.String("namespace", changefeedID.Namespace), @@ -255,6 +267,39 @@ func (m *SinkManager) startGenerateTasks() { }() } +// backgroundGC is used to clean up the old data in the sorter. +func (m *SinkManager) backgroundGC() { + m.wg.Add(1) + go func() { + defer m.wg.Done() + for { + select { + case <-m.ctx.Done(): + log.Info("Background GC is stooped because context is canceled", + zap.String("namespace", m.changefeedID.Namespace), + zap.String("changefeed", m.changefeedID.ID)) + return + case gcEvent := <-m.engineGCChan: + if err := m.sortEngine.CleanByTable(gcEvent.tableID, gcEvent.cleanPos); err != nil { + log.Error("Failed to clean table in sort engine", + zap.String("namespace", m.changefeedID.Namespace), + zap.String("changefeed", m.changefeedID.ID), + zap.Int64("tableID", gcEvent.tableID), + zap.Error(err)) + select { + case m.errChan <- err: + default: + log.Error("Failed to send error to error channel, error channel is full", + zap.String("namespace", m.changefeedID.Namespace), + zap.String("changefeed", m.changefeedID.ID), + zap.Error(err)) + } + } + } + } + }() +} + // generateSinkTasks generates tasks to fetch data from the source manager. func (m *SinkManager) generateSinkTasks() error { taskTicker := time.NewTicker(defaultGenerateTaskInterval) @@ -581,7 +626,7 @@ func (m *SinkManager) GetTableState(tableID model.TableID) (tablepb.TableState, } // GetTableStats returns the state of the table. -func (m *SinkManager) GetTableStats(tableID model.TableID) (pipeline.Stats, error) { +func (m *SinkManager) GetTableStats(tableID model.TableID) pipeline.Stats { tableSink, ok := m.tableSinks.Load(tableID) if !ok { log.Panic("Table sink not found when getting table stats", @@ -596,9 +641,18 @@ func (m *SinkManager) GetTableStats(tableID model.TableID) (pipeline.Stats, erro StartTs: resolvedMark - 1, CommitTs: resolvedMark, } - err := m.sortEngine.CleanByTable(tableID, cleanPos) - if err != nil { - return pipeline.Stats{}, errors.Trace(err) + gcEvent := &gcEvent{ + tableID: tableID, + cleanPos: cleanPos, + } + select { + case m.engineGCChan <- gcEvent: + default: + log.Warn("Failed to send GC event to engine GC channel, engine GC channel is full", + zap.String("namespace", m.changefeedID.Namespace), + zap.String("changefeed", m.changefeedID.ID), + zap.Int64("tableID", tableID), + zap.Any("cleanPos", cleanPos)) } var resolvedTs model.Ts // If redo log is enabled, we have to use redo log's resolved ts to calculate processor's min resolved ts. @@ -611,7 +665,7 @@ func (m *SinkManager) GetTableStats(tableID model.TableID) (pipeline.Stats, erro CheckpointTs: resolvedMark, ResolvedTs: resolvedTs, BarrierTs: m.lastBarrierTs.Load(), - }, nil + } } // Close closes all workers. diff --git a/cdc/processor/sinkmanager/manager_test.go b/cdc/processor/sinkmanager/manager_test.go index f0f6e09d0fa..078198be4a4 100644 --- a/cdc/processor/sinkmanager/manager_test.go +++ b/cdc/processor/sinkmanager/manager_test.go @@ -19,10 +19,12 @@ import ( "testing" "time" + "github.com/golang/mock/gomock" "github.com/pingcap/tiflow/cdc/entry" "github.com/pingcap/tiflow/cdc/model" "github.com/pingcap/tiflow/cdc/processor/sourcemanager/engine" "github.com/pingcap/tiflow/cdc/processor/sourcemanager/engine/memory" + mockengine "github.com/pingcap/tiflow/cdc/processor/sourcemanager/engine/mock" "github.com/pingcap/tiflow/cdc/processor/tablepb" "github.com/pingcap/tiflow/pkg/config" "github.com/pingcap/tiflow/pkg/upstream" @@ -45,7 +47,7 @@ func (p *mockPD) GetTS(_ context.Context) (int64, int64, error) { // nolint:revive // In test it is ok move the ctx to the second parameter. -func createManager( +func createManagerWithMemEngine( t *testing.T, ctx context.Context, changefeedID model.ChangeFeedID, @@ -61,6 +63,25 @@ func createManager( return manager } +// nolint:revive +// In test it is ok move the ctx to the second parameter. +func createManagerWithMockEngine( + t *testing.T, + ctx context.Context, + changefeedID model.ChangeFeedID, + changefeedInfo *model.ChangeFeedInfo, + errChan chan error, +) (*SinkManager, *mockengine.MockSortEngine) { + ctrl := gomock.NewController(t) + sortEngine := mockengine.NewMockSortEngine(ctrl) + manager, err := New( + ctx, changefeedID, changefeedInfo, upstream.NewUpstream4Test(&mockPD{}), + nil, sortEngine, &entry.MockMountGroup{}, + errChan, prometheus.NewCounter(prometheus.CounterOpts{})) + require.NoError(t, err) + return manager, sortEngine +} + func getChangefeedInfo() *model.ChangeFeedInfo { return &model.ChangeFeedInfo{ Error: nil, @@ -139,7 +160,7 @@ func TestAddTable(t *testing.T) { defer cancel() changefeedInfo := getChangefeedInfo() - manager := createManager(t, ctx, model.DefaultChangeFeedID("1"), changefeedInfo, make(chan error, 1)) + manager := createManagerWithMemEngine(t, ctx, model.DefaultChangeFeedID("1"), changefeedInfo, make(chan error, 1)) defer func() { err := manager.Close() require.NoError(t, err) @@ -169,7 +190,7 @@ func TestRemoveTable(t *testing.T) { defer cancel() changefeedInfo := getChangefeedInfo() - manager := createManager(t, ctx, model.DefaultChangeFeedID("1"), changefeedInfo, make(chan error, 1)) + manager := createManagerWithMemEngine(t, ctx, model.DefaultChangeFeedID("1"), changefeedInfo, make(chan error, 1)) defer func() { err := manager.Close() require.NoError(t, err) @@ -211,7 +232,7 @@ func TestUpdateBarrierTs(t *testing.T) { defer cancel() changefeedInfo := getChangefeedInfo() - manager := createManager(t, ctx, model.DefaultChangeFeedID("1"), changefeedInfo, make(chan error, 1)) + manager := createManagerWithMemEngine(t, ctx, model.DefaultChangeFeedID("1"), changefeedInfo, make(chan error, 1)) defer func() { err := manager.Close() require.NoError(t, err) @@ -229,7 +250,7 @@ func TestGenerateTableSinkTaskWithBarrierTs(t *testing.T) { defer cancel() changefeedInfo := getChangefeedInfo() - manager := createManager(t, ctx, model.DefaultChangeFeedID("1"), changefeedInfo, make(chan error, 1)) + manager := createManagerWithMemEngine(t, ctx, model.DefaultChangeFeedID("1"), changefeedInfo, make(chan error, 1)) defer func() { err := manager.Close() require.NoError(t, err) @@ -257,7 +278,7 @@ func TestGenerateTableSinkTaskWithResolvedTs(t *testing.T) { defer cancel() changefeedInfo := getChangefeedInfo() - manager := createManager(t, ctx, model.DefaultChangeFeedID("1"), changefeedInfo, make(chan error, 1)) + manager := createManagerWithMemEngine(t, ctx, model.DefaultChangeFeedID("1"), changefeedInfo, make(chan error, 1)) defer func() { err := manager.Close() require.NoError(t, err) @@ -287,7 +308,7 @@ func TestGetTableStatsToReleaseMemQuota(t *testing.T) { defer cancel() changefeedInfo := getChangefeedInfo() - manager := createManager(t, ctx, model.DefaultChangeFeedID("1"), changefeedInfo, make(chan error, 1)) + manager := createManagerWithMemEngine(t, ctx, model.DefaultChangeFeedID("1"), changefeedInfo, make(chan error, 1)) defer func() { err := manager.Close() require.NoError(t, err) @@ -302,8 +323,7 @@ func TestGetTableStatsToReleaseMemQuota(t *testing.T) { require.NoError(t, err) require.Eventually(t, func() bool { - s, err := manager.GetTableStats(tableID) - require.NoError(t, err) + s := manager.GetTableStats(tableID) return manager.memQuota.getUsedBytes() == 0 && s.CheckpointTs == 4 }, 5*time.Second, 10*time.Millisecond) } @@ -315,7 +335,7 @@ func TestDoNotGenerateTableSinkTaskWhenTableIsNotReplicating(t *testing.T) { defer cancel() changefeedInfo := getChangefeedInfo() - manager := createManager(t, ctx, model.DefaultChangeFeedID("1"), changefeedInfo, make(chan error, 1)) + manager := createManagerWithMemEngine(t, ctx, model.DefaultChangeFeedID("1"), changefeedInfo, make(chan error, 1)) tableID := model.TableID(1) manager.AddTable(tableID, 1, 100) addTableAndAddEventsToSortEngine(t, manager.sortEngine, tableID) @@ -336,8 +356,51 @@ func TestClose(t *testing.T) { defer cancel() changefeedInfo := getChangefeedInfo() - manager := createManager(t, ctx, model.DefaultChangeFeedID("1"), changefeedInfo, make(chan error, 1)) + manager := createManagerWithMemEngine(t, ctx, model.DefaultChangeFeedID("1"), changefeedInfo, make(chan error, 1)) err := manager.Close() require.NoError(t, err) } + +func TestGetTableStats(t *testing.T) { + t.Parallel() + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + changefeedInfo := getChangefeedInfo() + manager, mockEngine := createManagerWithMockEngine(t, ctx, model.DefaultChangeFeedID("1"), changefeedInfo, make(chan error, 1)) + + defer func() { + err := manager.Close() + require.NoError(t, err) + }() + tableID := model.TableID(1) + mockEngine.EXPECT().AddTable(tableID) + mockEngine.EXPECT().Add(gomock.Any(), gomock.Any()).AnyTimes() + mockEngine.EXPECT().FetchByTable(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes() + mockEngine.EXPECT().GetResolvedTs(tableID).AnyTimes() + + manager.AddTable(tableID, 1, 100) + addTableAndAddEventsToSortEngine(t, manager.sortEngine, tableID) + // This would happen when the table just added to this node and redo log is enabled. + // So there is possibility that the resolved ts is smaller than the global barrier ts. + manager.UpdateBarrierTs(4) + manager.UpdateReceivedSorterResolvedTs(tableID, 3) + manager.StartTable(tableID, 0) + + require.Eventually(t, func() bool { + tableSink, ok := manager.tableSinks.Load(tableID) + require.True(t, ok) + checkpointTS := tableSink.(*tableSinkWrapper).getCheckpointTs() + return checkpointTS.ResolvedMark() == 3 + }, 5*time.Second, 10*time.Millisecond) + + cleanPos := engine.Position{ + StartTs: 2, + CommitTs: 3, + } + mockEngine.EXPECT().CleanByTable(tableID, cleanPos).Times(1) + stats := manager.GetTableStats(tableID) + require.Equal(t, uint64(3), stats.CheckpointTs) +} diff --git a/cdc/processor/sourcemanager/engine/mock/engine_mock.go b/cdc/processor/sourcemanager/engine/mock/engine_mock.go new file mode 100644 index 00000000000..5995b1aabfd --- /dev/null +++ b/cdc/processor/sourcemanager/engine/mock/engine_mock.go @@ -0,0 +1,242 @@ +// Code generated by MockGen. DO NOT EDIT. +// Source: cdc/processor/sourcemanager/engine/engine.go + +// Package mock_engine is a generated GoMock package. +package mock_engine + +import ( + reflect "reflect" + + gomock "github.com/golang/mock/gomock" + model "github.com/pingcap/tiflow/cdc/model" + engine "github.com/pingcap/tiflow/cdc/processor/sourcemanager/engine" +) + +// MockSortEngine is a mock of SortEngine interface. +type MockSortEngine struct { + ctrl *gomock.Controller + recorder *MockSortEngineMockRecorder +} + +// MockSortEngineMockRecorder is the mock recorder for MockSortEngine. +type MockSortEngineMockRecorder struct { + mock *MockSortEngine +} + +// NewMockSortEngine creates a new mock instance. +func NewMockSortEngine(ctrl *gomock.Controller) *MockSortEngine { + mock := &MockSortEngine{ctrl: ctrl} + mock.recorder = &MockSortEngineMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockSortEngine) EXPECT() *MockSortEngineMockRecorder { + return m.recorder +} + +// Add mocks base method. +func (m *MockSortEngine) Add(tableID model.TableID, events ...*model.PolymorphicEvent) error { + m.ctrl.T.Helper() + varargs := []interface{}{tableID} + for _, a := range events { + varargs = append(varargs, a) + } + ret := m.ctrl.Call(m, "Add", varargs...) + ret0, _ := ret[0].(error) + return ret0 +} + +// Add indicates an expected call of Add. +func (mr *MockSortEngineMockRecorder) Add(tableID interface{}, events ...interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + varargs := append([]interface{}{tableID}, events...) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Add", reflect.TypeOf((*MockSortEngine)(nil).Add), varargs...) +} + +// AddTable mocks base method. +func (m *MockSortEngine) AddTable(tableID model.TableID) { + m.ctrl.T.Helper() + m.ctrl.Call(m, "AddTable", tableID) +} + +// AddTable indicates an expected call of AddTable. +func (mr *MockSortEngineMockRecorder) AddTable(tableID interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AddTable", reflect.TypeOf((*MockSortEngine)(nil).AddTable), tableID) +} + +// CleanAllTables mocks base method. +func (m *MockSortEngine) CleanAllTables(upperBound engine.Position) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "CleanAllTables", upperBound) + ret0, _ := ret[0].(error) + return ret0 +} + +// CleanAllTables indicates an expected call of CleanAllTables. +func (mr *MockSortEngineMockRecorder) CleanAllTables(upperBound interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CleanAllTables", reflect.TypeOf((*MockSortEngine)(nil).CleanAllTables), upperBound) +} + +// CleanByTable mocks base method. +func (m *MockSortEngine) CleanByTable(tableID model.TableID, upperBound engine.Position) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "CleanByTable", tableID, upperBound) + ret0, _ := ret[0].(error) + return ret0 +} + +// CleanByTable indicates an expected call of CleanByTable. +func (mr *MockSortEngineMockRecorder) CleanByTable(tableID, upperBound interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CleanByTable", reflect.TypeOf((*MockSortEngine)(nil).CleanByTable), tableID, upperBound) +} + +// Close mocks base method. +func (m *MockSortEngine) Close() error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Close") + ret0, _ := ret[0].(error) + return ret0 +} + +// Close indicates an expected call of Close. +func (mr *MockSortEngineMockRecorder) Close() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Close", reflect.TypeOf((*MockSortEngine)(nil).Close)) +} + +// FetchAllTables mocks base method. +func (m *MockSortEngine) FetchAllTables(lowerBound engine.Position) engine.EventIterator { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "FetchAllTables", lowerBound) + ret0, _ := ret[0].(engine.EventIterator) + return ret0 +} + +// FetchAllTables indicates an expected call of FetchAllTables. +func (mr *MockSortEngineMockRecorder) FetchAllTables(lowerBound interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "FetchAllTables", reflect.TypeOf((*MockSortEngine)(nil).FetchAllTables), lowerBound) +} + +// FetchByTable mocks base method. +func (m *MockSortEngine) FetchByTable(tableID model.TableID, lowerBound, upperBound engine.Position) engine.EventIterator { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "FetchByTable", tableID, lowerBound, upperBound) + ret0, _ := ret[0].(engine.EventIterator) + return ret0 +} + +// FetchByTable indicates an expected call of FetchByTable. +func (mr *MockSortEngineMockRecorder) FetchByTable(tableID, lowerBound, upperBound interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "FetchByTable", reflect.TypeOf((*MockSortEngine)(nil).FetchByTable), tableID, lowerBound, upperBound) +} + +// GetResolvedTs mocks base method. +func (m *MockSortEngine) GetResolvedTs(tableID model.TableID) model.Ts { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetResolvedTs", tableID) + ret0, _ := ret[0].(model.Ts) + return ret0 +} + +// GetResolvedTs indicates an expected call of GetResolvedTs. +func (mr *MockSortEngineMockRecorder) GetResolvedTs(tableID interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetResolvedTs", reflect.TypeOf((*MockSortEngine)(nil).GetResolvedTs), tableID) +} + +// IsTableBased mocks base method. +func (m *MockSortEngine) IsTableBased() bool { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "IsTableBased") + ret0, _ := ret[0].(bool) + return ret0 +} + +// IsTableBased indicates an expected call of IsTableBased. +func (mr *MockSortEngineMockRecorder) IsTableBased() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "IsTableBased", reflect.TypeOf((*MockSortEngine)(nil).IsTableBased)) +} + +// OnResolve mocks base method. +func (m *MockSortEngine) OnResolve(action func(model.TableID, model.Ts)) { + m.ctrl.T.Helper() + m.ctrl.Call(m, "OnResolve", action) +} + +// OnResolve indicates an expected call of OnResolve. +func (mr *MockSortEngineMockRecorder) OnResolve(action interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "OnResolve", reflect.TypeOf((*MockSortEngine)(nil).OnResolve), action) +} + +// RemoveTable mocks base method. +func (m *MockSortEngine) RemoveTable(tableID model.TableID) { + m.ctrl.T.Helper() + m.ctrl.Call(m, "RemoveTable", tableID) +} + +// RemoveTable indicates an expected call of RemoveTable. +func (mr *MockSortEngineMockRecorder) RemoveTable(tableID interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "RemoveTable", reflect.TypeOf((*MockSortEngine)(nil).RemoveTable), tableID) +} + +// MockEventIterator is a mock of EventIterator interface. +type MockEventIterator struct { + ctrl *gomock.Controller + recorder *MockEventIteratorMockRecorder +} + +// MockEventIteratorMockRecorder is the mock recorder for MockEventIterator. +type MockEventIteratorMockRecorder struct { + mock *MockEventIterator +} + +// NewMockEventIterator creates a new mock instance. +func NewMockEventIterator(ctrl *gomock.Controller) *MockEventIterator { + mock := &MockEventIterator{ctrl: ctrl} + mock.recorder = &MockEventIteratorMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockEventIterator) EXPECT() *MockEventIteratorMockRecorder { + return m.recorder +} + +// Close mocks base method. +func (m *MockEventIterator) Close() error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Close") + ret0, _ := ret[0].(error) + return ret0 +} + +// Close indicates an expected call of Close. +func (mr *MockEventIteratorMockRecorder) Close() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Close", reflect.TypeOf((*MockEventIterator)(nil).Close)) +} + +// Next mocks base method. +func (m *MockEventIterator) Next() (*model.PolymorphicEvent, engine.Position, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Next") + ret0, _ := ret[0].(*model.PolymorphicEvent) + ret1, _ := ret[1].(engine.Position) + ret2, _ := ret[2].(error) + return ret0, ret1, ret2 +} + +// Next indicates an expected call of Next. +func (mr *MockEventIteratorMockRecorder) Next() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Next", reflect.TypeOf((*MockEventIterator)(nil).Next)) +} diff --git a/scripts/generate-mock.sh b/scripts/generate-mock.sh index 4324bc131f2..0e46f81ba67 100755 --- a/scripts/generate-mock.sh +++ b/scripts/generate-mock.sh @@ -31,6 +31,7 @@ fi "$MOCKGEN" -source cdc/processor/manager.go -destination cdc/processor/mock/manager_mock.go "$MOCKGEN" -source cdc/capture/capture.go -destination cdc/capture/mock/capture_mock.go "$MOCKGEN" -source pkg/cmd/factory/factory.go -destination pkg/cmd/factory/mock/factory_mock.go -package mock_factory +"$MOCKGEN" -source cdc/processor/sourcemanager/engine/engine.go -destination cdc/processor/sourcemanager/engine/mock/engine_mock.go # DM mock "$MOCKGEN" -package pbmock -destination dm/pbmock/dmmaster.go github.com/pingcap/tiflow/dm/pb MasterClient,MasterServer