Skip to content

Commit

Permalink
Fix resuming binlog streaming #156
Browse files Browse the repository at this point in the history
A mysql replication event for changing data is always started by
sending a TableMapEvent (describing the table to be changed),
followed by one or more RowsEvent (containing the data to be
changed). If multiple consecutive RowsEvent events are sent for the
same table, the TableMapEvent is typically skipped (after sending it
once), meaning that resuming from such a binlog is not possible.

Resuming at a binlog position between the TableMapEvent and a
RowsEvent is not possible, as the binlog streamer would miss the
table definition for following DML statements.

This commit tracks the position of the most recently processed
TableMapEvent and uses this for resuming (skipping any actual events
between the last write and resume position that were already
processed).

Change-Id: I2bef401ba4f1c0d5f50f177f48c2e866bb411f5d
  • Loading branch information
kolbitsch-lastline committed Mar 22, 2020
1 parent 76555c1 commit fb6b953
Show file tree
Hide file tree
Showing 10 changed files with 192 additions and 87 deletions.
147 changes: 122 additions & 25 deletions binlog_streamer.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,21 +15,66 @@ import (

const caughtUpThreshold = 10 * time.Second

type BinlogStreamer struct {
DB *sql.DB
DBConfig *DatabaseConfig
MyServerId uint32
ErrorHandler ErrorHandler
Filter CopyFilter
type BinlogPosition struct {
// A binlog position emitted by the binlog-streamer consists of two parts:
// First, the last emitted event position, which refers to the event that
// we received from the MySQL master and that we hand to clients. Second,
// a position from which we can resume a binlog-streamer.
// Ideally, these two values would be the same, but in reality they are
// not, because some events are streamed in a series (e.g. DML events
// require a table-map events to be seen before).
// As a result, we always stream event positions as a pair - if a binlog
// streamer is resumed from an event that is not safe to resume from, we
// resume from the most recent (earlier) event from which we can safely
// resume and simply suppress emitting these events up to the point of the
// last event returned.
//
// the actual binlog position of an event emitted by the streamer
EventPosition mysql.Position
// the position from which one needs to point the streamer if we want to
// resume from after this event
ResumePosition mysql.Position
}

func NewResumableBinlogPosition(pos mysql.Position) BinlogPosition {
return BinlogPosition{pos, pos}
}

TableSchema TableSchemaCache
func (p BinlogPosition) Compare(o BinlogPosition) int {
// comparison always happens on the actual event
return p.EventPosition.Compare(o.EventPosition)
}

binlogSyncer *replication.BinlogSyncer
binlogStreamer *replication.BinlogStreamer
lastStreamedBinlogPosition mysql.Position
targetBinlogPosition mysql.Position
lastProcessedEventTime time.Time
lastLagMetricEmittedTime time.Time
func (b BinlogPosition) String() string {
return fmt.Sprintf("Position(event %s, resume at %s)", b.EventPosition, b.ResumePosition)
}

type BinlogStreamer struct {
DB *sql.DB
DBConfig *DatabaseConfig
MyServerId uint32
ErrorHandler ErrorHandler
Filter CopyFilter

TableSchema TableSchemaCache

binlogSyncer *replication.BinlogSyncer
binlogStreamer *replication.BinlogStreamer
// what is the last event that we ever received from the streamer
lastStreamedBinlogPosition mysql.Position
// what is the last event that we received and from which it is possible
// to resume
lastResumeBinlogPosition mysql.Position
// if we have resumed from an earlier position than where we last streamed
// to (that is, if lastResumeBinlogPosition is before
// lastStreamedBinlogPosition when resuming), up to what event should we
// suppress emitting events
suppressEmitUpToBinlogPosition mysql.Position
// up to what position to we want to continue streaming (if a stop was
// requested)
targetBinlogPosition mysql.Position
lastProcessedEventTime time.Time
lastLagMetricEmittedTime time.Time

stopRequested bool

Expand Down Expand Up @@ -77,40 +122,49 @@ func (s *BinlogStreamer) createBinlogSyncer() error {
return nil
}

func (s *BinlogStreamer) ConnectBinlogStreamerToMysql() (mysql.Position, error) {
func (s *BinlogStreamer) ConnectBinlogStreamerToMysql() (BinlogPosition, error) {
s.ensureLogger()

currentPosition, err := ShowMasterStatusBinlogPosition(s.DB)
if err != nil {
s.logger.WithError(err).Error("failed to read current binlog position")
return mysql.Position{}, err
return BinlogPosition{}, err
}

return s.ConnectBinlogStreamerToMysqlFrom(currentPosition)
return s.ConnectBinlogStreamerToMysqlFrom(NewResumableBinlogPosition(currentPosition))
}

func (s *BinlogStreamer) ConnectBinlogStreamerToMysqlFrom(startFromBinlogPosition mysql.Position) (mysql.Position, error) {
func (s *BinlogStreamer) ConnectBinlogStreamerToMysqlFrom(startFromBinlogPosition BinlogPosition) (BinlogPosition, error) {
s.ensureLogger()

err := s.createBinlogSyncer()
if err != nil {
return mysql.Position{}, err
return BinlogPosition{}, err
}

s.lastStreamedBinlogPosition = startFromBinlogPosition
if startFromBinlogPosition.EventPosition.Compare(startFromBinlogPosition.ResumePosition) < 0 {
err = fmt.Errorf("invalid resume position %s: last event must not be before resume position", startFromBinlogPosition)
return BinlogPosition{}, err
}

s.lastStreamedBinlogPosition = startFromBinlogPosition.EventPosition
s.suppressEmitUpToBinlogPosition = startFromBinlogPosition.EventPosition
s.lastResumeBinlogPosition = startFromBinlogPosition.ResumePosition

s.logger.WithFields(logrus.Fields{
"file": s.lastStreamedBinlogPosition.Name,
"pos": s.lastStreamedBinlogPosition.Pos,
"stream.file": s.lastStreamedBinlogPosition.Name,
"stream.pos": s.lastStreamedBinlogPosition.Pos,
"resume.file": s.lastResumeBinlogPosition.Name,
"resume.pos": s.lastResumeBinlogPosition.Pos,
}).Info("starting binlog streaming")

s.binlogStreamer, err = s.binlogSyncer.StartSync(s.lastStreamedBinlogPosition)
s.binlogStreamer, err = s.binlogSyncer.StartSync(s.lastResumeBinlogPosition)
if err != nil {
s.logger.WithError(err).Error("unable to start binlog streamer")
return mysql.Position{}, err
return BinlogPosition{}, err
}

return s.lastStreamedBinlogPosition, err
return startFromBinlogPosition, err
}

func (s *BinlogStreamer) Run() {
Expand Down Expand Up @@ -233,13 +287,44 @@ func (s *BinlogStreamer) updateLastStreamedPosAndTime(ev *replication.BinlogEven
eventTime := time.Unix(int64(ev.Header.Timestamp), 0)
s.lastProcessedEventTime = eventTime

if resumablePosition, evIsResumable := s.getResumePositionForEvent(ev); evIsResumable {
s.lastResumeBinlogPosition = resumablePosition
}

if time.Since(s.lastLagMetricEmittedTime) >= time.Second {
lag := time.Since(eventTime)
metrics.Gauge("BinlogStreamer.Lag", lag.Seconds(), nil, 1.0)
s.lastLagMetricEmittedTime = time.Now()
}
}

func (s *BinlogStreamer) getResumePositionForEvent(ev *replication.BinlogEvent) (resumablePosition mysql.Position, evIsResumable bool) {
// resuming from a RowsEvent is not possible, as it may be followed by
// another rows-event without a subsequent TableMapEvent. Thus, if we have
// a rows-event, we need to keep resuming from whatever last non-rows-event
//
// The same is true for TableMapEvents themselves, as we cannot resume right
// after the event: we need to re-stream the event itself to get ready for
// a RowsEvent
switch ev.Event.(type) {
case *replication.RowsEvent, *replication.TableMapEvent:
// it's not resumable - we need to return whatever was save to resume
// from before
resumablePosition = s.lastResumeBinlogPosition
default:
// it is safe to resume from here
evIsResumable = true
resumablePosition = mysql.Position{
// The filename is only changed and visible during the RotateEvent, which
// is handled transparently in Run().
Name: s.lastStreamedBinlogPosition.Name,
Pos: ev.Header.LogPos,
}
}

return
}

func (s *BinlogStreamer) handleRowsEvent(ev *replication.BinlogEvent) error {
eventTime := time.Unix(int64(ev.Header.Timestamp), 0)
rowsEvent := ev.Event.(*replication.RowsEvent)
Expand All @@ -256,12 +341,24 @@ func (s *BinlogStreamer) handleRowsEvent(ev *replication.BinlogEvent) error {
Pos: ev.Header.LogPos,
}

// we may still be searching for the first event to stream to listeners, if
// we resumed reading upstream events from an earlier event
if pos.Compare(s.suppressEmitUpToBinlogPosition) <= 0 {
return nil
}

resumePosition, _ := s.getResumePositionForEvent(ev)
binlogPosition := BinlogPosition{
EventPosition: pos,
ResumePosition: resumePosition,
}

table := s.TableSchema.Get(string(rowsEvent.Table.Schema), string(rowsEvent.Table.Table))
if table == nil {
return nil
}

dmlEvs, err := NewBinlogDMLEvents(table, ev, pos)
dmlEvs, err := NewBinlogDMLEvents(table, ev, binlogPosition)
if err != nil {
return err
}
Expand Down
15 changes: 7 additions & 8 deletions dml_events.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (

"github.com/shopspring/decimal"

"github.com/siddontang/go-mysql/mysql"
"github.com/siddontang/go-mysql/replication"
"github.com/siddontang/go-mysql/schema"
)
Expand Down Expand Up @@ -43,13 +42,13 @@ type DMLEvent interface {
OldValues() RowData
NewValues() RowData
PaginationKey() (uint64, error)
BinlogPosition() mysql.Position
BinlogPosition() BinlogPosition
}

// The base of DMLEvent to provide the necessary methods.
type DMLEventBase struct {
table *TableSchema
pos mysql.Position
pos BinlogPosition
}

func (e *DMLEventBase) Database() string {
Expand All @@ -64,7 +63,7 @@ func (e *DMLEventBase) TableSchema() *TableSchema {
return e.table
}

func (e *DMLEventBase) BinlogPosition() mysql.Position {
func (e *DMLEventBase) BinlogPosition() BinlogPosition {
return e.pos
}

Expand All @@ -73,7 +72,7 @@ type BinlogInsertEvent struct {
*DMLEventBase
}

func NewBinlogInsertEvents(table *TableSchema, rowsEvent *replication.RowsEvent, pos mysql.Position) ([]DMLEvent, error) {
func NewBinlogInsertEvents(table *TableSchema, rowsEvent *replication.RowsEvent, pos BinlogPosition) ([]DMLEvent, error) {
insertEvents := make([]DMLEvent, len(rowsEvent.Rows))

for i, row := range rowsEvent.Rows {
Expand Down Expand Up @@ -117,7 +116,7 @@ type BinlogUpdateEvent struct {
*DMLEventBase
}

func NewBinlogUpdateEvents(table *TableSchema, rowsEvent *replication.RowsEvent, pos mysql.Position) ([]DMLEvent, error) {
func NewBinlogUpdateEvents(table *TableSchema, rowsEvent *replication.RowsEvent, pos BinlogPosition) ([]DMLEvent, error) {
// UPDATE events have two rows in the RowsEvent. The first row is the
// entries of the old record (for WHERE) and the second row is the
// entries of the new record (for SET).
Expand Down Expand Up @@ -177,7 +176,7 @@ func (e *BinlogDeleteEvent) NewValues() RowData {
return nil
}

func NewBinlogDeleteEvents(table *TableSchema, rowsEvent *replication.RowsEvent, pos mysql.Position) ([]DMLEvent, error) {
func NewBinlogDeleteEvents(table *TableSchema, rowsEvent *replication.RowsEvent, pos BinlogPosition) ([]DMLEvent, error) {
deleteEvents := make([]DMLEvent, len(rowsEvent.Rows))

for i, row := range rowsEvent.Rows {
Expand Down Expand Up @@ -205,7 +204,7 @@ func (e *BinlogDeleteEvent) PaginationKey() (uint64, error) {
return paginationKeyFromEventData(e.table, e.oldValues)
}

func NewBinlogDMLEvents(table *TableSchema, ev *replication.BinlogEvent, pos mysql.Position) ([]DMLEvent, error) {
func NewBinlogDMLEvents(table *TableSchema, ev *replication.BinlogEvent, pos BinlogPosition) ([]DMLEvent, error) {
rowsEvent := ev.Event.(*replication.RowsEvent)

for _, row := range rowsEvent.Rows {
Expand Down
8 changes: 4 additions & 4 deletions ferry.go
Original file line number Diff line number Diff line change
Expand Up @@ -493,7 +493,7 @@ func (f *Ferry) Start() error {
// miss some records that are inserted between the time the
// DataIterator determines the range of IDs to copy and the time that
// the starting binlog coordinates are determined.
var pos siddontangmysql.Position
var pos BinlogPosition
var err error
if f.StateToResumeFrom != nil {
pos, err = f.BinlogStreamer.ConnectBinlogStreamerToMysqlFrom(f.StateToResumeFrom.MinBinlogPosition())
Expand All @@ -504,9 +504,9 @@ func (f *Ferry) Start() error {
return err
}

// If we don't set this now, there is a race condition where Ghostferry
// If we don't set this now, there is a race condition where ghostferry
// is terminated with some rows copied but no binlog events are written.
// This guarentees that we are able to restart from a valid location.
// This guarantees that we are able to restart from a valid location.
f.StateTracker.UpdateLastWrittenBinlogPosition(pos)
if f.inlineVerifier != nil {
f.StateTracker.UpdateLastStoredBinlogPositionForInlineVerifier(pos)
Expand Down Expand Up @@ -749,7 +749,7 @@ func (f *Ferry) Progress() *Progress {
s.Throttled = f.Throttler.Throttled()

// Binlog Progress
s.LastSuccessfulBinlogPos = f.BinlogStreamer.lastStreamedBinlogPosition
s.LastSuccessfulBinlogPos = f.BinlogStreamer.GetLastStreamedBinlogPosition()
s.BinlogStreamerLag = time.Now().Sub(f.BinlogStreamer.lastProcessedEventTime).Seconds()
s.FinalBinlogPos = f.BinlogStreamer.targetBinlogPosition

Expand Down
5 changes: 2 additions & 3 deletions sharding/test/copy_filter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"github.com/Shopify/ghostferry"
"github.com/Shopify/ghostferry/sharding"

"github.com/siddontang/go-mysql/mysql"
"github.com/siddontang/go-mysql/replication"
"github.com/siddontang/go-mysql/schema"
"github.com/stretchr/testify/suite"
Expand Down Expand Up @@ -142,15 +141,15 @@ func (t *CopyFilterTestSuite) TestShardingValueTypes() {
}

for _, tenantId := range tenantIds {
dmlEvents, _ := ghostferry.NewBinlogInsertEvents(t.normalTable, t.newRowsEvent([]interface{}{1001, tenantId, "data"}), mysql.Position{})
dmlEvents, _ := ghostferry.NewBinlogInsertEvents(t.normalTable, t.newRowsEvent([]interface{}{1001, tenantId, "data"}), ghostferry.BinlogPosition{})
applicable, err := t.filter.ApplicableEvent(dmlEvents[0])
t.Require().Nil(err)
t.Require().True(applicable, fmt.Sprintf("value %t wasn't applicable", tenantId))
}
}

func (t *CopyFilterTestSuite) TestInvalidShardingValueTypesErrors() {
dmlEvents, err := ghostferry.NewBinlogInsertEvents(t.normalTable, t.newRowsEvent([]interface{}{1001, string("1"), "data"}), mysql.Position{})
dmlEvents, err := ghostferry.NewBinlogInsertEvents(t.normalTable, t.newRowsEvent([]interface{}{1001, string("1"), "data"}), ghostferry.BinlogPosition{})
_, err = t.filter.ApplicableEvent(dmlEvents[0])
t.Require().Equal("parsing new sharding key: invalid type %!t(string=1)", err.Error())
}
Expand Down
18 changes: 8 additions & 10 deletions state_tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,6 @@ import (
"math"
"sync"
"time"

"github.com/siddontang/go-mysql/mysql"
)

// StateTracker design
Expand Down Expand Up @@ -36,13 +34,13 @@ type SerializableState struct {

LastSuccessfulPaginationKeys map[string]uint64
CompletedTables map[string]bool
LastWrittenBinlogPosition mysql.Position
LastStoredBinlogPositionForInlineVerifier mysql.Position
LastWrittenBinlogPosition BinlogPosition
LastStoredBinlogPositionForInlineVerifier BinlogPosition
BinlogVerifyStore BinlogVerifySerializedStore
}

func (s *SerializableState) MinBinlogPosition() mysql.Position {
nilPosition := mysql.Position{}
func (s *SerializableState) MinBinlogPosition() BinlogPosition {
nilPosition := BinlogPosition{}
if s.LastWrittenBinlogPosition == nilPosition {
return s.LastStoredBinlogPositionForInlineVerifier
}
Expand Down Expand Up @@ -82,8 +80,8 @@ type StateTracker struct {
BinlogRWMutex *sync.RWMutex
CopyRWMutex *sync.RWMutex

lastWrittenBinlogPosition mysql.Position
lastStoredBinlogPositionForInlineVerifier mysql.Position
lastWrittenBinlogPosition BinlogPosition
lastStoredBinlogPositionForInlineVerifier BinlogPosition

lastSuccessfulPaginationKeys map[string]uint64
completedTables map[string]bool
Expand Down Expand Up @@ -113,14 +111,14 @@ func NewStateTrackerFromSerializedState(speedLogCount int, serializedState *Seri
return s
}

func (s *StateTracker) UpdateLastWrittenBinlogPosition(pos mysql.Position) {
func (s *StateTracker) UpdateLastWrittenBinlogPosition(pos BinlogPosition) {
s.BinlogRWMutex.Lock()
defer s.BinlogRWMutex.Unlock()

s.lastWrittenBinlogPosition = pos
}

func (s *StateTracker) UpdateLastStoredBinlogPositionForInlineVerifier(pos mysql.Position) {
func (s *StateTracker) UpdateLastStoredBinlogPositionForInlineVerifier(pos BinlogPosition) {
s.BinlogRWMutex.Lock()
defer s.BinlogRWMutex.Unlock()

Expand Down
Loading

0 comments on commit fb6b953

Please sign in to comment.