Skip to content

Commit

Permalink
DataIterator.Run(tables)
Browse files Browse the repository at this point in the history
Instead of passing the Table as a struct member, this makes it pass by
argument in the Run function. It makes more sense as it's not really a
configuration on how to Run the DataIterator, but what to run the
DataIterator on.

This also makes the DataIterator slightly easier to use independently of
the Ferry.
  • Loading branch information
shuhaowu committed Jan 21, 2019
1 parent 07831b1 commit 9d2d676
Show file tree
Hide file tree
Showing 3 changed files with 12 additions and 13 deletions.
7 changes: 3 additions & 4 deletions data_iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import (

type DataIterator struct {
DB *sql.DB
Tables []*schema.Table
Concurrency int

ErrorHandler ErrorHandler
Expand All @@ -25,7 +24,7 @@ type DataIterator struct {
logger *logrus.Entry
}

func (d *DataIterator) Run() {
func (d *DataIterator) Run(tables []*schema.Table) {
d.logger = logrus.WithField("tag", "data_iterator")
d.targetPKs = &sync.Map{}

Expand All @@ -36,8 +35,8 @@ func (d *DataIterator) Run() {
d.StateTracker = NewStateTracker(0)
}

d.logger.WithField("tablesCount", len(d.Tables)).Info("starting data iterator run")
tablesWithData, emptyTables, err := MaxPrimaryKeys(d.DB, d.Tables, d.logger)
d.logger.WithField("tablesCount", len(tables)).Info("starting data iterator run")
tablesWithData, emptyTables, err := MaxPrimaryKeys(d.DB, tables, d.logger)
if err != nil {
d.ErrorHandler.Fatal("data_iterator", err)
}
Expand Down
6 changes: 2 additions & 4 deletions ferry.go
Original file line number Diff line number Diff line change
Expand Up @@ -344,7 +344,6 @@ func (f *Ferry) Start() error {

// TODO(pushrax): handle changes to schema during copying and clean this up.
f.BinlogStreamer.TableSchema = f.Tables
f.DataIterator.Tables = f.Tables.AsSlice()

return nil
}
Expand Down Expand Up @@ -411,7 +410,7 @@ func (f *Ferry) Run() {

go func() {
defer dataIteratorWg.Done()
f.DataIterator.Run()
f.DataIterator.Run(f.Tables.AsSlice())
}()

dataIteratorWg.Wait()
Expand Down Expand Up @@ -445,11 +444,10 @@ func (f *Ferry) RunStandaloneDataCopy(tables []*schema.Table) error {
// will get an error dump even though we should not get one, which could be
// misleading.

dataIterator.Tables = tables
dataIterator.AddBatchListener(f.BatchWriter.WriteRowBatch)
f.logger.WithField("tables", tables).Info("starting delta table copy in cutover")

dataIterator.Run()
dataIterator.Run(tables)

return nil
}
Expand Down
12 changes: 7 additions & 5 deletions test/go/data_iterator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"sync"
"testing"

"github.com/siddontang/go-mysql/schema"
"github.com/stretchr/testify/suite"

"github.com/Shopify/ghostferry"
Expand All @@ -16,6 +17,7 @@ type DataIteratorTestSuite struct {

di *ghostferry.DataIterator
wg *sync.WaitGroup
tables []*schema.Table
receivedRows map[string][]ghostferry.RowData
}

Expand All @@ -36,6 +38,8 @@ func (this *DataIteratorTestSuite) SetupTest() {
tables, err := ghostferry.LoadTables(sourceDb, tableFilter)
this.Require().Nil(err)

this.tables = tables.AsSlice()

config.DataIterationBatchSize = 2

this.di = &ghostferry.DataIterator{
Expand All @@ -52,8 +56,6 @@ func (this *DataIteratorTestSuite) SetupTest() {
ReadRetries: config.DBReadRetries,
},
StateTracker: ghostferry.NewStateTracker(config.DataIterationConcurrency * 10),

Tables: tables.AsSlice(),
}

this.receivedRows = make(map[string][]ghostferry.RowData, 0)
Expand All @@ -69,7 +71,7 @@ func (this *DataIteratorTestSuite) TestNoEventsForEmptyTable() {
_, err = this.Ferry.SourceDB.Query(fmt.Sprintf("DELETE FROM `%s`.`%s`", testhelpers.TestSchemaName, testhelpers.TestCompressedTable1Name))
this.Require().Nil(err)

this.di.Run()
this.di.Run(this.tables)

this.Require().Equal(0, len(this.receivedRows))
this.Require().Equal(
Expand Down Expand Up @@ -122,7 +124,7 @@ func (this *DataIteratorTestSuite) TestExistingRowsAreIterated() {
this.Require().Equal(0, len(this.receivedRows[testhelpers.TestTable1Name]))
this.Require().Equal(0, len(this.receivedRows[testhelpers.TestCompressedTable1Name]))

this.di.Run()
this.di.Run(this.tables)

this.Require().Equal(5, len(this.receivedRows[testhelpers.TestTable1Name]))
this.Require().Equal(5, len(this.receivedRows[testhelpers.TestCompressedTable1Name]))
Expand Down Expand Up @@ -154,7 +156,7 @@ func (this *DataIteratorTestSuite) TestDoneListenerGetsNotifiedWhenDone() {
return nil
})

this.di.Run()
this.di.Run(this.tables)

this.Require().True(wasNotified)
}
Expand Down

0 comments on commit 9d2d676

Please sign in to comment.