Skip to content
This repository has been archived by the owner on Nov 24, 2023. It is now read-only.

syncer: fix syncer gtid auto switch from off to on #1723

Merged
merged 19 commits into from
Jun 3, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions dm/worker/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -254,8 +254,10 @@ func (s *Server) doStartKeepAlive() {
}

func (s *Server) stopKeepAlive() {
s.kaCancel()
s.kaWg.Wait()
if s.kaCancel != nil {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In which case is stopKeepAlive called before, or concurrently with, startKeepAlive? If that can happen, we may need a lock 😂

Copy link
Contributor Author

@lichunzhu lichunzhu May 26, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  1. dm-worker/main.go calls s.Start()
  2. Start() returns an error before s.startKeepAlive() is reached, e.g. the worker port is already in use
  3. dm-worker/main.go calls s.Close() -> s.stopKeepAlive(), which reports a nil pointer error in stdout

s.kaCancel()
s.kaWg.Wait()
}
}

func (s *Server) restartKeepAlive() {
Expand Down
28 changes: 28 additions & 0 deletions pkg/binlog/position.go
Original file line number Diff line number Diff line change
Expand Up @@ -316,6 +316,34 @@ func CompareLocation(location1, location2 Location, cmpGTID bool) int {
return compareIndex(location1.Suffix, location2.Suffix)
}

// IsFreshPosition returns true when location1 is a fresh location without any info.
// location1 is measured against NewLocation(flavor). When cmpGTID is true the GTID sets are
// compared first; if they are both empty, or cannot be compared, the decision falls back to
// the binlog position and then to the suffix. When cmpGTID is false only position and suffix
// are used.
func IsFreshPosition(location1 Location, flavor string, cmpGTID bool) bool {
// location2 is the reference location (presumably the minimal one for this flavor — confirm in NewLocation).
location2 := NewLocation(flavor)
if cmpGTID {
cmp, canCmp := CompareGTID(location1.gtidSet, location2.gtidSet)
if canCmp {
if cmp != 0 {
// the GTID sets differ: fresh only when location1 does not compare greater than the reference.
return cmp <= 0
}
// not supposed to happen, for safety here:
// the sets compared equal but location1 still carries a non-empty GTID set, so it is not fresh.
if location1.gtidSet != nil && location1.gtidSet.String() != "" {
return false
}
// empty GTIDSet, then compare by position
log.L().Warn("both gtidSets are empty, will compare by position", zap.Stringer("location1", location1), zap.Stringer("location2", location2))
} else {
// if can't compare by GTIDSet, then compare by position
log.L().Warn("gtidSet can't be compared, will compare by position", zap.Stringer("location1", location1), zap.Stringer("location2", location2))
}
}

// position comparison: used directly when cmpGTID is false, or as the fallback from the GTID branch above.
cmp := ComparePosition(location1.Position, location2.Position)
lance6716 marked this conversation as resolved.
Show resolved Hide resolved
if cmp != 0 {
return cmp <= 0
}
// positions are equal, so the suffix decides.
return compareIndex(location1.Suffix, location2.Suffix) <= 0
}

// CompareGTID returns:
// 1, true if gSet1 is bigger than gSet2
// 0, true if gSet1 is equal to gSet2
Expand Down
86 changes: 86 additions & 0 deletions pkg/binlog/position_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -794,3 +794,89 @@ func (t *testPositionSuite) TestExtractSuffix(c *C) {
c.Assert(suffix, Equals, tc.suffix)
}
}

// TestIsFreshPosition runs IsFreshPosition over MySQL and MariaDB locations built from
// every combination used below of (real position | MinPosition) and (parsed GTID set | MinGTIDSet),
// with GTID comparison switched on or off, and checks the reported freshness.
func (t *testPositionSuite) TestIsFreshPosition(c *C) {
	binlogPos := gmysql.Position{Name: "mysql-binlog.00001", Pos: 123}

	mysqlGSet, err := gtid.ParserGTID(gmysql.MySQLFlavor, "e8e592a6-7a59-11eb-8da1-0242ac110002:1-36")
	c.Assert(err, IsNil)
	mariaGSet, err := gtid.ParserGTID(gmysql.MariaDBFlavor, "0-1001-233")
	c.Assert(err, IsNil)

	cases := []struct {
		loc     Location
		flavor  string
		cmpGTID bool
		fresh   bool
	}{
		// MySQL flavor
		{
			loc:     InitLocation(binlogPos, mysqlGSet),
			flavor:  gmysql.MySQLFlavor,
			cmpGTID: true,
			fresh:   false,
		},
		{
			loc:     InitLocation(binlogPos, gtid.MinGTIDSet(gmysql.MySQLFlavor)),
			flavor:  gmysql.MySQLFlavor,
			cmpGTID: true,
			fresh:   false,
		},
		{
			loc:     InitLocation(MinPosition, mysqlGSet),
			flavor:  gmysql.MySQLFlavor,
			cmpGTID: true,
			fresh:   false,
		},
		{
			loc:     InitLocation(MinPosition, mysqlGSet),
			flavor:  gmysql.MySQLFlavor,
			cmpGTID: false,
			fresh:   true,
		},
		{
			loc:     InitLocation(MinPosition, gtid.MinGTIDSet(gmysql.MySQLFlavor)),
			flavor:  gmysql.MySQLFlavor,
			cmpGTID: true,
			fresh:   true,
		},

		// MariaDB flavor
		{
			loc:     InitLocation(binlogPos, mariaGSet),
			flavor:  gmysql.MariaDBFlavor,
			cmpGTID: true,
			fresh:   false,
		},
		{
			loc:     InitLocation(binlogPos, gtid.MinGTIDSet(gmysql.MariaDBFlavor)),
			flavor:  gmysql.MariaDBFlavor,
			cmpGTID: true,
			fresh:   false,
		},
		{
			loc:     InitLocation(MinPosition, mariaGSet),
			flavor:  gmysql.MariaDBFlavor,
			cmpGTID: true,
			fresh:   false,
		},
		{
			loc:     InitLocation(MinPosition, mariaGSet),
			flavor:  gmysql.MariaDBFlavor,
			cmpGTID: false,
			fresh:   true,
		},
		{
			loc:     InitLocation(MinPosition, gtid.MinGTIDSet(gmysql.MariaDBFlavor)),
			flavor:  gmysql.MariaDBFlavor,
			cmpGTID: true,
			fresh:   true,
		},
	}

	for i := range cases {
		got := IsFreshPosition(cases[i].loc, cases[i].flavor, cases[i].cmpGTID)
		c.Assert(got, Equals, cases[i].fresh)
	}
}
6 changes: 6 additions & 0 deletions pkg/binlog/reader/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,3 +42,9 @@ type Reader interface {
// Status returns the status of the reader.
Status() interface{}
}

// Streamer provides the ability to get binlog event from remote server or local file.
type Streamer interface {
lance6716 marked this conversation as resolved.
Show resolved Hide resolved
// GetEvent returns binlog event, or a non-nil error when no event could be returned.
// NOTE(review): presumably blocks until an event is available or ctx is done — confirm against implementations.
GetEvent(ctx context.Context) (*replication.BinlogEvent, error)
}
36 changes: 21 additions & 15 deletions pkg/binlog/reader/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,26 +30,14 @@ import (
"github.com/pingcap/dm/relay/common"
)

// GetGTIDsForPos tries to get GTID sets for the specified binlog position (for the corresponding txn).
// NOTE: this method is very similar with `relay/writer/file_util.go/getTxnPosGTIDs`, unify them if needed later.
// NOTE: this method is not well tested directly, but more tests have already been done for `relay/writer/file_util.go/getTxnPosGTIDs`.
func GetGTIDsForPos(ctx context.Context, r Reader, endPos gmysql.Position) (gtid.Set, error) {
// start to get and parse binlog event from the beginning of the file.
startPos := gmysql.Position{
Name: endPos.Name,
Pos: 0,
}
err := r.StartSyncByPos(startPos)
if err != nil {
return nil, err
}
defer r.Close()

// GetGTIDsForPosFromStreamer tries to get GTID sets for the specified binlog position (for the corresponding txn) from a Streamer.
func GetGTIDsForPosFromStreamer(ctx context.Context, r Streamer, endPos gmysql.Position) (gtid.Set, error) {
var (
flavor string
latestPos uint32
latestGSet gmysql.GTIDSet
nextGTIDStr string // can be recorded if the coming transaction completed
err error
)
for {
var e *replication.BinlogEvent
Expand Down Expand Up @@ -141,6 +129,24 @@ func GetGTIDsForPos(ctx context.Context, r Reader, endPos gmysql.Position) (gtid
}
}

// GetGTIDsForPos tries to get GTID sets for the specified binlog position (for the corresponding txn).
// It starts r from offset 0 of the binlog file named by endPos, hands r to
// GetGTIDsForPosFromStreamer to do the scan, and closes r before returning.
// NOTE: this method is very similar to `relay/writer/file_util.go/getTxnPosGTIDs`, unify them if needed later.
// NOTE: this method is not well tested directly, but more tests have already been done for `relay/writer/file_util.go/getTxnPosGTIDs`.
func GetGTIDsForPos(ctx context.Context, r Reader, endPos gmysql.Position) (gtid.Set, error) {
	// events are read and parsed from the very beginning of the file that holds endPos.
	fileStart := gmysql.Position{Name: endPos.Name, Pos: 0}
	if err := r.StartSyncByPos(fileStart); err != nil {
		return nil, err
	}
	defer r.Close()

	return GetGTIDsForPosFromStreamer(ctx, r, endPos)
}

// GetPreviousGTIDFromGTIDSet tries to get previous GTID sets from Previous_GTID_EVENT GTID for the specified GTID Set.
// events should be [fake_rotate_event,format_description_event,previous_gtids_event/mariadb_gtid_list_event].
func GetPreviousGTIDFromGTIDSet(ctx context.Context, r Reader, gset gtid.Set) (gtid.Set, error) {
Expand Down
6 changes: 2 additions & 4 deletions pkg/streamer/streamer.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (

"github.com/pingcap/dm/pkg/binlog/common"
"github.com/pingcap/dm/pkg/binlog/event"
"github.com/pingcap/dm/pkg/binlog/reader"
"github.com/pingcap/dm/pkg/log"
"github.com/pingcap/dm/pkg/terror"

Expand All @@ -32,10 +33,7 @@ var heartbeatInterval = common.MasterHeartbeatPeriod
// TODO: maybe one day we can make a pull request to go-mysql to support LocalStreamer.

// Streamer provides the ability to get binlog event from remote server or local file.
type Streamer interface {
// GetEvent returns binlog event
GetEvent(ctx context.Context) (*replication.BinlogEvent, error)
}
type Streamer reader.Streamer

// LocalStreamer reads and parses binlog events from local binlog file.
type LocalStreamer struct {
Expand Down
3 changes: 2 additions & 1 deletion syncer/checkpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -781,7 +781,8 @@ func (cp *RemoteCheckPoint) Load(tctx *tcontext.Context) error {
gset,
)
if isGlobal {
if binlog.CompareLocation(location, binlog.NewLocation(cp.cfg.Flavor), cp.cfg.EnableGTID) > 0 {
// Use IsFreshPosition here to make sure checkpoint can be updated if gset is empty
if !binlog.IsFreshPosition(location, cp.cfg.Flavor, cp.cfg.EnableGTID) {
cp.globalPoint = newBinlogPoint(location, location, nil, nil, cp.cfg.EnableGTID)
cp.logCtx.L().Info("fetch global checkpoint from DB", log.WrapStringerField("global checkpoint", cp.globalPoint))
}
Expand Down
87 changes: 72 additions & 15 deletions syncer/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ import (
"github.com/pingcap/dm/pkg/binlog"
"github.com/pingcap/dm/pkg/binlog/common"
"github.com/pingcap/dm/pkg/binlog/event"
"github.com/pingcap/dm/pkg/binlog/reader"
"github.com/pingcap/dm/pkg/conn"
tcontext "github.com/pingcap/dm/pkg/context"
fr "github.com/pingcap/dm/pkg/func-rollback"
Expand Down Expand Up @@ -427,7 +428,8 @@ func (s *Syncer) initShardingGroups(ctx context.Context) error {
func (s *Syncer) IsFreshTask(ctx context.Context) (bool, error) {
globalPoint := s.checkpoint.GlobalPoint()
tablePoint := s.checkpoint.TablePoint()
return binlog.CompareLocation(globalPoint, binlog.NewLocation(s.cfg.Flavor), s.cfg.EnableGTID) <= 0 && len(tablePoint) == 0, nil
// has neither GTID nor binlog pos
return binlog.IsFreshPosition(globalPoint, s.cfg.Flavor, s.cfg.EnableGTID) && len(tablePoint) == 0, nil
}

func (s *Syncer) reset() {
Expand Down Expand Up @@ -1128,23 +1130,30 @@ func (s *Syncer) Run(ctx context.Context) (err error) {
if err != nil {
return err
}

// for fresh and all-mode task, flush checkpoint so we could delete metadata file
if s.cfg.Mode == config.ModeAll {
if err = s.flushCheckPoints(); err != nil {
tctx.L().Warn("fail to flush checkpoints when starting task", zap.Error(err))
} else if s.cfg.CleanDumpFile {
tctx.L().Info("try to remove loaded files")
metadataFile := path.Join(s.cfg.Dir, "metadata")
if err = os.Remove(metadataFile); err != nil {
tctx.L().Warn("error when remove loaded dump file", zap.String("data file", metadataFile), zap.Error(err))
}
if err = os.Remove(s.cfg.Dir); err != nil {
tctx.L().Warn("error when remove loaded dump folder", zap.String("data folder", s.cfg.Dir), zap.Error(err))
}
}
needFlushCheckpoint, err := s.adjustGlobalPointGTID(tctx)
lance6716 marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
return err
}
if needFlushCheckpoint || s.cfg.Mode == config.ModeAll {
if err = s.flushCheckPoints(); err != nil {
tctx.L().Warn("fail to flush checkpoints when starting task", zap.Error(err))
} else if s.cfg.Mode == config.ModeAll && s.cfg.CleanDumpFile {
tctx.L().Info("try to remove loaded files")
metadataFile := path.Join(s.cfg.Dir, "metadata")
if err = os.Remove(metadataFile); err != nil {
tctx.L().Warn("error when remove loaded dump file", zap.String("data file", metadataFile), zap.Error(err))
}
if err = os.Remove(s.cfg.Dir); err != nil {
tctx.L().Warn("error when remove loaded dump folder", zap.String("data folder", s.cfg.Dir), zap.Error(err))
}
}
}
failpoint.Inject("AdjustGTIDExit", func() {
tctx.L().Warn("exit triggered", zap.String("failpoint", "AdjustGTIDExit"))
s.streamerController.Close(tctx)
utils.OsExit(1)
})

// startLocation is the start location for current received event
// currentLocation is the end location for current received event (End_log_pos in `show binlog events` for mysql)
Expand Down Expand Up @@ -2855,3 +2864,51 @@ func (s *Syncer) getEvent(tctx *tcontext.Context, startLocation binlog.Location)

return s.streamerController.GetEvent(tctx)
}

// adjustGlobalPointGTID fills in the GTID set of the global checkpoint when GTID is enabled
// but the checkpoint only carries a binlog file position. It reads the binlog file of the
// checkpoint from offset 0 through a temporary streamer controller, resolves the GTID set for
// the checkpoint position, saves the updated global point and redirects the syncer's own
// streamer to the new location.
// It returns true when the global point was updated, false when no adjustment was needed,
// and a non-nil error if any step failed.
func (s *Syncer) adjustGlobalPointGTID(tctx *tcontext.Context) (bool, error) {
	globalLoc := s.checkpoint.GlobalPoint()

	// nothing to adjust when GTID is not enabled
	if !s.cfg.EnableGTID {
		return false, nil
	}
	// nothing to adjust when the location already has a GTID position
	if globalLoc.GTIDSetStr() != "" {
		return false, nil
	}
	// nothing to adjust when the location is totally new and has no position info
	if globalLoc.Position.Name == "" {
		return false, nil
	}

	// the temporary streamerController is created with enableGTID set to false
	tmpController := NewStreamerController(s.syncCfg, false, s.fromDB, s.binlogType, s.cfg.RelayDir, s.timezone)

	endPos := binlog.AdjustPosition(globalLoc.Position)
	// same location as the checkpoint, but positioned at the start of the binlog file
	fileStart := globalLoc.Clone()
	fileStart.Position = mysql.Position{Name: endPos.Name, Pos: 0}

	if err := tmpController.Start(tctx, fileStart); err != nil {
		return false, err
	}
	defer tmpController.Close(tctx)

	gset, err := reader.GetGTIDsForPosFromStreamer(tctx.Context(), tmpController.streamer, endPos)
	if err != nil {
		s.tctx.L().Warn("fail to get gtids for global location", zap.Stringer("pos", globalLoc), zap.Error(err))
		return false, err
	}

	if err = globalLoc.SetGTID(gset.Origin()); err != nil {
		s.tctx.L().Warn("fail to set gtid for global location", zap.Stringer("pos", globalLoc),
			zap.String("adjusted_gtid", gset.String()), zap.Error(err))
		return false, err
	}
	s.checkpoint.SaveGlobalPoint(globalLoc)

	// redirect streamer for new gtid set location
	if err = s.streamerController.RedirectStreamer(tctx, globalLoc); err != nil {
		s.tctx.L().Warn("fail to redirect streamer for global location", zap.Stringer("pos", globalLoc),
			zap.String("adjusted_gtid", gset.String()), zap.Error(err))
		return false, err
	}
	return true, nil
}
Loading