Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added a generic binlog catchup process and reconcile the binlogs #88

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
223 changes: 223 additions & 0 deletions binlog_reconciler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,223 @@
package ghostferry

import (
"database/sql"
"fmt"
"strings"

"github.com/Masterminds/squirrel"
"github.com/siddontang/go-mysql/schema"
"github.com/sirupsen/logrus"
)

// This event generates a REPLACE INTO SQL statement to overwrite a row if an
// INSERT or UPDATE occured. For a DELETE statement, it generates the
// corresponding DELETE statement based on the primary key.
//
// This is used during the resume when we are catching up to the binlog events
// missed by Ghostferry while it is down.
type ReconcilationDMLEvent struct {
*DMLEventBase
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do you need to embed this? It doesn't make sense to call this struct a *DMLEvent either. It's not really an event itself. You may store the event that produced this as a field on the struct but calling this a DML event itself is very confusing

Is the only goal of this struct to provide a different AsSQLString so that it works as-is with the BinlogWriter? I feel like we need to refactor the current BinlogWriter to become a MultiStatementWriter.

type MultiStatementWriter struct{}  // current BinlogWriter implementation

type BinlogWriter struct{
  writer *MultiStatementWriter
}

type BinlogReconciler struct{
  writer *MultiStatementWriter
}

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I tried to refactor the BinlogWriter and it turned out to be a bit more difficult and tedious than I would have liked. Since the interface of the DMLEvent is mainly to call AsSQLString, I decided that it's easier to work with this version and refactor later.

newValues RowData
pk uint64
}

func (e *ReconcilationDMLEvent) OldValues() RowData {
return nil
}

func (e *ReconcilationDMLEvent) NewValues() RowData {
return e.newValues
}

func (e *ReconcilationDMLEvent) PK() (uint64, error) {
return e.pk, nil
}

func (e *ReconcilationDMLEvent) AsSQLString(target *schema.Table) (string, error) {
var query string
if e.newValues != nil {
columns, err := loadColumnsForTable(&e.table, e.newValues)
if err != nil {
return "", err
}

query = "REPLACE INTO " +
QuotedTableNameFromString(target.Schema, target.Name) +
" (" + strings.Join(columns, ",") + ")" +
" VALUES (" + buildStringListForValues(e.newValues) + ")"
} else {
pkColumnName := e.TableSchema().GetPKColumn(0).Name
if pkColumnName == "" {
return "", fmt.Errorf("cannot get PK column for table %s", e.Table())
}

pkColumnName = quoteField(pkColumnName)
query = "DELETE FROM " + QuotedTableNameFromString(target.Schema, target.Name) +
" WHERE " + buildStringMapForWhere([]string{pkColumnName}, []interface{}{e.pk})
}

return query, nil
}

func NewReconciliationDMLEvent(table *schema.Table, pk uint64, row RowData) DMLEvent {
return &ReconcilationDMLEvent{
DMLEventBase: &DMLEventBase{table: *table},
pk: pk,
newValues: row,
}
}

// Instead of replacing/deleting every row, we first store all the rows changed
// during the downtime and perform all the operations at the end. This maybe
// faster (trading memory usage tho) and it is crucial for schema changes (as
// we can simply delete a key from this map when we realize a table has
// changed).
type UniqueRowMap map[TableIdentifier]map[uint64]struct{}

func (m UniqueRowMap) AddRow(table *schema.Table, pk uint64) bool {
tableId := NewTableIdentifierFromSchemaTable(table)
if _, exists := m[tableId]; !exists {
m[tableId] = make(map[uint64]struct{})
}

if _, exists := m[tableId][pk]; !exists {
m[tableId][pk] = struct{}{}
return true
}

return false
}

type BinlogReconciler struct {
BatchSize int
TableSchemaCache TableSchemaCache

SourceDB *sql.DB
BinlogWriter *BinlogWriter

modifiedRows UniqueRowMap
logger *logrus.Entry
}

func (r *BinlogReconciler) Initialize() {
r.modifiedRows = make(UniqueRowMap)
r.logger = logrus.WithField("tag", "binlogreconcile")
}

func (r *BinlogReconciler) AddRowsToStore(events []DMLEvent) error {
for _, ev := range events {
pk, err := ev.PK()
if err != nil {
return err
}

r.modifiedRows.AddRow(ev.TableSchema(), pk)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we really wanna REPLACE for every table? seems like the usual binlog playing logic should handle most tables?

}

return nil
}

func (r *BinlogReconciler) ReplaceModifiedRowsAfterCatchup() error {
batch := make([]DMLEvent, 0, r.BatchSize)

totalModifiedRows := 0
for _, pkSet := range r.modifiedRows {
totalModifiedRows += len(pkSet)
}

r.logger.WithField("row_count", totalModifiedRows).Info("begin replacing modified rows")

count := 0
for tableId, pkSet := range r.modifiedRows {
table := r.TableSchemaCache.Get(tableId.SchemaName, tableId.TableName)

for pk, _ := range pkSet {
count++

if len(batch) == r.BatchSize {
r.logger.WithField("rows_replaced", count).Debug("replacing batch")
err := r.replaceBatch(batch)
if err != nil {
return err
}

batch = make([]DMLEvent, 0, r.BatchSize)
}

row, err := r.selectRowFromSource(table, pk)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

selecting one row at a time? wouldn't that be very slow for high traffic dbs?

if err != nil {
r.logger.WithError(err).Error("failed to select row from source")
return err
}

batch = append(batch, NewReconciliationDMLEvent(table, pk, row))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

wait... why do you need ReconcilationDMLEvent at all? can't you re-use RowBatch here?

}
}

if len(batch) > 0 {
err := r.replaceBatch(batch)
if err != nil {
return err
}
}

r.logger.WithField("row_count", totalModifiedRows).Info("replaced modified rows")

return nil
}

func (r *BinlogReconciler) replaceBatch(batch []DMLEvent) error {
err := r.BinlogWriter.WriteEvents(batch)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

these are not binlog events. see my comment earlier about MultiStatementWriter

if err != nil {
r.logger.WithError(err).Error("cannot replace batch")
return err
}

return nil
}

func (r *BinlogReconciler) selectRowFromSource(table *schema.Table, pk uint64) (RowData, error) {
quotedPK := quoteField(table.GetPKColumn(0).Name)

query, args, err := squirrel.
Select("*").
From(QuotedTableName(table)).
Where(squirrel.Eq{quotedPK: pk}).ToSql()

if err != nil {
return nil, err
}

// TODO: make this cached for faster reconciliation
stmt, err := r.SourceDB.Prepare(query)
if err != nil {
return nil, err
}

defer stmt.Close()

rows, err := stmt.Query(args...)
if err != nil {
return nil, err
}
defer rows.Close()

var rowData RowData = nil
count := 0

for rows.Next() {
rowData, err = ScanGenericRow(rows, len(table.Columns))
if err != nil {
return nil, err
}

count++
}

if count > 1 {
return nil, fmt.Errorf("multiple rows detected when only one or zero is expected for %s %v", table.String(), pk)
}

return rowData, rows.Err()
}
5 changes: 5 additions & 0 deletions binlog_streamer.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,11 @@ func (s *BinlogStreamer) IsAlmostCaughtUp() bool {
return time.Now().Sub(s.lastProcessedEventTime) < caughtUpThreshold
}

func (s *BinlogStreamer) FlushToTargetBinlogPositionAndStop(target mysql.Position) {
s.targetBinlogPosition = target
s.stopRequested = true
}

func (s *BinlogStreamer) FlushAndStop() {
s.logger.Info("requesting binlog streamer to stop")
// Must first read the binlog position before requesting stop
Expand Down
4 changes: 2 additions & 2 deletions binlog_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ func (b *BinlogWriter) Run() {
}

err := WithRetries(b.WriteRetries, 0, b.logger, "write events to target", func() error {
return b.writeEvents(batch)
return b.WriteEvents(batch)
})
if err != nil {
b.ErrorHandler.Fatal("binlog_writer", err)
Expand All @@ -75,7 +75,7 @@ func (b *BinlogWriter) BufferBinlogEvents(events []DMLEvent) error {
return nil
}

func (b *BinlogWriter) writeEvents(events []DMLEvent) error {
func (b *BinlogWriter) WriteEvents(events []DMLEvent) error {
WaitForThrottle(b.Throttler)

queryBuffer := []byte("BEGIN;\n")
Expand Down
7 changes: 3 additions & 4 deletions data_iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import (

type DataIterator struct {
DB *sql.DB
Tables []*schema.Table
Concurrency int

ErrorHandler ErrorHandler
Expand All @@ -25,7 +24,7 @@ type DataIterator struct {
logger *logrus.Entry
}

func (d *DataIterator) Run() {
func (d *DataIterator) Run(tables []*schema.Table) {
d.logger = logrus.WithField("tag", "data_iterator")
d.targetPKs = &sync.Map{}

Expand All @@ -36,8 +35,8 @@ func (d *DataIterator) Run() {
d.StateTracker = NewStateTracker(0)
}

d.logger.WithField("tablesCount", len(d.Tables)).Info("starting data iterator run")
tablesWithData, emptyTables, err := MaxPrimaryKeys(d.DB, d.Tables, d.logger)
d.logger.WithField("tablesCount", len(tables)).Info("starting data iterator run")
tablesWithData, emptyTables, err := MaxPrimaryKeys(d.DB, tables, d.logger)
if err != nil {
d.ErrorHandler.Fatal("data_iterator", err)
}
Expand Down
Loading