Skip to content
This repository has been archived by the owner on Nov 24, 2023. It is now read-only.

syncer: fix syncer gtid auto switch from off to on #1723

Merged
merged 19 commits into from
Jun 3, 2021
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions cmd/dm-worker/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ func TestRunMain(_ *testing.T) {
defer func() { utils.OsExit = oldOsExit }()
utils.OsExit = func(code int) {
log.L().Info("os exits", zap.Int("exit code", code))
_ = utils.KillMySelf()
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I can't remember why we need this helper function now. Maybe the process won't exit after this test function returns?

If so, I think the reason may be that the test function should be named TestMain, not TestRunMain. Please check if this works.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

exit <- code
// sleep here to prevent following code execution in the caller routine
time.Sleep(time.Second * 60)
Expand Down
6 changes: 4 additions & 2 deletions dm/worker/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -254,8 +254,10 @@ func (s *Server) doStartKeepAlive() {
}

func (s *Server) stopKeepAlive() {
s.kaCancel()
s.kaWg.Wait()
if s.kaCancel != nil {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In which case is stopKeepAlive called before or concurrently with startKeepAlive? We may need a lock if so 😂

Copy link
Contributor Author

@lichunzhu lichunzhu May 26, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  1. dm-worker/main.go s.Start()
  2. Start() returns an error before s.startKeepAlive(), e.g. the worker port is already in use
  3. dm-worker/main.go s.Close() -> s.stopKeepAlive(), which reports nil pointer info in stdout

s.kaCancel()
s.kaWg.Wait()
}
}

func (s *Server) restartKeepAlive() {
Expand Down
33 changes: 33 additions & 0 deletions pkg/binlog/position.go
Original file line number Diff line number Diff line change
Expand Up @@ -316,6 +316,39 @@ func CompareLocation(location1, location2 Location, cmpGTID bool) int {
return compareIndex(location1.Suffix, location2.Suffix)
}

// CompareLocationAsPossible returns:
// 1 if point1 is bigger than point2
lance6716 marked this conversation as resolved.
Show resolved Hide resolved
// 0 if point1 is equal to point2
// -1 if point1 is less than point2
// The difference is that this function will compare positions if gtid sets are both empty.
func CompareLocationAsPossible(location1, location2 Location, cmpGTID bool) int {
if cmpGTID {
cmp, canCmp := CompareGTID(location1.gtidSet, location2.gtidSet)
if canCmp {
if cmp != 0 {
return cmp
}
if cmp = compareIndex(location1.Suffix, location2.Suffix); cmp != 0 {
return cmp
}
if location1.gtidSet != nil && location1.gtidSet.String() != "" {
return cmp
}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What if both GTID sets are empty, but the suffixes are not? e.g. location1:(pos1,"",1), location2:(pos2,"",0)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think Suffix is only used in handle-error?

// empty GTIDSet, then compare by position
log.L().Warn("both gtidSets are empty, will compare by position", zap.Stringer("location1", location1), zap.Stringer("location2", location2))
} else {
// if can't compare by GTIDSet, then compare by position
log.L().Warn("gtidSet can't be compared, will compare by position", zap.Stringer("location1", location1), zap.Stringer("location2", location2))
}
}

cmp := ComparePosition(location1.Position, location2.Position)
lance6716 marked this conversation as resolved.
Show resolved Hide resolved
if cmp != 0 {
return cmp
}
return compareIndex(location1.Suffix, location2.Suffix)
}

// CompareGTID returns:
// 1, true if gSet1 is bigger than gSet2
// 0, true if gSet1 is equal to gSet2
Expand Down
6 changes: 6 additions & 0 deletions pkg/binlog/reader/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,3 +42,9 @@ type Reader interface {
// Status returns the status of the reader.
Status() interface{}
}

// Streamer provides the ability to get binlog event from remote server or local file.
type Streamer interface {
lance6716 marked this conversation as resolved.
Show resolved Hide resolved
// GetEvent returns binlog event
GetEvent(ctx context.Context) (*replication.BinlogEvent, error)
}
36 changes: 21 additions & 15 deletions pkg/binlog/reader/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,26 +30,14 @@ import (
"github.com/pingcap/dm/relay/common"
)

// GetGTIDsForPos tries to get GTID sets for the specified binlog position (for the corresponding txn).
// NOTE: this method is very similar with `relay/writer/file_util.go/getTxnPosGTIDs`, unify them if needed later.
// NOTE: this method is not well tested directly, but more tests have already been done for `relay/writer/file_util.go/getTxnPosGTIDs`.
func GetGTIDsForPos(ctx context.Context, r Reader, endPos gmysql.Position) (gtid.Set, error) {
// start to get and parse binlog event from the beginning of the file.
startPos := gmysql.Position{
Name: endPos.Name,
Pos: 0,
}
err := r.StartSyncByPos(startPos)
if err != nil {
return nil, err
}
defer r.Close()

// GetGTIDsForPosStreamer tries to get GTID sets for the specified binlog position (for the corresponding txn) from a Streamer.
func GetGTIDsForPosStreamer(ctx context.Context, r Streamer, endPos gmysql.Position) (gtid.Set, error) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
func GetGTIDsForPosStreamer(ctx context.Context, r Streamer, endPos gmysql.Position) (gtid.Set, error) {
func GetGTIDsForPosFromStreamer(ctx context.Context, r Streamer, endPos gmysql.Position) (gtid.Set, error) {

var (
flavor string
latestPos uint32
latestGSet gmysql.GTIDSet
nextGTIDStr string // can be recorded if the coming transaction completed
err error
)
for {
var e *replication.BinlogEvent
Expand Down Expand Up @@ -141,6 +129,24 @@ func GetGTIDsForPos(ctx context.Context, r Reader, endPos gmysql.Position) (gtid
}
}

// GetGTIDsForPos tries to get GTID sets for the specified binlog position (for the corresponding txn).
// It starts r at offset 0 of the binlog file named by endPos.Name, hands r to
// GetGTIDsForPosStreamer to do the actual work, and closes r once that call returns.
// If r cannot be started, the start error is returned and r is not closed here.
// NOTE: this method is very similar to `relay/writer/file_util.go/getTxnPosGTIDs`, unify them if needed later.
// NOTE: this method is not well tested directly, but more tests have already been done for `relay/writer/file_util.go/getTxnPosGTIDs`.
func GetGTIDsForPos(ctx context.Context, r Reader, endPos gmysql.Position) (gtid.Set, error) {
	// Events are read and parsed from the very beginning of the file that holds endPos.
	fileStart := gmysql.Position{Name: endPos.Name, Pos: 0}
	if err := r.StartSyncByPos(fileStart); err != nil {
		return nil, err
	}
	defer r.Close()

	return GetGTIDsForPosStreamer(ctx, r, endPos)
}

// GetPreviousGTIDFromGTIDSet tries to get previous GTID sets from Previous_GTID_EVENT GTID for the specified GTID Set.
// events should be [fake_rotate_event,format_description_event,previous_gtids_event/mariadb_gtid_list_event].
func GetPreviousGTIDFromGTIDSet(ctx context.Context, r Reader, gset gtid.Set) (gtid.Set, error) {
Expand Down
6 changes: 2 additions & 4 deletions pkg/streamer/streamer.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (

"github.com/pingcap/dm/pkg/binlog/common"
"github.com/pingcap/dm/pkg/binlog/event"
"github.com/pingcap/dm/pkg/binlog/reader"
"github.com/pingcap/dm/pkg/log"
"github.com/pingcap/dm/pkg/terror"

Expand All @@ -32,10 +33,7 @@ var heartbeatInterval = common.MasterHeartbeatPeriod
// TODO: maybe one day we can make a pull request to go-mysql to support LocalStreamer.

// Streamer provides the ability to get binlog event from remote server or local file.
type Streamer interface {
// GetEvent returns binlog event
GetEvent(ctx context.Context) (*replication.BinlogEvent, error)
}
type Streamer reader.Streamer

// LocalStreamer reads and parses binlog events from local binlog file.
type LocalStreamer struct {
Expand Down
12 changes: 12 additions & 0 deletions pkg/utils/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"regexp"
"strconv"
"strings"
"syscall"
"time"

"github.com/go-mysql-org/go-mysql/mysql"
Expand Down Expand Up @@ -219,3 +220,14 @@ func WrapSchemesForInitialCluster(s string, https bool) string {
}
return strings.Join(output, ",")
}

// KillMySelf looks up the current process by its own PID and sends it SIGINT.
// It is used in integration test only.
//
// Only works on Unix. Signaling on Windows is not supported.
//
// The returned error is the lookup or signalling error passed through errors.Trace.
func KillMySelf() error {
	self, err := os.FindProcess(os.Getpid())
	if err != nil {
		return errors.Trace(err)
	}
	return errors.Trace(self.Signal(syscall.SIGINT))
}
3 changes: 2 additions & 1 deletion syncer/checkpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -781,7 +781,8 @@ func (cp *RemoteCheckPoint) Load(tctx *tcontext.Context) error {
gset,
)
if isGlobal {
if binlog.CompareLocation(location, binlog.NewLocation(cp.cfg.Flavor), cp.cfg.EnableGTID) > 0 {
// Use CompareLocationAsPossible here to make sure checkpoint can be updated if gset is empty
if binlog.CompareLocationAsPossible(location, binlog.NewLocation(cp.cfg.Flavor), cp.cfg.EnableGTID) > 0 {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this function always called with location2 as an empty location? If so, we needn't bother adding a unit test; just simplify the logic and tighten the function name.

cp.globalPoint = newBinlogPoint(location, location, nil, nil, cp.cfg.EnableGTID)
cp.logCtx.L().Info("fetch global checkpoint from DB", log.WrapStringerField("global checkpoint", cp.globalPoint))
}
Expand Down
87 changes: 72 additions & 15 deletions syncer/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ import (
"github.com/pingcap/dm/pkg/binlog"
"github.com/pingcap/dm/pkg/binlog/common"
"github.com/pingcap/dm/pkg/binlog/event"
"github.com/pingcap/dm/pkg/binlog/reader"
"github.com/pingcap/dm/pkg/conn"
tcontext "github.com/pingcap/dm/pkg/context"
fr "github.com/pingcap/dm/pkg/func-rollback"
Expand Down Expand Up @@ -427,7 +428,8 @@ func (s *Syncer) initShardingGroups(ctx context.Context) error {
func (s *Syncer) IsFreshTask(ctx context.Context) (bool, error) {
globalPoint := s.checkpoint.GlobalPoint()
tablePoint := s.checkpoint.TablePoint()
return binlog.CompareLocation(globalPoint, binlog.NewLocation(s.cfg.Flavor), s.cfg.EnableGTID) <= 0 && len(tablePoint) == 0, nil
// doesn't have neither GTID nor binlog pos
return binlog.CompareLocationAsPossible(globalPoint, binlog.NewLocation(s.cfg.Flavor), s.cfg.EnableGTID) <= 0 && len(tablePoint) == 0, nil
}

func (s *Syncer) reset() {
Expand Down Expand Up @@ -1128,23 +1130,30 @@ func (s *Syncer) Run(ctx context.Context) (err error) {
if err != nil {
return err
}

// for fresh and all-mode task, flush checkpoint so we could delete metadata file
if s.cfg.Mode == config.ModeAll {
if err = s.flushCheckPoints(); err != nil {
tctx.L().Warn("fail to flush checkpoints when starting task", zap.Error(err))
} else if s.cfg.CleanDumpFile {
tctx.L().Info("try to remove loaded files")
metadataFile := path.Join(s.cfg.Dir, "metadata")
if err = os.Remove(metadataFile); err != nil {
tctx.L().Warn("error when remove loaded dump file", zap.String("data file", metadataFile), zap.Error(err))
}
if err = os.Remove(s.cfg.Dir); err != nil {
tctx.L().Warn("error when remove loaded dump folder", zap.String("data folder", s.cfg.Dir), zap.Error(err))
}
}
needFlushCheckpoint, err := s.adjustGlobalPointGTID(tctx)
lance6716 marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
return err
}
if needFlushCheckpoint || s.cfg.Mode == config.ModeAll {
if err = s.flushCheckPoints(); err != nil {
tctx.L().Warn("fail to flush checkpoints when starting task", zap.Error(err))
} else if s.cfg.Mode == config.ModeAll && s.cfg.CleanDumpFile {
tctx.L().Info("try to remove loaded files")
metadataFile := path.Join(s.cfg.Dir, "metadata")
if err = os.Remove(metadataFile); err != nil {
tctx.L().Warn("error when remove loaded dump file", zap.String("data file", metadataFile), zap.Error(err))
}
if err = os.Remove(s.cfg.Dir); err != nil {
tctx.L().Warn("error when remove loaded dump folder", zap.String("data folder", s.cfg.Dir), zap.Error(err))
}
}
}
failpoint.Inject("AdjustGTIDExit", func() {
tctx.L().Warn("exit triggered", zap.String("failpoint", "AdjustGTIDExit"))
s.streamerController.Close(tctx)
utils.OsExit(1)
})

// startLocation is the start location for current received event
// currentLocation is the end location for current received event (End_log_pos in `show binlog events` for mysql)
Expand Down Expand Up @@ -2854,3 +2863,51 @@ func (s *Syncer) getEvent(tctx *tcontext.Context, startLocation binlog.Location)

return s.streamerController.GetEvent(tctx)
}

// adjustGlobalPointGTID fills in the GTID set of the global checkpoint when GTID is enabled
// but the checkpoint only carries a binlog file position.
//
// It starts a temporary streamer controller (created with enableGTID=false) at offset 0 of
// the checkpoint's binlog file, asks reader.GetGTIDsForPosStreamer for the GTID set at the
// checkpoint position, stores that set into the global point, and finally redirects
// s.streamerController to the updated location.
//
// It returns (true, nil) when the global point was updated and the streamer redirected,
// (false, nil) when no adjustment is needed, and (false, err) when any step fails.
func (s *Syncer) adjustGlobalPointGTID(tctx *tcontext.Context) (bool, error) {
	globalLoc := s.checkpoint.GlobalPoint()

	// An adjustment is only needed when all of the following hold:
	// 1. GTID is enabled
	// 2. the location does not have a GTID position yet
	// 3. the location is not totally new, i.e. it has position info
	needAdjust := s.cfg.EnableGTID && globalLoc.GTIDSetStr() == "" && globalLoc.Position.Name != ""
	if !needAdjust {
		return false, nil
	}

	// set enableGTID to false for new streamerController
	tmpController := NewStreamerController(s.syncCfg, false, s.fromDB, s.binlogType, s.cfg.RelayDir, s.timezone)

	// Read from the start of the file that contains the checkpoint position.
	endPos := binlog.AdjustPosition(globalLoc.Position)
	scanFrom := globalLoc.Clone()
	scanFrom.Position = mysql.Position{Name: endPos.Name, Pos: 0}

	if err := tmpController.Start(tctx, scanFrom); err != nil {
		return false, err
	}
	defer tmpController.Close(tctx)

	gset, err := reader.GetGTIDsForPosStreamer(tctx.Context(), tmpController.streamer, endPos)
	if err != nil {
		s.tctx.L().Warn("fail to get gtids for global location", zap.Stringer("pos", globalLoc), zap.Error(err))
		return false, err
	}

	if err = globalLoc.SetGTID(gset.Origin()); err != nil {
		s.tctx.L().Warn("fail to set gtid for global location", zap.Stringer("pos", globalLoc),
			zap.String("adjusted_gtid", gset.String()), zap.Error(err))
		return false, err
	}
	s.checkpoint.SaveGlobalPoint(globalLoc)

	// redirect streamer for new gtid set location
	if err = s.streamerController.RedirectStreamer(tctx, globalLoc); err != nil {
		s.tctx.L().Warn("fail to redirect streamer for global location", zap.Stringer("pos", globalLoc),
			zap.String("adjusted_gtid", gset.String()), zap.Error(err))
		return false, err
	}
	return true, nil
}
56 changes: 56 additions & 0 deletions tests/adjust_gtid/conf/diff_config.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
# diff Configuration.

log-level = "info"

chunk-size = 1000

check-thread-count = 4

sample-percent = 100

use-checksum = true

fix-sql-file = "fix.sql"

# tables need to check.
[[check-tables]]
schema = "adjust_gtid"
tables = ["~t.*"]

[[table-config]]
schema = "adjust_gtid"
table = "t1"

[[table-config.source-tables]]
instance-id = "source-1"
schema = "adjust_gtid"
table = "t1"

[[table-config]]
schema = "adjust_gtid"
table = "t2"

[[table-config.source-tables]]
instance-id = "source-2"
schema = "adjust_gtid"
table = "t2"

[[source-db]]
host = "127.0.0.1"
port = 3306
user = "root"
password = "123456"
instance-id = "source-1"

[[source-db]]
host = "127.0.0.1"
port = 3307
user = "root"
password = "123456"
instance-id = "source-2"

[target-db]
host = "127.0.0.1"
port = 4000
user = "test"
password = "123456"
6 changes: 6 additions & 0 deletions tests/adjust_gtid/conf/dm-master.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
# Master Configuration.
master-addr = ":8261"
advertise-addr = "127.0.0.1:8261"

rpc-timeout = "30s"
auto-compaction-retention = "3s"
Loading