From 38b631c709ba51c358afacbed4ba1f009f93bfd7 Mon Sep 17 00:00:00 2001 From: Etienne Berube <84141531+EtienneBerubeShopify@users.noreply.github.com> Date: Tue, 17 Aug 2021 16:52:42 -0400 Subject: [PATCH] Changed from using StateTracker to Progress --- ferry.go | 23 ++++++++++++----------- state_tracker.go | 35 ----------------------------------- 2 files changed, 12 insertions(+), 46 deletions(-) diff --git a/ferry.go b/ferry.go index cc9961286..bc82788b6 100644 --- a/ferry.go +++ b/ferry.go @@ -482,8 +482,6 @@ func (f *Ferry) Initialize() (err error) { } } - f.initializeTotalRowsAndBytes() - if f.Config.DataIterationBatchSizePerTableOverride != nil { err = f.Config.DataIterationBatchSizePerTableOverride.UpdateBatchSizes(f.SourceDB, f.Tables) if err != nil { @@ -965,6 +963,7 @@ func (f *Ferry) Progress() *Progress { } tables := f.Tables.AsSlice() + totalRowsPerTable, totalBytesPerTable := f.GetTotalRowsAndBytesMap() for _, table := range tables { var currentAction string @@ -989,8 +988,8 @@ func (f *Ferry) Progress() *Progress { BatchSize: f.DataIterator.CursorConfig.GetBatchSize(table.Schema, table.Name), RowsWritten: rowWrittenStats.NumRows, BytesWritten: rowWrittenStats.NumBytes, - TotalBytes: f.StateTracker.TotalBytesPerTable(tableName), - TotalRows: f.StateTracker.TotalRowsPerTable(tableName), + TotalBytes: totalBytesPerTable[tableName], + TotalRows: totalRowsPerTable[tableName], } } @@ -1163,7 +1162,10 @@ func (f *Ferry) checkSourceForeignKeyConstraints() error { return nil } -func (f *Ferry) initializeTotalRowsAndBytes() { +func (f *Ferry) GetTotalRowsAndBytesMap() (totalRowsPerTable map[string]uint64, totalBytesPerTable map[string]uint64) { + totalRowsPerTable = make(map[string]uint64) + totalBytesPerTable = make(map[string]uint64) + for _, table := range f.Tables { query := fmt.Sprintf(` SELECT table_rows, data_length @@ -1185,14 +1187,13 @@ func (f *Ferry) initializeTotalRowsAndBytes() { err = rows.Scan(&totalRows, &totalBytes) if err != nil { - f.StateTracker.UpdateTotalBytesPerTable(table.Name, 0) - f.StateTracker.UpdateTotalRowsPerTable(table.Name, 0) + totalRowsPerTable[table.String()] = 0 + totalBytesPerTable[table.String()] = 0 } else { - f.StateTracker.UpdateTotalBytesPerTable(table.String(), uint64(totalBytes)) - f.StateTracker.UpdateTotalRowsPerTable(table.String(), uint64(totalRows)) + totalRowsPerTable[table.String()] = uint64(totalRows) + totalBytesPerTable[table.String()] = uint64(totalBytes) } - } } - + return totalRowsPerTable, totalBytesPerTable } diff --git a/state_tracker.go b/state_tracker.go index 922f3807c..f346a5c58 100644 --- a/state_tracker.go +++ b/state_tracker.go @@ -40,8 +40,6 @@ type SerializableState struct { BinlogVerifyStore BinlogVerifySerializedStore LastStoredBinlogPositionForInlineVerifier mysql.Position LastStoredBinlogPositionForTargetVerifier mysql.Position - TotalRowsPerTable map[string]uint64 `json:",omitempty"` - TotalBytesPerTable map[string]uint64 `json:",omitempty"` } func (s *SerializableState) MinSourceBinlogPosition() mysql.Position { @@ -101,8 +99,6 @@ type StateTracker struct { // as it confuses the focus of this struct. iterationSpeedLog *ring.Ring rowStatsWrittenPerTable map[string]RowStats - totalRowsPerTable map[string]uint64 - totalBytesPerTable map[string]uint64 } func NewStateTracker(speedLogCount int) *StateTracker { @@ -114,8 +110,6 @@ func NewStateTracker(speedLogCount int) *StateTracker { completedTables: make(map[string]bool), iterationSpeedLog: newSpeedLogRing(speedLogCount), rowStatsWrittenPerTable: make(map[string]RowStats), - totalRowsPerTable: make(map[string]uint64), - totalBytesPerTable: make(map[string]uint64), } } @@ -128,28 +122,9 @@ func NewStateTrackerFromSerializedState(speedLogCount int, serializedState *Seri s.lastWrittenBinlogPosition = serializedState.LastWrittenBinlogPosition s.lastStoredBinlogPositionForInlineVerifier = serializedState.LastStoredBinlogPositionForInlineVerifier s.lastStoredBinlogPositionForTargetVerifier = serializedState.LastStoredBinlogPositionForTargetVerifier - - // TODO: talk with shuhao on serialized states - s.totalBytesPerTable = serializedState.TotalBytesPerTable - s.totalRowsPerTable = serializedState.TotalRowsPerTable return s } -func (s *StateTracker) UpdateTotalRowsPerTable(table string, totalRows uint64) { - s.CopyRWMutex.RLock() - defer s.CopyRWMutex.RUnlock() - - s.totalRowsPerTable[table] = totalRows - -} - -func (s *StateTracker) UpdateTotalBytesPerTable(table string, totalBytes uint64) { - s.CopyRWMutex.RLock() - defer s.CopyRWMutex.RUnlock() - - s.totalBytesPerTable[table] = totalBytes -} - func (s *StateTracker) UpdateLastResumableSourceBinlogPosition(pos mysql.Position) { s.BinlogRWMutex.Lock() defer s.BinlogRWMutex.Unlock() @@ -187,14 +162,6 @@ func (s *StateTracker) UpdateLastSuccessfulPaginationKey(table string, paginatio s.updateSpeedLog(deltaPaginationKey) } -func (s StateTracker) TotalRowsPerTable(table string) uint64 { - return s.totalRowsPerTable[table] -} - -func (s StateTracker) TotalBytesPerTable(table string) uint64 { - return s.totalBytesPerTable[table] -} - func (s *StateTracker) RowStatsWrittenPerTable() map[string]RowStats { s.CopyRWMutex.RLock() defer s.CopyRWMutex.RUnlock() @@ -301,8 +268,6 @@ func (s *StateTracker) Serialize(lastKnownTableSchemaCache TableSchemaCache, bin LastWrittenBinlogPosition: s.lastWrittenBinlogPosition, LastStoredBinlogPositionForInlineVerifier: s.lastStoredBinlogPositionForInlineVerifier, LastStoredBinlogPositionForTargetVerifier: s.lastStoredBinlogPositionForTargetVerifier, - TotalRowsPerTable: s.totalRowsPerTable, - TotalBytesPerTable: s.totalBytesPerTable, } if binlogVerifyStore != nil {