Skip to content

Commit

Permalink
cherry pick pingcap#1291 to release-2.0
Browse files Browse the repository at this point in the history
Signed-off-by: ti-srebot <ti-srebot@pingcap.com>
  • Loading branch information
GMHDBJD authored and ti-srebot committed Dec 22, 2020
1 parent a864d71 commit 97527c6
Show file tree
Hide file tree
Showing 86 changed files with 487 additions and 154 deletions.
2 changes: 2 additions & 0 deletions chaos/cases/conf/source1.yaml
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
source-id: "mysql-replica-01"
enable-gtid: false
enable-relay: false

from:
host: "mysql-0.mysql" # same namespace with MySQL
Expand Down
2 changes: 2 additions & 0 deletions chaos/cases/conf/source2.yaml
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
source-id: "mysql-replica-02"
enable-gtid: true
enable-relay: true

from:
host: "mysql-1.mysql" # same namespace with MySQL
Expand Down
4 changes: 2 additions & 2 deletions chaos/cases/source.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,8 @@ func createSources(ctx context.Context, cli pb.MasterClient, cfg *config) error
return err
}

var cfg1 config2.SourceConfig
var cfg2 config2.SourceConfig
cfg1 := config2.NewSourceConfig()
cfg2 := config2.NewSourceConfig()
if err = cfg1.ParseYaml(string(s1Content)); err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion chaos/cases/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ import (
const (
tableCount = 10 // tables count in schema.
fullInsertCount = 100 // `INSERT INTO` count (not rows count) for each table in full stage.
diffCount = 20 // diff data check count
diffCount = 30 // diff data check count
diffInterval = 10 * time.Second // diff data check interval
incrRoundTime = 20 * time.Second // time to generate incremental data in one round
)
Expand Down
8 changes: 7 additions & 1 deletion dm/config/source_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
const (
// the default base(min) server id generated by random
defaultBaseServerID = math.MaxUint32 / 10
defaultRelayDir = "relay-dir"
)

var getAllServerIDFunc = utils.GetAllServerID
Expand Down Expand Up @@ -53,6 +54,8 @@ type SourceConfig struct {
// relay synchronous starting point (if specified)
RelayBinLogName string `yaml:"relay-binlog-name" toml:"relay-binlog-name" json:"relay-binlog-name"`
RelayBinlogGTID string `yaml:"relay-binlog-gtid" toml:"relay-binlog-gtid" json:"relay-binlog-gtid"`
// only use when worker bound source, do not marsh it
UUIDSuffix int `yaml:"-" toml:"-" json:"-"`

SourceID string `yaml:"source-id" toml:"source-id" json:"source-id"`
From DBConfig `yaml:"from" toml:"from" json:"from"`
Expand All @@ -73,7 +76,6 @@ type SourceConfig struct {
// NewSourceConfig creates a new base config for upstream MySQL/MariaDB source.
func NewSourceConfig() *SourceConfig {
c := &SourceConfig{
RelayDir: "relay-dir",
Purge: PurgeConfig{
Interval: 60 * 60,
Expires: 0,
Expand Down Expand Up @@ -243,6 +245,10 @@ func (c *SourceConfig) Adjust(ctx context.Context, db *sql.DB) (err error) {
}
}

if c.EnableRelay && len(c.RelayDir) == 0 {
c.RelayDir = defaultRelayDir
}

return nil
}

Expand Down
6 changes: 3 additions & 3 deletions dm/ctl/ctl.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,9 +64,9 @@ func NewRootCmd() *cobra.Command {
master.NewQueryStatusCmd(),
master.NewShowDDLLocksCmd(),
master.NewUnlockDDLLockCmd(),
// master.NewPauseRelayCmd(),
// master.NewResumeRelayCmd(),
// master.NewPurgeRelayCmd(),
master.NewPauseRelayCmd(),
master.NewResumeRelayCmd(),
master.NewPurgeRelayCmd(),
master.NewOperateSourceCmd(),
master.NewOfflineMemberCmd(),
master.NewOperateLeaderCmd(),
Expand Down
8 changes: 8 additions & 0 deletions dm/worker/relay_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,14 @@ func (d *DummyRelay) SaveMeta(pos mysql.Position, gset gtid.Set) error {
return nil
}

// ResetMeta implements Process interface
func (d *DummyRelay) ResetMeta() {}

// PurgeRelayDir implements Process interface
func (d *DummyRelay) PurgeRelayDir() error {
return nil
}

func (t *testRelay) TestRelay(c *C) {
originNewRelay := relay.NewRelay
relay.NewRelay = NewDummyRelay
Expand Down
45 changes: 26 additions & 19 deletions dm/worker/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ var (
keepaliveTime = 3 * time.Second
retryConnectSleepTime = time.Second
syncMasterEndpointsTime = 3 * time.Second
getMinPosForSubTaskFunc = getMinPosForSubTask
getMinLocForSubTaskFunc = getMinLocForSubTask
)

// Server accepts RPC requests
Expand Down Expand Up @@ -546,16 +546,24 @@ func (s *Server) startWorker(cfg *config.SourceConfig) error {
if cfg.EnableRelay {
dctx, dcancel := context.WithTimeout(s.etcdClient.Ctx(), time.Duration(len(subTaskCfgs))*3*time.Second)
defer dcancel()
minPos, err1 := getMinPosInAllSubTasks(dctx, subTaskCfgs)
minLoc, err1 := getMinLocInAllSubTasks(dctx, subTaskCfgs)
if err1 != nil {
return err1
}

// TODO: support GTID
// don't contain GTID information in checkpoint table, just set it to empty
if minPos != nil {
cfg.RelayBinLogName = binlog.AdjustPosition(*minPos).Name
cfg.RelayBinlogGTID = ""
if minLoc != nil {
log.L().Info("get min location in all subtasks", zap.Stringer("location", *minLoc))
cfg.RelayBinLogName = binlog.AdjustPosition(minLoc.Position).Name
cfg.RelayBinlogGTID = minLoc.GTIDSetStr()
// set UUIDSuffix when bound to a source
cfg.UUIDSuffix, err = binlog.ExtractSuffix(minLoc.Position.Name)
if err != nil {
return err
}
} else {
// set UUIDSuffix even not checkpoint exist
// so we will still remove relay dir
cfg.UUIDSuffix = binlog.MinUUIDSuffix
}
}

Expand Down Expand Up @@ -634,32 +642,31 @@ func makeCommonWorkerResponse(reqErr error) *pb.CommonWorkerResponse {
}

// all subTask in subTaskCfgs should have same source
// this function return the min position in all subtasks, used for relay's position
// TODO: get min gtidSet
func getMinPosInAllSubTasks(ctx context.Context, subTaskCfgs []*config.SubTaskConfig) (minPos *mysql.Position, err error) {
// this function return the min location in all subtasks, used for relay's location
func getMinLocInAllSubTasks(ctx context.Context, subTaskCfgs []*config.SubTaskConfig) (minLoc *binlog.Location, err error) {
for _, subTaskCfg := range subTaskCfgs {
pos, err := getMinPosForSubTaskFunc(ctx, subTaskCfg)
loc, err := getMinLocForSubTaskFunc(ctx, subTaskCfg)
if err != nil {
return nil, err
}

if pos == nil {
if loc == nil {
continue
}

if minPos == nil {
minPos = pos
if minLoc == nil {
minLoc = loc
} else {
if minPos.Compare(*pos) >= 1 {
minPos = pos
if binlog.CompareLocation(*minLoc, *loc, subTaskCfg.EnableGTID) >= 1 {
minLoc = loc
}
}
}

return minPos, nil
return minLoc, nil
}

func getMinPosForSubTask(ctx context.Context, subTaskCfg *config.SubTaskConfig) (minPos *mysql.Position, err error) {
func getMinLocForSubTask(ctx context.Context, subTaskCfg *config.SubTaskConfig) (minLoc *binlog.Location, err error) {
if subTaskCfg.Mode == config.ModeFull {
return nil, nil
}
Expand All @@ -682,7 +689,7 @@ func getMinPosForSubTask(ctx context.Context, subTaskCfg *config.SubTaskConfig)
}

location := checkpoint.GlobalPoint()
return &location.Position, nil
return &location, nil
}

// unifyMasterBinlogPos eliminates different masterBinlog in one response
Expand Down
61 changes: 44 additions & 17 deletions dm/worker/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ import (
"github.com/pingcap/dm/dm/config"
"github.com/pingcap/dm/dm/pb"
"github.com/pingcap/dm/dm/unit"
"github.com/pingcap/dm/pkg/binlog"
"github.com/pingcap/dm/pkg/gtid"
"github.com/pingcap/dm/pkg/ha"
"github.com/pingcap/dm/pkg/log"
"github.com/pingcap/dm/pkg/terror"
Expand Down Expand Up @@ -60,11 +62,11 @@ func (t *testServer) SetUpSuite(c *C) {
err := log.InitLogger(&log.Config{})
c.Assert(err, IsNil)

getMinPosForSubTaskFunc = getFakePosForSubTask
getMinLocForSubTaskFunc = getFakeLocForSubTask
}

func (t *testServer) TearDownSuite(c *C) {
getMinPosForSubTaskFunc = getMinPosForSubTask
getMinLocForSubTaskFunc = getMinLocForSubTask
}

func createMockETCD(dir string, host string) (*embed.Etcd, error) {
Expand Down Expand Up @@ -404,7 +406,7 @@ func (t *testServer) testStopWorkerWhenLostConnect(c *C, s *Server, ETCD *embed.
c.Assert(s.getWorker(true), IsNil)
}

func (t *testServer) TestGetMinPosInAllSubTasks(c *C) {
func (t *testServer) TestGetMinLocInAllSubTasks(c *C) {
subTaskCfg := []*config.SubTaskConfig{
{
Name: "test2",
Expand All @@ -414,10 +416,19 @@ func (t *testServer) TestGetMinPosInAllSubTasks(c *C) {
Name: "test1",
},
}
minPos, err := getMinPosInAllSubTasks(context.Background(), subTaskCfg)
minLoc, err := getMinLocInAllSubTasks(context.Background(), subTaskCfg)
c.Assert(err, IsNil)
c.Assert(minPos.Name, Equals, "mysql-binlog.00001")
c.Assert(minPos.Pos, Equals, uint32(12))
c.Assert(minLoc.Position.Name, Equals, "mysql-binlog.00001")
c.Assert(minLoc.Position.Pos, Equals, uint32(12))

for _, subtask := range subTaskCfg {
subtask.EnableGTID = true
}

minLoc, err = getMinLocInAllSubTasks(context.Background(), subTaskCfg)
c.Assert(err, IsNil)
c.Assert(minLoc.Position.Name, Equals, "mysql-binlog.00001")
c.Assert(minLoc.Position.Pos, Equals, uint32(123))
}

func (t *testServer) TestUnifyMasterBinlogPos(c *C) {
Expand Down Expand Up @@ -530,22 +541,38 @@ func (t *testServer) TestUnifyMasterBinlogPos(c *C) {
c.Assert(relay.RelayCatchUpMaster, IsTrue)
}

func getFakePosForSubTask(ctx context.Context, subTaskCfg *config.SubTaskConfig) (minPos *mysql.Position, err error) {
switch subTaskCfg.Name {
case "test1":
return &mysql.Position{
func getFakeLocForSubTask(ctx context.Context, subTaskCfg *config.SubTaskConfig) (minLoc *binlog.Location, err error) {
gset1, _ := gtid.ParserGTID(mysql.MySQLFlavor, "ba8f633f-1f15-11eb-b1c7-0242ac110001:1-30")
gset2, _ := gtid.ParserGTID(mysql.MySQLFlavor, "ba8f633f-1f15-11eb-b1c7-0242ac110001:1-50")
gset3, _ := gtid.ParserGTID(mysql.MySQLFlavor, "ba8f633f-1f15-11eb-b1c7-0242ac110001:1-50,ba8f633f-1f15-11eb-b1c7-0242ac110002:1")
loc1 := binlog.InitLocation(
mysql.Position{
Name: "mysql-binlog.00001",
Pos: 123,
}, nil
case "test2":
return &mysql.Position{
},
gset1,
)
loc2 := binlog.InitLocation(
mysql.Position{
Name: "mysql-binlog.00001",
Pos: 12,
}, nil
case "test3":
return &mysql.Position{
},
gset2,
)
loc3 := binlog.InitLocation(
mysql.Position{
Name: "mysql-binlog.00003",
}, nil
},
gset3,
)

switch subTaskCfg.Name {
case "test1":
return &loc1, nil
case "test2":
return &loc2, nil
case "test3":
return &loc3, nil
default:
return nil, nil
}
Expand Down
80 changes: 79 additions & 1 deletion dm/worker/task_checker.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"github.com/pingcap/dm/pkg/log"
"github.com/pingcap/dm/pkg/retry"
"github.com/pingcap/dm/pkg/terror"
"github.com/pingcap/dm/pkg/utils"
)

//// Backoff related constants
Expand Down Expand Up @@ -116,6 +117,11 @@ type backoffController struct {

// task name -> the latest auto resume time
latestResumeTime map[string]time.Time

latestRelayPausedTime time.Time
latestRelayBlockTime time.Time
latestRelayResumeTime time.Time
relayBackoff *backoff.Backoff
}

// newBackoffController returns a new backoffController instance
Expand Down Expand Up @@ -273,7 +279,72 @@ func (tsc *realTaskStatusChecker) getResumeStrategy(stStatus *pb.SubTaskStatus,
return ResumeDispatch
}

func (tsc *realTaskStatusChecker) check() {
func (tsc *realTaskStatusChecker) getRelayResumeStrategy(relayStatus *pb.RelayStatus, duration time.Duration) ResumeStrategy {
// relay that is not paused or paused manually, just ignore it
if relayStatus == nil || relayStatus.Stage != pb.Stage_Paused || relayStatus.Result == nil || relayStatus.Result.IsCanceled {
return ResumeIgnore
}

for _, err := range relayStatus.Result.Errors {
if _, ok := retry.UnresumableRelayErrCodes[err.ErrCode]; ok {
return ResumeNoSense
}
}

if time.Since(tsc.bc.latestRelayResumeTime) < duration {
return ResumeSkip
}

return ResumeDispatch
}

func (tsc *realTaskStatusChecker) checkRelayStatus() {
ctx, cancel := context.WithTimeout(context.Background(), utils.DefaultDBTimeout)
defer cancel()

relayStatus := tsc.w.relayHolder.Status(ctx)
if tsc.bc.relayBackoff == nil {
tsc.bc.relayBackoff, _ = backoff.NewBackoff(tsc.cfg.BackoffFactor, tsc.cfg.BackoffJitter, tsc.cfg.BackoffMin.Duration, tsc.cfg.BackoffMax.Duration)
tsc.bc.latestRelayPausedTime = time.Now()
tsc.bc.latestRelayResumeTime = time.Now()
}
rbf := tsc.bc.relayBackoff
duration := rbf.Current()
strategy := tsc.getRelayResumeStrategy(relayStatus, duration)
switch strategy {
case ResumeIgnore:
if time.Since(tsc.bc.latestRelayPausedTime) > tsc.cfg.BackoffRollback.Duration {
rbf.Rollback()
// after each rollback, reset this timer
tsc.bc.latestRelayPausedTime = time.Now()
}
case ResumeNoSense:
// this strategy doesn't forward or rollback backoff
tsc.bc.latestRelayPausedTime = time.Now()
blockTime := tsc.bc.latestRelayBlockTime
if !blockTime.IsZero() {
tsc.l.Warn("relay can't auto resume", zap.Duration("paused duration", time.Since(blockTime)))
} else {
tsc.bc.latestRelayBlockTime = time.Now()
tsc.l.Warn("relay can't auto resume")
}
case ResumeSkip:
tsc.l.Warn("backoff skip auto resume relay", zap.Time("latestResumeTime", tsc.bc.latestRelayResumeTime), zap.Duration("duration", duration))
tsc.bc.latestRelayPausedTime = time.Now()
case ResumeDispatch:
tsc.bc.latestRelayPausedTime = time.Now()
err := tsc.w.operateRelay(tsc.ctx, pb.RelayOp_ResumeRelay)
if err != nil {
tsc.l.Error("dispatch auto resume relay failed", zap.Error(err))
} else {
tsc.l.Info("dispatch auto resume relay")
tsc.bc.latestRelayResumeTime = time.Now()
rbf.BoundaryForward()
}
}
}

func (tsc *realTaskStatusChecker) checkTaskStatus() {
allSubTaskStatus := tsc.w.getAllSubTaskStatus()

defer func() {
Expand Down Expand Up @@ -333,3 +404,10 @@ func (tsc *realTaskStatusChecker) check() {
}
}
}

func (tsc *realTaskStatusChecker) check() {
if tsc.w.cfg.EnableRelay {
tsc.checkRelayStatus()
}
tsc.checkTaskStatus()
}
Loading

0 comments on commit 97527c6

Please sign in to comment.