diff --git a/data_iterator.go b/data_iterator.go index f3f91de2..7466c2ed 100644 --- a/data_iterator.go +++ b/data_iterator.go @@ -12,7 +12,6 @@ import ( type DataIterator struct { DB *sql.DB - Tables []*schema.Table Concurrency int ErrorHandler ErrorHandler @@ -25,7 +24,7 @@ type DataIterator struct { logger *logrus.Entry } -func (d *DataIterator) Run() { +func (d *DataIterator) Run(tables []*schema.Table) { d.logger = logrus.WithField("tag", "data_iterator") d.targetPKs = &sync.Map{} @@ -36,8 +35,8 @@ func (d *DataIterator) Run() { d.StateTracker = NewStateTracker(0) } - d.logger.WithField("tablesCount", len(d.Tables)).Info("starting data iterator run") - tablesWithData, emptyTables, err := MaxPrimaryKeys(d.DB, d.Tables, d.logger) + d.logger.WithField("tablesCount", len(tables)).Info("starting data iterator run") + tablesWithData, emptyTables, err := MaxPrimaryKeys(d.DB, tables, d.logger) if err != nil { d.ErrorHandler.Fatal("data_iterator", err) } diff --git a/ferry.go b/ferry.go index 1ef42b02..a906039f 100644 --- a/ferry.go +++ b/ferry.go @@ -344,7 +344,6 @@ func (f *Ferry) Start() error { // TODO(pushrax): handle changes to schema during copying and clean this up. f.BinlogStreamer.TableSchema = f.Tables - f.DataIterator.Tables = f.Tables.AsSlice() return nil } @@ -411,7 +410,7 @@ func (f *Ferry) Run() { go func() { defer dataIteratorWg.Done() - f.DataIterator.Run() + f.DataIterator.Run(f.Tables.AsSlice()) }() dataIteratorWg.Wait() @@ -445,11 +444,10 @@ func (f *Ferry) RunStandaloneDataCopy(tables []*schema.Table) error { // will get an error dump even though we should not get one, which could be // misleading. - dataIterator.Tables = tables dataIterator.AddBatchListener(f.BatchWriter.WriteRowBatch) f.logger.WithField("tables", tables).Info("starting delta table copy in cutover") - dataIterator.Run() + dataIterator.Run(tables) return nil } diff --git a/test/go/data_iterator_test.go b/test/go/data_iterator_test.go index 465920cb..79b51233 100644 --- a/test/go/data_iterator_test.go +++ b/test/go/data_iterator_test.go @@ -5,6 +5,7 @@ import ( "sync" "testing" + "github.com/siddontang/go-mysql/schema" "github.com/stretchr/testify/suite" "github.com/Shopify/ghostferry" @@ -16,6 +17,7 @@ type DataIteratorTestSuite struct { di *ghostferry.DataIterator wg *sync.WaitGroup + tables []*schema.Table receivedRows map[string][]ghostferry.RowData } @@ -36,6 +38,8 @@ func (this *DataIteratorTestSuite) SetupTest() { tables, err := ghostferry.LoadTables(sourceDb, tableFilter) this.Require().Nil(err) + this.tables = tables.AsSlice() + config.DataIterationBatchSize = 2 this.di = &ghostferry.DataIterator{ @@ -52,8 +56,6 @@ func (this *DataIteratorTestSuite) SetupTest() { ReadRetries: config.DBReadRetries, }, StateTracker: ghostferry.NewStateTracker(config.DataIterationConcurrency * 10), - - Tables: tables.AsSlice(), } this.receivedRows = make(map[string][]ghostferry.RowData, 0) @@ -69,7 +71,7 @@ func (this *DataIteratorTestSuite) TestNoEventsForEmptyTable() { _, err = this.Ferry.SourceDB.Query(fmt.Sprintf("DELETE FROM `%s`.`%s`", testhelpers.TestSchemaName, testhelpers.TestCompressedTable1Name)) this.Require().Nil(err) - this.di.Run() + this.di.Run(this.tables) this.Require().Equal(0, len(this.receivedRows)) this.Require().Equal( @@ -122,7 +124,7 @@ func (this *DataIteratorTestSuite) TestExistingRowsAreIterated() { this.Require().Equal(0, len(this.receivedRows[testhelpers.TestTable1Name])) this.Require().Equal(0, len(this.receivedRows[testhelpers.TestCompressedTable1Name])) - this.di.Run() + this.di.Run(this.tables) this.Require().Equal(5, len(this.receivedRows[testhelpers.TestTable1Name])) this.Require().Equal(5, len(this.receivedRows[testhelpers.TestCompressedTable1Name])) @@ -154,7 +156,7 @@ func (this *DataIteratorTestSuite) TestDoneListenerGetsNotifiedWhenDone() { return nil }) - this.di.Run() + this.di.Run(this.tables) this.Require().True(wasNotified) }