From 97527c6f04e0b9105c0534e679f201662778dc54 Mon Sep 17 00:00:00 2001 From: GMHDBJD <35025882+GMHDBJD@users.noreply.github.com> Date: Tue, 22 Dec 2020 17:35:11 +0800 Subject: [PATCH] cherry pick #1291 to release-2.0 Signed-off-by: ti-srebot --- chaos/cases/conf/source1.yaml | 2 + chaos/cases/conf/source2.yaml | 2 + chaos/cases/source.go | 4 +- chaos/cases/task.go | 2 +- dm/config/source_config.go | 8 +- dm/ctl/ctl.go | 6 +- dm/worker/relay_test.go | 8 ++ dm/worker/server.go | 45 ++++++----- dm/worker/server_test.go | 61 ++++++++++---- dm/worker/task_checker.go | 80 ++++++++++++++++++- dm/worker/worker_test.go | 4 +- pkg/binlog/position.go | 20 +++++ pkg/binlog/position_test.go | 28 +++++++ pkg/binlog/reader/file.go | 5 +- pkg/retry/errors.go | 9 +++ pkg/utils/db.go | 27 +++++++ relay/config.go | 2 + relay/meta.go | 18 +++-- relay/meta_test.go | 4 +- relay/relay.go | 70 +++++++++++++--- relay/relay_test.go | 35 +++++--- relay/transformer/transformer.go | 2 + syncer/streamer_controller.go | 1 + tests/all_mode/conf/source1.yaml | 1 + tests/all_mode/conf/source2.yaml | 3 +- tests/compatibility/conf/source1.yaml | 1 + tests/compatibility/conf/source2.yaml | 3 +- tests/compatibility/start.sh | 2 +- tests/dm_syncer/conf/source1.yaml | 1 + tests/dm_syncer/conf/source2.yaml | 3 +- tests/dmctl_basic/check_list/get_config.sh | 4 +- tests/dmctl_basic/conf/get_source2.yaml | 2 +- tests/dmctl_basic/conf/source2.yaml | 2 +- tests/dmctl_basic/run.sh | 36 ++++----- tests/dmctl_command/conf/source1.yaml | 1 + tests/dmctl_command/conf/source2.yaml | 4 +- tests/dmctl_command/run.sh | 3 +- .../drop_column_with_index/conf/source1.yaml | 1 + tests/drop_column_with_index/run.sh | 3 +- tests/full_mode/conf/source1.yaml | 1 + tests/full_mode/conf/source2.yaml | 3 +- tests/full_mode/run.sh | 2 +- tests/ha/conf/dm-worker3.toml | 2 +- tests/ha/conf/source2.yaml | 2 +- tests/ha/run.sh | 12 +-- tests/ha_cases/conf/source1.yaml | 2 +- tests/ha_cases/conf/source2.yaml | 2 +- tests/ha_cases/lib.sh | 4 +- tests/ha_cases/run.sh | 42 +++++----- tests/ha_master/conf/source2.yaml | 2 +- tests/ha_master/run.sh | 4 +- tests/http_apis/conf/source1.yaml | 1 + tests/http_apis/run.sh | 4 +- tests/import_goroutine_leak/conf/source1.yaml | 1 + tests/import_goroutine_leak/run.sh | 2 +- tests/import_v10x/conf/source1.yaml | 1 + tests/import_v10x/conf/source2.yaml | 1 + tests/import_v10x/run.sh | 2 +- tests/incremental_mode/conf/source1.yaml | 1 + tests/incremental_mode/conf/source2.yaml | 1 + tests/incremental_mode/run.sh | 2 +- tests/load_interrupt/conf/source1.yaml | 1 + tests/online_ddl/conf/source1.yaml | 1 + tests/online_ddl/conf/source2.yaml | 1 + tests/online_ddl/run.sh | 2 +- tests/print_status/conf/source1.yaml | 1 + tests/safe_mode/conf/source1.yaml | 1 + tests/safe_mode/conf/source2.yaml | 1 + tests/sequence_safe_mode/conf/source1.yaml | 1 + tests/sequence_safe_mode/conf/source2.yaml | 1 + tests/sequence_sharding/conf/source1.yaml | 1 + tests/sequence_sharding/conf/source2.yaml | 1 + .../conf/source1.yaml | 1 + .../conf/source2.yaml | 3 +- tests/sequence_sharding_optimistic/run.sh | 4 +- .../conf/source1.yaml | 1 + .../conf/source2.yaml | 1 + tests/sharding/conf/source1.yaml | 1 + tests/sharding/conf/source2.yaml | 1 + tests/sharding2/conf/source1.yaml | 1 + tests/sharding2/conf/source2.yaml | 1 + tests/sharding2/run.sh | 2 +- tests/start_task/conf/source1.yaml | 1 + tests/tiup/conf/source1.yaml | 3 +- tests/tiup/conf/source2.yaml | 1 + tests/tls/conf/source1.yaml | 1 + 86 files changed, 487 insertions(+), 154 deletions(-) diff --git a/chaos/cases/conf/source1.yaml b/chaos/cases/conf/source1.yaml index 686897f149..796c553a90 100644 --- a/chaos/cases/conf/source1.yaml +++ b/chaos/cases/conf/source1.yaml @@ -1,4 +1,6 @@ source-id: "mysql-replica-01" +enable-gtid: false +enable-relay: false from: host: "mysql-0.mysql" # same namespace with MySQL diff --git a/chaos/cases/conf/source2.yaml b/chaos/cases/conf/source2.yaml index b9c01ef575..3d561d71c6 100644 --- a/chaos/cases/conf/source2.yaml +++ b/chaos/cases/conf/source2.yaml @@ -1,4 +1,6 @@ source-id: "mysql-replica-02" +enable-gtid: true +enable-relay: true from: host: "mysql-1.mysql" # same namespace with MySQL diff --git a/chaos/cases/source.go b/chaos/cases/source.go index 64150d691f..053c4fc551 100644 --- a/chaos/cases/source.go +++ b/chaos/cases/source.go @@ -39,8 +39,8 @@ func createSources(ctx context.Context, cli pb.MasterClient, cfg *config) error return err } - var cfg1 config2.SourceConfig - var cfg2 config2.SourceConfig + cfg1 := config2.NewSourceConfig() + cfg2 := config2.NewSourceConfig() if err = cfg1.ParseYaml(string(s1Content)); err != nil { return err } diff --git a/chaos/cases/task.go b/chaos/cases/task.go index 6a56cb8b77..3ff1d2800f 100644 --- a/chaos/cases/task.go +++ b/chaos/cases/task.go @@ -37,7 +37,7 @@ import ( const ( tableCount = 10 // tables count in schema. fullInsertCount = 100 // `INSERT INTO` count (not rows count) for each table in full stage. - diffCount = 20 // diff data check count + diffCount = 30 // diff data check count diffInterval = 10 * time.Second // diff data check interval incrRoundTime = 20 * time.Second // time to generate incremental data in one round ) diff --git a/dm/config/source_config.go b/dm/config/source_config.go index d8affbef76..facdbc41e3 100644 --- a/dm/config/source_config.go +++ b/dm/config/source_config.go @@ -24,6 +24,7 @@ import ( const ( // the default base(min) server id generated by random defaultBaseServerID = math.MaxUint32 / 10 + defaultRelayDir = "relay-dir" ) var getAllServerIDFunc = utils.GetAllServerID @@ -53,6 +54,8 @@ type SourceConfig struct { // relay synchronous starting point (if specified) RelayBinLogName string `yaml:"relay-binlog-name" toml:"relay-binlog-name" json:"relay-binlog-name"` RelayBinlogGTID string `yaml:"relay-binlog-gtid" toml:"relay-binlog-gtid" json:"relay-binlog-gtid"` + // only use when worker bound source, do not marsh it + UUIDSuffix int `yaml:"-" toml:"-" json:"-"` SourceID string `yaml:"source-id" toml:"source-id" json:"source-id"` From DBConfig `yaml:"from" toml:"from" json:"from"` @@ -73,7 +76,6 @@ type SourceConfig struct { // NewSourceConfig creates a new base config for upstream MySQL/MariaDB source. func NewSourceConfig() *SourceConfig { c := &SourceConfig{ - RelayDir: "relay-dir", Purge: PurgeConfig{ Interval: 60 * 60, Expires: 0, @@ -243,6 +245,10 @@ func (c *SourceConfig) Adjust(ctx context.Context, db *sql.DB) (err error) { } } + if c.EnableRelay && len(c.RelayDir) == 0 { + c.RelayDir = defaultRelayDir + } + return nil } diff --git a/dm/ctl/ctl.go b/dm/ctl/ctl.go index 4ca3827393..28ac57365f 100644 --- a/dm/ctl/ctl.go +++ b/dm/ctl/ctl.go @@ -64,9 +64,9 @@ func NewRootCmd() *cobra.Command { master.NewQueryStatusCmd(), master.NewShowDDLLocksCmd(), master.NewUnlockDDLLockCmd(), - // master.NewPauseRelayCmd(), - // master.NewResumeRelayCmd(), - // master.NewPurgeRelayCmd(), + master.NewPauseRelayCmd(), + master.NewResumeRelayCmd(), + master.NewPurgeRelayCmd(), master.NewOperateSourceCmd(), master.NewOfflineMemberCmd(), master.NewOperateLeaderCmd(), diff --git a/dm/worker/relay_test.go b/dm/worker/relay_test.go index c0774f17d6..9fc6480a3c 100644 --- a/dm/worker/relay_test.go +++ b/dm/worker/relay_test.go @@ -121,6 +121,14 @@ func (d *DummyRelay) SaveMeta(pos mysql.Position, gset gtid.Set) error { return nil } +// ResetMeta implements Process interface +func (d *DummyRelay) ResetMeta() {} + +// PurgeRelayDir implements Process interface +func (d *DummyRelay) PurgeRelayDir() error { + return nil +} + func (t *testRelay) TestRelay(c *C) { originNewRelay := relay.NewRelay relay.NewRelay = NewDummyRelay diff --git a/dm/worker/server.go b/dm/worker/server.go index ff6fb97cc1..f539ce1042 100644 --- a/dm/worker/server.go +++ b/dm/worker/server.go @@ -49,7 +49,7 @@ var ( keepaliveTime = 3 * time.Second retryConnectSleepTime = time.Second syncMasterEndpointsTime = 3 * time.Second - getMinPosForSubTaskFunc = getMinPosForSubTask + getMinLocForSubTaskFunc = getMinLocForSubTask ) // Server accepts RPC requests @@ -546,16 +546,24 @@ func (s *Server) startWorker(cfg *config.SourceConfig) error { if cfg.EnableRelay { dctx, dcancel := context.WithTimeout(s.etcdClient.Ctx(), time.Duration(len(subTaskCfgs))*3*time.Second) defer dcancel() - minPos, err1 := getMinPosInAllSubTasks(dctx, subTaskCfgs) + minLoc, err1 := getMinLocInAllSubTasks(dctx, subTaskCfgs) if err1 != nil { return err1 } - // TODO: support GTID - // don't contain GTID information in checkpoint table, just set it to empty - if minPos != nil { - cfg.RelayBinLogName = binlog.AdjustPosition(*minPos).Name - cfg.RelayBinlogGTID = "" + if minLoc != nil { + log.L().Info("get min location in all subtasks", zap.Stringer("location", *minLoc)) + cfg.RelayBinLogName = binlog.AdjustPosition(minLoc.Position).Name + cfg.RelayBinlogGTID = minLoc.GTIDSetStr() + // set UUIDSuffix when bound to a source + cfg.UUIDSuffix, err = binlog.ExtractSuffix(minLoc.Position.Name) + if err != nil { + return err + } + } else { + // set UUIDSuffix even not checkpoint exist + // so we will still remove relay dir + cfg.UUIDSuffix = binlog.MinUUIDSuffix } } @@ -634,32 +642,31 @@ func makeCommonWorkerResponse(reqErr error) *pb.CommonWorkerResponse { } // all subTask in subTaskCfgs should have same source -// this function return the min position in all subtasks, used for relay's position -// TODO: get min gtidSet -func getMinPosInAllSubTasks(ctx context.Context, subTaskCfgs []*config.SubTaskConfig) (minPos *mysql.Position, err error) { +// this function return the min location in all subtasks, used for relay's location +func getMinLocInAllSubTasks(ctx context.Context, subTaskCfgs []*config.SubTaskConfig) (minLoc *binlog.Location, err error) { for _, subTaskCfg := range subTaskCfgs { - pos, err := getMinPosForSubTaskFunc(ctx, subTaskCfg) + loc, err := getMinLocForSubTaskFunc(ctx, subTaskCfg) if err != nil { return nil, err } - if pos == nil { + if loc == nil { continue } - if minPos == nil { - minPos = pos + if minLoc == nil { + minLoc = loc } else { - if minPos.Compare(*pos) >= 1 { - minPos = pos + if binlog.CompareLocation(*minLoc, *loc, subTaskCfg.EnableGTID) >= 1 { + minLoc = loc } } } - return minPos, nil + return minLoc, nil } -func getMinPosForSubTask(ctx context.Context, subTaskCfg *config.SubTaskConfig) (minPos *mysql.Position, err error) { +func getMinLocForSubTask(ctx context.Context, subTaskCfg *config.SubTaskConfig) (minLoc *binlog.Location, err error) { if subTaskCfg.Mode == config.ModeFull { return nil, nil } @@ -682,7 +689,7 @@ func getMinPosForSubTask(ctx context.Context, subTaskCfg *config.SubTaskConfig) } location := checkpoint.GlobalPoint() - return &location.Position, nil + return &location, nil } // unifyMasterBinlogPos eliminates different masterBinlog in one response diff --git a/dm/worker/server_test.go b/dm/worker/server_test.go index 9c5f1a255c..a5505a9faa 100644 --- a/dm/worker/server_test.go +++ b/dm/worker/server_test.go @@ -33,6 +33,8 @@ import ( "github.com/pingcap/dm/dm/config" "github.com/pingcap/dm/dm/pb" "github.com/pingcap/dm/dm/unit" + "github.com/pingcap/dm/pkg/binlog" + "github.com/pingcap/dm/pkg/gtid" "github.com/pingcap/dm/pkg/ha" "github.com/pingcap/dm/pkg/log" "github.com/pingcap/dm/pkg/terror" @@ -60,11 +62,11 @@ func (t *testServer) SetUpSuite(c *C) { err := log.InitLogger(&log.Config{}) c.Assert(err, IsNil) - getMinPosForSubTaskFunc = getFakePosForSubTask + getMinLocForSubTaskFunc = getFakeLocForSubTask } func (t *testServer) TearDownSuite(c *C) { - getMinPosForSubTaskFunc = getMinPosForSubTask + getMinLocForSubTaskFunc = getMinLocForSubTask } func createMockETCD(dir string, host string) (*embed.Etcd, error) { @@ -404,7 +406,7 @@ func (t *testServer) testStopWorkerWhenLostConnect(c *C, s *Server, ETCD *embed. c.Assert(s.getWorker(true), IsNil) } -func (t *testServer) TestGetMinPosInAllSubTasks(c *C) { +func (t *testServer) TestGetMinLocInAllSubTasks(c *C) { subTaskCfg := []*config.SubTaskConfig{ { Name: "test2", @@ -414,10 +416,19 @@ func (t *testServer) TestGetMinPosInAllSubTasks(c *C) { Name: "test1", }, } - minPos, err := getMinPosInAllSubTasks(context.Background(), subTaskCfg) + minLoc, err := getMinLocInAllSubTasks(context.Background(), subTaskCfg) c.Assert(err, IsNil) - c.Assert(minPos.Name, Equals, "mysql-binlog.00001") - c.Assert(minPos.Pos, Equals, uint32(12)) + c.Assert(minLoc.Position.Name, Equals, "mysql-binlog.00001") + c.Assert(minLoc.Position.Pos, Equals, uint32(12)) + + for _, subtask := range subTaskCfg { + subtask.EnableGTID = true + } + + minLoc, err = getMinLocInAllSubTasks(context.Background(), subTaskCfg) + c.Assert(err, IsNil) + c.Assert(minLoc.Position.Name, Equals, "mysql-binlog.00001") + c.Assert(minLoc.Position.Pos, Equals, uint32(123)) } func (t *testServer) TestUnifyMasterBinlogPos(c *C) { @@ -530,22 +541,38 @@ func (t *testServer) TestUnifyMasterBinlogPos(c *C) { c.Assert(relay.RelayCatchUpMaster, IsTrue) } -func getFakePosForSubTask(ctx context.Context, subTaskCfg *config.SubTaskConfig) (minPos *mysql.Position, err error) { - switch subTaskCfg.Name { - case "test1": - return &mysql.Position{ +func getFakeLocForSubTask(ctx context.Context, subTaskCfg *config.SubTaskConfig) (minLoc *binlog.Location, err error) { + gset1, _ := gtid.ParserGTID(mysql.MySQLFlavor, "ba8f633f-1f15-11eb-b1c7-0242ac110001:1-30") + gset2, _ := gtid.ParserGTID(mysql.MySQLFlavor, "ba8f633f-1f15-11eb-b1c7-0242ac110001:1-50") + gset3, _ := gtid.ParserGTID(mysql.MySQLFlavor, "ba8f633f-1f15-11eb-b1c7-0242ac110001:1-50,ba8f633f-1f15-11eb-b1c7-0242ac110002:1") + loc1 := binlog.InitLocation( + mysql.Position{ Name: "mysql-binlog.00001", Pos: 123, - }, nil - case "test2": - return &mysql.Position{ + }, + gset1, + ) + loc2 := binlog.InitLocation( + mysql.Position{ Name: "mysql-binlog.00001", Pos: 12, - }, nil - case "test3": - return &mysql.Position{ + }, + gset2, + ) + loc3 := binlog.InitLocation( + mysql.Position{ Name: "mysql-binlog.00003", - }, nil + }, + gset3, + ) + + switch subTaskCfg.Name { + case "test1": + return &loc1, nil + case "test2": + return &loc2, nil + case "test3": + return &loc3, nil default: return nil, nil } diff --git a/dm/worker/task_checker.go b/dm/worker/task_checker.go index 7fcba3dfe8..69c150ea4d 100644 --- a/dm/worker/task_checker.go +++ b/dm/worker/task_checker.go @@ -30,6 +30,7 @@ import ( "github.com/pingcap/dm/pkg/log" "github.com/pingcap/dm/pkg/retry" "github.com/pingcap/dm/pkg/terror" + "github.com/pingcap/dm/pkg/utils" ) //// Backoff related constants @@ -116,6 +117,11 @@ type backoffController struct { // task name -> the latest auto resume time latestResumeTime map[string]time.Time + + latestRelayPausedTime time.Time + latestRelayBlockTime time.Time + latestRelayResumeTime time.Time + relayBackoff *backoff.Backoff } // newBackoffController returns a new backoffController instance @@ -273,7 +279,72 @@ func (tsc *realTaskStatusChecker) getResumeStrategy(stStatus *pb.SubTaskStatus, return ResumeDispatch } -func (tsc *realTaskStatusChecker) check() { +func (tsc *realTaskStatusChecker) getRelayResumeStrategy(relayStatus *pb.RelayStatus, duration time.Duration) ResumeStrategy { + // relay that is not paused or paused manually, just ignore it + if relayStatus == nil || relayStatus.Stage != pb.Stage_Paused || relayStatus.Result == nil || relayStatus.Result.IsCanceled { + return ResumeIgnore + } + + for _, err := range relayStatus.Result.Errors { + if _, ok := retry.UnresumableRelayErrCodes[err.ErrCode]; ok { + return ResumeNoSense + } + } + + if time.Since(tsc.bc.latestRelayResumeTime) < duration { + return ResumeSkip + } + + return ResumeDispatch +} + +func (tsc *realTaskStatusChecker) checkRelayStatus() { + ctx, cancel := context.WithTimeout(context.Background(), utils.DefaultDBTimeout) + defer cancel() + + relayStatus := tsc.w.relayHolder.Status(ctx) + if tsc.bc.relayBackoff == nil { + tsc.bc.relayBackoff, _ = backoff.NewBackoff(tsc.cfg.BackoffFactor, tsc.cfg.BackoffJitter, tsc.cfg.BackoffMin.Duration, tsc.cfg.BackoffMax.Duration) + tsc.bc.latestRelayPausedTime = time.Now() + tsc.bc.latestRelayResumeTime = time.Now() + } + rbf := tsc.bc.relayBackoff + duration := rbf.Current() + strategy := tsc.getRelayResumeStrategy(relayStatus, duration) + switch strategy { + case ResumeIgnore: + if time.Since(tsc.bc.latestRelayPausedTime) > tsc.cfg.BackoffRollback.Duration { + rbf.Rollback() + // after each rollback, reset this timer + tsc.bc.latestRelayPausedTime = time.Now() + } + case ResumeNoSense: + // this strategy doesn't forward or rollback backoff + tsc.bc.latestRelayPausedTime = time.Now() + blockTime := tsc.bc.latestRelayBlockTime + if !blockTime.IsZero() { + tsc.l.Warn("relay can't auto resume", zap.Duration("paused duration", time.Since(blockTime))) + } else { + tsc.bc.latestRelayBlockTime = time.Now() + tsc.l.Warn("relay can't auto resume") + } + case ResumeSkip: + tsc.l.Warn("backoff skip auto resume relay", zap.Time("latestResumeTime", tsc.bc.latestRelayResumeTime), zap.Duration("duration", duration)) + tsc.bc.latestRelayPausedTime = time.Now() + case ResumeDispatch: + tsc.bc.latestRelayPausedTime = time.Now() + err := tsc.w.operateRelay(tsc.ctx, pb.RelayOp_ResumeRelay) + if err != nil { + tsc.l.Error("dispatch auto resume relay failed", zap.Error(err)) + } else { + tsc.l.Info("dispatch auto resume relay") + tsc.bc.latestRelayResumeTime = time.Now() + rbf.BoundaryForward() + } + } +} + +func (tsc *realTaskStatusChecker) checkTaskStatus() { allSubTaskStatus := tsc.w.getAllSubTaskStatus() defer func() { @@ -333,3 +404,10 @@ func (tsc *realTaskStatusChecker) check() { } } } + +func (tsc *realTaskStatusChecker) check() { + if tsc.w.cfg.EnableRelay { + tsc.checkRelayStatus() + } + tsc.checkTaskStatus() +} diff --git a/dm/worker/worker_test.go b/dm/worker/worker_test.go index 181f76c534..9017819135 100644 --- a/dm/worker/worker_test.go +++ b/dm/worker/worker_test.go @@ -92,11 +92,11 @@ func (t *testServer2) SetUpSuite(c *C) { err := log.InitLogger(&log.Config{}) c.Assert(err, IsNil) - getMinPosForSubTaskFunc = getFakePosForSubTask + getMinLocForSubTaskFunc = getFakeLocForSubTask } func (t *testServer2) TearDownSuite(c *C) { - getMinPosForSubTaskFunc = getMinPosForSubTask + getMinLocForSubTaskFunc = getMinLocForSubTask } func (t *testServer2) TestTaskAutoResume(c *C) { diff --git a/pkg/binlog/position.go b/pkg/binlog/position.go index edd9d7a0af..6f302e1a59 100644 --- a/pkg/binlog/position.go +++ b/pkg/binlog/position.go @@ -36,6 +36,8 @@ const ( // eg. mysql-bin.000003 in c6ae5afe-c7a3-11e8-a19d-0242ac130006.000002 => mysql-bin|000002.000003 // where `000002` in `c6ae5afe-c7a3-11e8-a19d-0242ac130006.000002` is the UUIDSuffix posUUIDSuffixSeparator = "|" + // MinUUIDSuffix is same as relay.MinUUIDSuffix + MinUUIDSuffix = 1 ) var ( @@ -110,6 +112,24 @@ func RealMySQLPos(pos gmysql.Position) (gmysql.Position, error) { return pos, nil } +// ExtractSuffix extracts uuidSuffix from input name +func ExtractSuffix(name string) (int, error) { + if len(name) == 0 { + return MinUUIDSuffix, nil + } + filename, err := ParseFilename(name) + if err != nil { + return 0, err + } + sepIdx := strings.LastIndex(filename.BaseName, posUUIDSuffixSeparator) + if sepIdx > 0 && sepIdx+len(posUUIDSuffixSeparator) < len(filename.BaseName) { + suffix := filename.BaseName[sepIdx+len(posUUIDSuffixSeparator):] + v, err := strconv.ParseInt(suffix, 10, 64) + return int(v), err + } + return MinUUIDSuffix, nil +} + // ExtractPos extracts (uuidWithSuffix, uuidSuffix, originalPos) from input pos (originalPos or convertedPos) func ExtractPos(pos gmysql.Position, uuids []string) (uuidWithSuffix string, uuidSuffix string, realPos gmysql.Position, err error) { if len(uuids) == 0 { diff --git a/pkg/binlog/position_test.go b/pkg/binlog/position_test.go index f551569330..24d8526075 100644 --- a/pkg/binlog/position_test.go +++ b/pkg/binlog/position_test.go @@ -765,3 +765,31 @@ func (t *testPositionSuite) TestSetGTID(c *C) { c.Assert(loc2.gtidSet.String(), Equals, GTIDSetStr) c.Assert(CompareLocation(loc, loc2, true), Equals, 1) } + +func (t *testPositionSuite) TestExtractSuffix(c *C) { + testCases := []struct { + name string + suffix int + }{ + { + "", + MinUUIDSuffix, + }, { + "mysql-bin.00005", + MinUUIDSuffix, + }, + { + "mysql-bin|000001.000001", + 1, + }, { + "mysql-bin|000005.000004", + 5, + }, + } + + for _, tc := range testCases { + suffix, err := ExtractSuffix(tc.name) + c.Assert(err, IsNil) + c.Assert(suffix, Equals, tc.suffix) + } +} diff --git a/pkg/binlog/reader/file.go b/pkg/binlog/reader/file.go index c8c7948e2d..b55651f2eb 100644 --- a/pkg/binlog/reader/file.go +++ b/pkg/binlog/reader/file.go @@ -22,6 +22,7 @@ import ( "sync" "time" + "github.com/pingcap/errors" gmysql "github.com/siddontang/go-mysql/mysql" "github.com/siddontang/go-mysql/replication" "github.com/siddontang/go/sync2" @@ -112,7 +113,9 @@ func (r *FileReader) StartSyncByPos(pos gmysql.Position) error { defer r.wg.Done() err := r.parser.ParseFile(pos.Name, int64(pos.Pos), r.onEvent) if err != nil { - r.logger.Error("fail to parse binlog file", zap.Error(err)) + if errors.Cause(err) != context.Canceled { + r.logger.Error("fail to parse binlog file", zap.Error(err)) + } select { case r.ech <- err: case <-r.ctx.Done(): diff --git a/pkg/retry/errors.go b/pkg/retry/errors.go index 2ea5027212..06cb3bd95d 100644 --- a/pkg/retry/errors.go +++ b/pkg/retry/errors.go @@ -68,6 +68,15 @@ var ( int32(terror.ErrDumpUnitRuntime.Code()): {}, int32(terror.ErrSyncerUnitDMLColumnNotMatch.Code()): {}, } + + // UnresumableRelayErrCodes is a set of unresumeable relay unit err codes. + UnresumableRelayErrCodes = map[int32]struct{}{ + int32(terror.ErrRelayUUIDSuffixNotValid.Code()): {}, + int32(terror.ErrRelayUUIDSuffixLessThanPrev.Code()): {}, + int32(terror.ErrRelayBinlogNameNotValid.Code()): {}, + int32(terror.ErrRelayNoCurrentUUID.Code()): {}, + int32(terror.ErrRelayLogDirpathEmpty.Code()): {}, + } ) // IsConnectionError tells whether this error should reconnect to Database diff --git a/pkg/utils/db.go b/pkg/utils/db.go index 8c491108a3..25fb57921a 100644 --- a/pkg/utils/db.go +++ b/pkg/utils/db.go @@ -78,6 +78,33 @@ func GetAllServerID(ctx context.Context, db *sql.DB) (map[uint32]struct{}, error return serverIDs, nil } +// ReuseServerID reuse given server ID or get a new server ID +func ReuseServerID(ctx context.Context, serverID uint32, db *sql.DB) (uint32, error) { + serverIDs, err := GetAllServerID(ctx, db) + if err != nil { + return 0, err + } + + if _, ok := serverIDs[serverID]; !ok && serverID > 0 { + // reuse given server ID + return serverID, nil + } + + rand.Seed(time.Now().UnixNano()) + for i := 0; i < 99999; i++ { + randomValue := uint32(rand.Intn(100000)) + randomServerID := uint32(defaultBaseServerID) + randomValue + if _, ok := serverIDs[randomServerID]; ok { + continue + } + + return randomServerID, nil + } + + // should never happened unless the master has too many slave. + return 0, terror.ErrInvalidServerID.Generatef("can't find a random available server ID") +} + // GetRandomServerID gets a random server ID which is not used func GetRandomServerID(ctx context.Context, db *sql.DB) (uint32, error) { rand.Seed(time.Now().UnixNano()) diff --git a/relay/config.go b/relay/config.go index ce4c84c8d2..63b150329d 100644 --- a/relay/config.go +++ b/relay/config.go @@ -35,6 +35,7 @@ type Config struct { // do not need to specify binlog-pos, because relay will fetch the whole file BinLogName string `toml:"binlog-name" json:"binlog-name"` BinlogGTID string `toml:"binlog-gtid" json:"binlog-gtid"` + UUIDSuffix int `toml:"-" json:"-"` // for binlog reader retry ReaderRetry retry.ReaderRetryConfig `toml:"reader-retry" json:"reader-retry"` @@ -61,6 +62,7 @@ func FromSourceCfg(sourceCfg *config.SourceConfig) *Config { From: clone.From, BinLogName: clone.RelayBinLogName, BinlogGTID: clone.RelayBinlogGTID, + UUIDSuffix: clone.UUIDSuffix, ReaderRetry: retry.ReaderRetryConfig{ // we use config from TaskChecker now BackoffRollback: clone.Checker.BackoffRollback.Duration, BackoffMax: clone.Checker.BackoffMax.Duration, diff --git a/relay/meta.go b/relay/meta.go index 5560c1beba..08368fa8a0 100644 --- a/relay/meta.go +++ b/relay/meta.go @@ -58,12 +58,13 @@ type Meta interface { Dirty() bool // AddDir adds sub relay directory for server UUID (without suffix) - // the added sub relay directory's suffix is incremented + // if uuidSuffix is not zero value, add sub relay directory with uuidSuffix (bound to a new source) + // otherwise the added sub relay directory's suffix is incremented (master/slave switch) // after sub relay directory added, the internal binlog pos should be reset // and binlog pos will be set again when new binlog events received // @serverUUID should be a server_uuid for MySQL or MariaDB // if set @newPos / @newGTID, old value will be replaced - AddDir(serverUUID string, newPos *mysql.Position, newGTID gtid.Set) error + AddDir(serverUUID string, newPos *mysql.Position, newGTID gtid.Set, uuidSuffix int) error // Pos returns current (UUID with suffix, Position) pair Pos() (string, mysql.Position) @@ -268,7 +269,7 @@ func (lm *LocalMeta) Dir() string { } // AddDir implements Meta.AddDir -func (lm *LocalMeta) AddDir(serverUUID string, newPos *mysql.Position, newGTID gtid.Set) error { +func (lm *LocalMeta) AddDir(serverUUID string, newPos *mysql.Position, newGTID gtid.Set, uuidSuffix int) error { lm.Lock() defer lm.Unlock() @@ -276,7 +277,11 @@ func (lm *LocalMeta) AddDir(serverUUID string, newPos *mysql.Position, newGTID g if len(lm.currentUUID) == 0 { // no UUID exists yet, simply add it - newUUID = utils.AddSuffixForUUID(serverUUID, minUUIDSufix) + if uuidSuffix == 0 { + newUUID = utils.AddSuffixForUUID(serverUUID, minUUIDSufix) + } else { + newUUID = utils.AddSuffixForUUID(serverUUID, uuidSuffix) + } } else { _, suffix, err := utils.ParseSuffixForUUID(lm.currentUUID) if err != nil { @@ -343,7 +348,10 @@ func (lm *LocalMeta) GTID() (string, gtid.Set) { lm.RLock() defer lm.RUnlock() - return lm.currentUUID, lm.gset.Clone() + if lm.gset != nil { + return lm.currentUUID, lm.gset.Clone() + } + return lm.currentUUID, nil } // UUID implements Meta.UUID diff --git a/relay/meta_test.go b/relay/meta_test.go index df55d6831c..9e5ca98073 100644 --- a/relay/meta_test.go +++ b/relay/meta_test.go @@ -143,7 +143,7 @@ func (r *testMetaSuite) TestLocalMeta(c *C) { c.Assert(gset.String(), Equals, latestGTIDStr) for _, cs := range cases { - err = lm.AddDir(cs.uuid, nil, nil) + err = lm.AddDir(cs.uuid, nil, nil, 0) c.Assert(err, IsNil) err = lm.Save(cs.pos, cs.gset) @@ -203,7 +203,7 @@ func (r *testMetaSuite) TestLocalMeta(c *C) { pos: mysql.Position{Name: "mysql-bin.000005", Pos: 789}, gset: gset5, } - err = lm.AddDir(cs.uuid, &cs.pos, cs.gset) + err = lm.AddDir(cs.uuid, &cs.pos, cs.gset, 0) c.Assert(err, IsNil) dirty = lm.Dirty() diff --git a/relay/relay.go b/relay/relay.go index 7098ceb46e..214c560edb 100755 --- a/relay/relay.go +++ b/relay/relay.go @@ -95,6 +95,10 @@ type Process interface { IsClosed() bool // SaveMeta save relay meta SaveMeta(pos mysql.Position, gset gtid.Set) error + // ResetMeta reset relay meta + ResetMeta() + // PurgeRelayDir will clear all contents under w.cfg.RelayDir + PurgeRelayDir() error } // Relay relays mysql binlog to local file. @@ -199,8 +203,8 @@ func (r *Relay) process(ctx context.Context) error { return err } - if isNew { - // re-setup meta for new server + if isNew || r.cfg.UUIDSuffix > 0 { + // re-setup meta for new server or new source err = r.reSetupMeta(ctx) if err != nil { return err @@ -214,7 +218,7 @@ func (r *Relay) process(ctx context.Context) error { } } - reader2, err := r.setUpReader() + reader2, err := r.setUpReader(ctx) if err != nil { return err } @@ -262,7 +266,7 @@ func (r *Relay) process(ctx context.Context) error { if err != nil { r.logger.Error("fail to close binlog event reader", zap.Error(err)) } - reader2, err = r.setUpReader() // setup a new one + reader2, err = r.setUpReader(ctx) // setup a new one if err != nil { return err } @@ -270,8 +274,8 @@ func (r *Relay) process(ctx context.Context) error { } } -// purgeRelayDir will clear all contents under w.cfg.RelayDir -func (r *Relay) purgeRelayDir() error { +// PurgeRelayDir implements the dm.Unit interface +func (r *Relay) PurgeRelayDir() error { dir := r.cfg.RelayDir d, err := os.Open(dir) // fail to open dir, return directly @@ -383,7 +387,13 @@ func (r *Relay) handleEvents(ctx context.Context, reader2 reader.Reader, transfo var ( _, lastPos = r.meta.Pos() _, lastGTID = r.meta.GTID() + err error ) + if lastGTID == nil { + if lastGTID, err = gtid.ParserGTID(r.cfg.Flavor, ""); err != nil { + return err + } + } for { // 1. read events from upstream server @@ -510,7 +520,32 @@ func (r *Relay) reSetupMeta(ctx context.Context) error { if err != nil { return err } - err = r.meta.AddDir(uuid, nil, nil) + + var newPos *mysql.Position + var newGset gtid.Set + var newUUIDSufiix int + if r.cfg.UUIDSuffix > 0 { + // if bound or rebound to a source, clear all relay log and meta + if err = r.PurgeRelayDir(); err != nil { + return err + } + r.ResetMeta() + + newUUIDSufiix = r.cfg.UUIDSuffix + // reset the UUIDSuffix + r.cfg.UUIDSuffix = 0 + + if len(r.cfg.BinLogName) != 0 { + newPos = &mysql.Position{Name: r.cfg.BinLogName, Pos: binlog.MinPosition.Pos} + } + if len(r.cfg.BinlogGTID) != 0 { + newGset, err = gtid.ParserGTID(r.cfg.Flavor, r.cfg.BinlogGTID) + if err != nil { + return err + } + } + } + err = r.meta.AddDir(uuid, newPos, newGset, newUUIDSufiix) if err != nil { return err } @@ -606,7 +641,18 @@ func (r *Relay) doIntervalOps(ctx context.Context) { } // setUpReader setups the underlying reader used to read binlog events from the upstream master server. -func (r *Relay) setUpReader() (reader.Reader, error) { +func (r *Relay) setUpReader(ctx context.Context) (reader.Reader, error) { + ctx2, cancel := context.WithTimeout(ctx, utils.DefaultDBTimeout) + defer cancel() + + randomServerID, err := utils.ReuseServerID(ctx2, r.cfg.ServerID, r.db) + if err != nil { + // should never happened unless the master has too many slave + return nil, terror.Annotate(err, "fail to get random server id for relay reader") + } + r.syncerCfg.ServerID = randomServerID + r.cfg.ServerID = randomServerID + uuid, pos := r.meta.Pos() _, gs := r.meta.GTID() cfg := &reader.Config{ @@ -618,7 +664,7 @@ func (r *Relay) setUpReader() (reader.Reader, error) { } reader2 := reader.NewReader(cfg) - err := reader2.Start() + err = reader2.Start() if err != nil { // do not log the whole config to protect the password in `SyncConfig`. // and other config items should already logged before or included in `err`. @@ -664,6 +710,12 @@ func (r *Relay) SaveMeta(pos mysql.Position, gset gtid.Set) error { return nil } +// ResetMeta reset relay meta +func (r *Relay) ResetMeta() { + r.meta = NewLocalMeta(r.cfg.Flavor, r.cfg.RelayDir) + r.relayMetaHub.ClearMeta() +} + // FlushMeta flush relay meta func (r *Relay) FlushMeta() error { return r.meta.Flush() diff --git a/relay/relay_test.go b/relay/relay_test.go index 37d9f27b3d..4aa5517b98 100644 --- a/relay/relay_test.go +++ b/relay/relay_test.go @@ -183,7 +183,7 @@ func (t *testRelaySuite) TestTryRecoverLatestFile(c *C) { f, err := os.Create(filepath.Join(r.cfg.RelayDir, "old_relay_log")) c.Assert(err, IsNil) f.Close() - c.Assert(r.purgeRelayDir(), IsNil) + c.Assert(r.PurgeRelayDir(), IsNil) files, err := ioutil.ReadDir(r.cfg.RelayDir) c.Assert(err, IsNil) c.Assert(files, HasLen, 0) @@ -194,7 +194,7 @@ func (t *testRelaySuite) TestTryRecoverLatestFile(c *C) { c.Assert(r.tryRecoverLatestFile(context.Background(), parser2), IsNil) // save position into meta - c.Assert(r.meta.AddDir(uuid, &startPos, nil), IsNil) + c.Assert(r.meta.AddDir(uuid, &startPos, nil, 0), IsNil) // relay log file does not exists, no need to recover c.Assert(r.tryRecoverLatestFile(context.Background(), parser2), IsNil) @@ -265,7 +265,7 @@ func (t *testRelaySuite) TestTryRecoverMeta(c *C) { recoverGTIDSet, err := gtid.ParserGTID(relayCfg.Flavor, recoverGTIDSetStr) c.Assert(err, IsNil) - c.Assert(r.meta.AddDir(uuid, &startPos, nil), IsNil) + c.Assert(r.meta.AddDir(uuid, &startPos, nil, 0), IsNil) c.Assert(r.meta.Load(), IsNil) // use a generator to generate some binlog events @@ -395,7 +395,7 @@ func (t *testRelaySuite) TestHandleEvent(c *C) { c.Assert(r.Init(context.Background()), IsNil) // NOTE: we can mock meta later. c.Assert(r.meta.Load(), IsNil) - c.Assert(r.meta.AddDir("24ecd093-8cec-11e9-aa0d-0242ac170002", nil, nil), IsNil) + c.Assert(r.meta.AddDir("24ecd093-8cec-11e9-aa0d-0242ac170002", nil, nil, 0), IsNil) // attach GTID sets to QueryEv queryEv2 := queryEv.Event.(*replication.QueryEvent) @@ -487,12 +487,11 @@ func (t *testRelaySuite) TestReSetupMeta(c *C) { defer cancel() var ( - relayCfg = &Config{ - RelayDir: c.MkDir(), - Flavor: gmysql.MySQLFlavor, - } - r = NewRelay(relayCfg).(*Relay) + relayCfg = newRelayCfg(c, mysql.MySQLFlavor) + r = NewRelay(relayCfg).(*Relay) ) + c.Assert(r.Init(context.Background()), IsNil) + // empty metadata c.Assert(r.meta.Load(), IsNil) t.verifyMetadata(c, r, "", minCheckpoint, "", nil) @@ -521,20 +520,32 @@ func (t *testRelaySuite) TestReSetupMeta(c *C) { uuid002 := fmt.Sprintf("%s.000002", uuid) t.verifyMetadata(c, r, uuid002, minCheckpoint, r.cfg.BinlogGTID, []string{uuid001, uuid002}) + r.cfg.BinLogName = "mysql-bin.000002" + r.cfg.BinlogGTID = "24ecd093-8cec-11e9-aa0d-0242ac170002:1-50,24ecd093-8cec-11e9-aa0d-0242ac170003:1-50" + r.cfg.UUIDSuffix = 2 + c.Assert(r.reSetupMeta(ctx), IsNil) + t.verifyMetadata(c, r, uuid002, gmysql.Position{Name: r.cfg.BinLogName, Pos: 4}, r.cfg.BinlogGTID, []string{uuid002}) + + // re-setup meta again, often happen when connecting a server behind a VIP. + c.Assert(r.reSetupMeta(ctx), IsNil) + uuid003 := fmt.Sprintf("%s.000003", uuid) + t.verifyMetadata(c, r, uuid003, minCheckpoint, r.cfg.BinlogGTID, []string{uuid002, uuid003}) } func (t *testRelaySuite) verifyMetadata(c *C, r *Relay, uuidExpected string, - posExpected gmysql.Position, gsStrExpected string, UUIDsExpected []string) { + posExpected gmysql.Position, gsStrExpected string, uuidsExpected []string) { uuid, pos := r.meta.Pos() _, gs := r.meta.GTID() + gsExpected, err := gtid.ParserGTID(mysql.MySQLFlavor, gsStrExpected) + c.Assert(err, IsNil) c.Assert(uuid, Equals, uuidExpected) c.Assert(pos, DeepEquals, posExpected) - c.Assert(gs.String(), Equals, gsStrExpected) + c.Assert(gs.Equal(gsExpected), IsTrue) indexFile := filepath.Join(r.cfg.RelayDir, utils.UUIDIndexFilename) UUIDs, err := utils.ParseUUIDIndex(indexFile) c.Assert(err, IsNil) - c.Assert(UUIDs, DeepEquals, UUIDsExpected) + c.Assert(UUIDs, DeepEquals, uuidsExpected) } func (t *testRelaySuite) TestProcess(c *C) { diff --git a/relay/transformer/transformer.go b/relay/transformer/transformer.go index 3fde2beec0..6682883212 100644 --- a/relay/transformer/transformer.go +++ b/relay/transformer/transformer.go @@ -67,6 +67,8 @@ func (t *transformer) Transform(e *replication.BinlogEvent) Result { } switch ev := e.Event.(type) { + case *replication.PreviousGTIDsEvent: + result.CanSaveGTID = true case *replication.RotateEvent: result.LogPos = uint32(ev.Position) // next event's position result.NextLogName = string(ev.NextLogName) // for RotateEvent, update binlog name diff --git a/syncer/streamer_controller.go b/syncer/streamer_controller.go index 2702903dd5..09f10133c8 100644 --- a/syncer/streamer_controller.go +++ b/syncer/streamer_controller.go @@ -237,6 +237,7 @@ func (c *StreamerController) GetEvent(tctx *tcontext.Context) (event *replicatio failpoint.Inject("SyncerGetEventError", func(_ failpoint.Value) { if !mockRestarted { mockRestarted = true + c.meetError = true tctx.L().Info("mock upstream instance restart", zap.String("failpoint", "SyncerGetEventError")) failpoint.Return(nil, terror.ErrDBBadConn.Generate()) } diff --git a/tests/all_mode/conf/source1.yaml b/tests/all_mode/conf/source1.yaml index 948af37465..49b830cced 100644 --- a/tests/all_mode/conf/source1.yaml +++ b/tests/all_mode/conf/source1.yaml @@ -3,6 +3,7 @@ source-id: mysql-replica-01 flavor: '' enable-gtid: false +enable-relay: true relay-binlog-name: '' relay-binlog-gtid: '' from: diff --git a/tests/all_mode/conf/source2.yaml b/tests/all_mode/conf/source2.yaml index 812917e6f2..32f44eb948 100644 --- a/tests/all_mode/conf/source2.yaml +++ b/tests/all_mode/conf/source2.yaml @@ -2,7 +2,8 @@ source-id: mysql-replica-02 flavor: '' -enable-gtid: false +enable-gtid: true +enable-relay: false relay-binlog-name: '' relay-binlog-gtid: '' from: diff --git a/tests/compatibility/conf/source1.yaml b/tests/compatibility/conf/source1.yaml index 810b0af3d1..2c7dc924ef 100644 --- a/tests/compatibility/conf/source1.yaml +++ b/tests/compatibility/conf/source1.yaml @@ -1,6 +1,7 @@ source-id: mysql-replica-01 flavor: '' enable-gtid: false +enable-relay: false relay-binlog-name: '' relay-binlog-gtid: '' from: diff --git a/tests/compatibility/conf/source2.yaml b/tests/compatibility/conf/source2.yaml index bd68886439..cc4bc2f3a4 100644 --- a/tests/compatibility/conf/source2.yaml +++ b/tests/compatibility/conf/source2.yaml @@ -1,6 +1,7 @@ source-id: mysql-replica-02 flavor: '' -enable-gtid: false +enable-gtid: true +enable-relay: true relay-binlog-name: '' relay-binlog-gtid: '' from: diff --git a/tests/compatibility/start.sh b/tests/compatibility/start.sh index 7f940c4926..53a1fe3466 100755 --- a/tests/compatibility/start.sh +++ b/tests/compatibility/start.sh @@ -68,7 +68,7 @@ function run() { run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \ "query-status test" \ - "\"stage\": \"Running\"" 2 + "\"stage\": \"Running\"" 3 echo "use sync_diff_inspector to check data second time" check_sync_diff $WORK_DIR $cur/conf/diff_config.toml diff --git a/tests/dm_syncer/conf/source1.yaml b/tests/dm_syncer/conf/source1.yaml index 7cefc966e4..00754d9626 100644 --- a/tests/dm_syncer/conf/source1.yaml +++ b/tests/dm_syncer/conf/source1.yaml @@ -1,6 +1,7 @@ source-id: mysql-replica-01 flavor: '' enable-gtid: false +enable-relay: false relay-binlog-name: '' relay-binlog-gtid: '' from: diff --git a/tests/dm_syncer/conf/source2.yaml b/tests/dm_syncer/conf/source2.yaml index bd68886439..cc4bc2f3a4 100644 --- a/tests/dm_syncer/conf/source2.yaml +++ b/tests/dm_syncer/conf/source2.yaml @@ -1,6 +1,7 @@ source-id: mysql-replica-02 flavor: '' -enable-gtid: false +enable-gtid: true +enable-relay: true relay-binlog-name: '' relay-binlog-gtid: '' from: diff --git a/tests/dmctl_basic/check_list/get_config.sh b/tests/dmctl_basic/check_list/get_config.sh index c7f76d1eb5..e271b243dc 100644 --- a/tests/dmctl_basic/check_list/get_config.sh +++ b/tests/dmctl_basic/check_list/get_config.sh @@ -45,12 +45,12 @@ function diff_get_config() { "\"result\": true" 1 diff $dm_master_conf $cur/conf/get_master1.toml || exit 1 - run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \ + run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \ "get-config worker worker1 --file $dm_worker1_conf" \ "\"result\": true" 1 diff $dm_worker1_conf $cur/conf/get_worker1.toml || exit 1 - run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \ + run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \ "get-config worker worker2 --file $dm_worker2_conf" \ "\"result\": true" 1 diff $dm_worker2_conf $cur/conf/get_worker2.toml || exit 1 diff --git a/tests/dmctl_basic/conf/get_source2.yaml b/tests/dmctl_basic/conf/get_source2.yaml index b58ef52df0..bac6ddfd99 100644 --- a/tests/dmctl_basic/conf/get_source2.yaml +++ b/tests/dmctl_basic/conf/get_source2.yaml @@ -1,4 +1,4 @@ -enable-gtid: false +enable-gtid: true auto-fix-gtid: false relay-dir: /tmp/dm_test/dmctl_basic/worker2/relay_log meta-dir: "" diff --git a/tests/dmctl_basic/conf/source2.yaml b/tests/dmctl_basic/conf/source2.yaml index 871d35d6eb..b399d9fd6a 100644 --- a/tests/dmctl_basic/conf/source2.yaml +++ b/tests/dmctl_basic/conf/source2.yaml @@ -1,6 +1,6 @@ source-id: mysql-replica-02 server-id: 654321 -enable-gtid: false +enable-gtid: true relay-binlog-name: '' relay-binlog-gtid: '' enable-relay: true diff --git a/tests/dmctl_basic/run.sh b/tests/dmctl_basic/run.sh index 157e2ab23b..a5efb0b9d6 100755 --- a/tests/dmctl_basic/run.sh +++ b/tests/dmctl_basic/run.sh @@ -16,13 +16,13 @@ function usage_and_arg_test() { check_task_wrong_arg check_task_wrong_config_file -# echo "pause_relay_wrong_arg" -# pause_relay_wrong_arg -# pause_relay_wihout_worker -# -# echo "resume_relay_wrong_arg" -# resume_relay_wrong_arg -# resume_relay_wihout_worker + echo "pause_relay_wrong_arg" + pause_relay_wrong_arg + pause_relay_wihout_worker + + echo "resume_relay_wrong_arg" + resume_relay_wrong_arg + resume_relay_wihout_worker echo "pause_task_wrong_arg" pause_task_wrong_arg @@ -57,10 +57,10 @@ function usage_and_arg_test() { # update_master_config_wrong_arg # update_master_config_wrong_config_file # -# echo "purge_relay_wrong_arg" -# purge_relay_wrong_arg -# purge_relay_wihout_worker -# purge_relay_filename_with_multi_workers + echo "purge_relay_wrong_arg" + purge_relay_wrong_arg + purge_relay_wihout_worker + purge_relay_filename_with_multi_workers echo "operate_source_empty_arg" operate_source_empty_arg @@ -147,13 +147,13 @@ function run() { '"worker": "worker1"' 1 \ '"worker": "worker2"' 1 -# echo "pause_relay_success" -# pause_relay_success -# query_status_stopped_relay -# # pause twice won't receive an error now -# # pause_relay_fail -# resume_relay_success -# query_status_with_no_tasks + echo "pause_relay_success" + pause_relay_success + query_status_stopped_relay + # pause twice won't receive an error now + # pause_relay_fail + resume_relay_success + query_status_with_no_tasks echo "dmctl_check_task" check_task_pass $TASK_CONF diff --git a/tests/dmctl_command/conf/source1.yaml b/tests/dmctl_command/conf/source1.yaml index 3cf5da4388..6cb9c1ac65 100644 --- a/tests/dmctl_command/conf/source1.yaml +++ b/tests/dmctl_command/conf/source1.yaml @@ -1,6 +1,7 @@ source-id: mysql-replica-01 flavor: '' enable-gtid: false +enable-relay: false relay-binlog-name: '' relay-binlog-gtid: '' from: diff --git a/tests/dmctl_command/conf/source2.yaml b/tests/dmctl_command/conf/source2.yaml index bd68886439..c7b63092c6 100644 --- a/tests/dmctl_command/conf/source2.yaml +++ b/tests/dmctl_command/conf/source2.yaml @@ -1,6 +1,8 @@ source-id: mysql-replica-02 flavor: '' -enable-gtid: false +enable-gtid: true +enable-relay: true +relay-dir: '' relay-binlog-name: '' relay-binlog-gtid: '' from: diff --git a/tests/dmctl_command/run.sh b/tests/dmctl_command/run.sh index 236e70e3b3..4bdcd80b28 100644 --- a/tests/dmctl_command/run.sh +++ b/tests/dmctl_command/run.sh @@ -6,7 +6,7 @@ cur=$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd ) source $cur/../_utils/test_prepare WORK_DIR=$TEST_DIR/$TEST_NAME -help_cnt=31 +help_cnt=34 function run() { # check dmctl alone output @@ -93,7 +93,6 @@ function run() { cp $cur/conf/source1.yaml $WORK_DIR/source1.yaml cp $cur/conf/source2.yaml $WORK_DIR/source2.yaml sed -i "/relay-binlog-name/i\relay-dir: $WORK_DIR/worker1/relay_log" $WORK_DIR/source1.yaml - sed -i "/relay-binlog-name/i\relay-dir: $WORK_DIR/worker2/relay_log" $WORK_DIR/source2.yaml dmctl_operate_source create $WORK_DIR/source1.yaml $SOURCE_ID1 dmctl_operate_source create $WORK_DIR/source2.yaml $SOURCE_ID2 diff --git a/tests/drop_column_with_index/conf/source1.yaml b/tests/drop_column_with_index/conf/source1.yaml index 573489a847..1f01c40998 100644 --- a/tests/drop_column_with_index/conf/source1.yaml +++ b/tests/drop_column_with_index/conf/source1.yaml @@ -1,6 +1,7 @@ source-id: mysql-replica-01 flavor: '' enable-gtid: false +enable-relay: true relay-binlog-name: '' relay-binlog-gtid: '' from: diff --git a/tests/drop_column_with_index/run.sh b/tests/drop_column_with_index/run.sh index 242505c190..8c4f7b77a9 100755 --- a/tests/drop_column_with_index/run.sh +++ b/tests/drop_column_with_index/run.sh @@ -29,7 +29,8 @@ function run() { check_sync_diff $WORK_DIR $cur/conf/diff_config.toml check_log_contain_with_retry "mock upstream instance restart" $WORK_DIR/worker1/log/dm-worker.log - check_log_contain_with_retry "dispatch auto resume task" $WORK_DIR/worker1/log/dm-worker.log + # check_log_contain_with_retry "dispatch auto resume task" $WORK_DIR/worker1/log/dm-worker.log + check_log_contain_with_retry "meet error when read from local binlog, will switch to remote binlog" $WORK_DIR/worker1/log/dm-worker.log run_sql_file $cur/data/db1.increment.sql $MYSQL_HOST1 $MYSQL_PORT1 $MYSQL_PASSWORD1 diff --git a/tests/full_mode/conf/source1.yaml b/tests/full_mode/conf/source1.yaml index c1cc021476..901727c15b 100644 --- a/tests/full_mode/conf/source1.yaml +++ b/tests/full_mode/conf/source1.yaml @@ -2,6 +2,7 @@ server-id: 101 source-id: mysql-replica-01 flavor: '' enable-gtid: false +enable-relay: false relay-binlog-name: '' relay-binlog-gtid: '' from: diff --git a/tests/full_mode/conf/source2.yaml b/tests/full_mode/conf/source2.yaml index 8b08f3b082..f393837885 100644 --- a/tests/full_mode/conf/source2.yaml +++ b/tests/full_mode/conf/source2.yaml @@ -1,7 +1,8 @@ server-id: 102 source-id: mysql-replica-02 flavor: '' -enable-gtid: false +enable-gtid: true +enable-relay: true relay-binlog-name: '' relay-binlog-gtid: '' from: diff --git a/tests/full_mode/run.sh b/tests/full_mode/run.sh index 987a3ff305..1f5bcd12bc 100755 --- a/tests/full_mode/run.sh +++ b/tests/full_mode/run.sh @@ -52,7 +52,7 @@ function fail_acquire_global_lock() { run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \ "query-status test" \ - "\"stage\": \"Paused\"" 2 \ + "\"stage\": \"Paused\"" 3 \ "you need (at least one of) the RELOAD privilege(s) for this operation" 2 cleanup_data full_mode diff --git a/tests/ha/conf/dm-worker3.toml b/tests/ha/conf/dm-worker3.toml index ceecd87470..ab7e1b9cb3 100644 --- a/tests/ha/conf/dm-worker3.toml +++ b/tests/ha/conf/dm-worker3.toml @@ -1,2 +1,2 @@ name = "worker3" -join = "127.0.0.1:8261,127.0.0.1:8361,127.0.0.1:8461,127.0.0.1:8561,127.0.0.1:8661" +join = "127.0.0.1:8261" diff --git a/tests/ha/conf/source2.yaml b/tests/ha/conf/source2.yaml index fb1985ca35..d6f0846831 100644 --- a/tests/ha/conf/source2.yaml +++ b/tests/ha/conf/source2.yaml @@ -1,7 +1,7 @@ source-id: mysql-replica-02 flavor: '' enable-gtid: false -enable-relay: false +enable-relay: true from: host: 127.0.0.1 user: root diff --git a/tests/ha/run.sh b/tests/ha/run.sh index 8e195ae70b..647783c6da 100755 --- a/tests/ha/run.sh +++ b/tests/ha/run.sh @@ -91,22 +91,22 @@ function run() { echo "query-status from all dm-master" run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT1" \ "query-status test" \ - "\"stage\": \"Running\"" 2 + "\"stage\": \"Running\"" 3 run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT2" \ "query-status test" \ - "\"stage\": \"Running\"" 2 + "\"stage\": \"Running\"" 3 run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT3" \ "query-status test" \ - "\"stage\": \"Running\"" 2 + "\"stage\": \"Running\"" 3 echo "join new dm-master and query-status" run_dm_master $WORK_DIR/master4 $MASTER_PORT4 $cur/conf/dm-master4.toml check_rpc_alive $cur/../bin/check_master_online 127.0.0.1:$MASTER_PORT4 run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT4" \ "query-status test" \ - "\"stage\": \"Running\"" 2 + "\"stage\": \"Running\"" 3 # may join failed with error `fail to join embed etcd: add member http://127.0.0.1:8295: etcdserver: unhealthy cluster`, and dm-master will exit. so just sleep some seconds. sleep 5 @@ -115,14 +115,14 @@ function run() { check_rpc_alive $cur/../bin/check_master_online 127.0.0.1:$MASTER_PORT5 run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT5" \ "query-status test" \ - "\"stage\": \"Running\"" 2 + "\"stage\": \"Running\"" 3 sleep 5 run_dm_master $WORK_DIR/master6 $MASTER_PORT6 $cur/conf/dm-master6.toml check_rpc_alive $cur/../bin/check_master_online 127.0.0.1:$MASTER_PORT6 run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT6" \ "query-status test" \ - "\"stage\": \"Running\"" 2 + "\"stage\": \"Running\"" 3 sleep 5 echo "kill dm-master1" diff --git a/tests/ha_cases/conf/source1.yaml b/tests/ha_cases/conf/source1.yaml index 01660cf685..8b2f8c4319 100644 --- a/tests/ha_cases/conf/source1.yaml +++ b/tests/ha_cases/conf/source1.yaml @@ -1,7 +1,7 @@ source-id: mysql-replica-01 flavor: '' enable-gtid: false -enable-relay: false +enable-relay: true from: host: 127.0.0.1 user: root diff --git a/tests/ha_cases/conf/source2.yaml b/tests/ha_cases/conf/source2.yaml index 100a7bd6b3..31bd5de10e 100644 --- a/tests/ha_cases/conf/source2.yaml +++ b/tests/ha_cases/conf/source2.yaml @@ -1,7 +1,7 @@ source-id: mysql-replica-02 flavor: '' enable-gtid: true -enable-relay: false +enable-relay: true from: host: 127.0.0.1 user: root diff --git a/tests/ha_cases/lib.sh b/tests/ha_cases/lib.sh index 9850905a89..e5afc4bb3d 100755 --- a/tests/ha_cases/lib.sh +++ b/tests/ha_cases/lib.sh @@ -20,8 +20,8 @@ function load_data() { run_sql "CREATE DATABASE if not exists ${db};" $port $pswd run_sql "DROP TABLE if exists ${db}.t${i};" $port $pswd run_sql "CREATE TABLE ${db}.t${i}(i TINYINT, j INT UNIQUE KEY);" $port $pswd - for j in $(seq 80); do - run_sql "INSERT INTO ${db}.t${i} VALUES ($j,${j}000$j),($j,${j}001$j);" $port $pswd + for j in $(seq 800); do + run_sql "INSERT INTO ${db}.t${i} VALUES ($j,${j}00$j),($j,${j}01$j);" $port $pswd sleep 0.1 done } diff --git a/tests/ha_cases/run.sh b/tests/ha_cases/run.sh index abfc3fe566..d7c05fa060 100755 --- a/tests/ha_cases/run.sh +++ b/tests/ha_cases/run.sh @@ -19,7 +19,7 @@ function test_running() { # make sure task to step in "Sync" stage run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT3" \ "query-status test" \ - "\"stage\": \"Running\"" 2 \ + "\"stage\": \"Running\"" 4 \ "\"unit\": \"Sync\"" 2 echo "use sync_diff_inspector to check full dump loader" @@ -49,11 +49,11 @@ function test_multi_task_running() { # make sure task to step in "Sync" stage run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT3" \ "query-status test" \ - "\"stage\": \"Running\"" 2 \ + "\"stage\": \"Running\"" 4 \ "\"unit\": \"Sync\"" 2 run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT3" \ "query-status test2" \ - "\"stage\": \"Running\"" 2 \ + "\"stage\": \"Running\"" 4 \ "\"unit\": \"Sync\"" 2 echo "use sync_diff_inspector to check full dump loader" @@ -158,11 +158,11 @@ function test_kill_master() { echo "check master2,3 are running" run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT2" \ "query-status test" \ - "\"stage\": \"Running\"" 2 + "\"stage\": \"Running\"" 4 run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT3" \ "query-status test" \ - "\"stage\": \"Running\"" 2 + "\"stage\": \"Running\"" 4 run_sql_file_withdb $cur/data/db1.increment2.sql $MYSQL_HOST1 $MYSQL_PORT1 $MYSQL_PASSWORD1 $ha_test run_sql_file_withdb $cur/data/db2.increment2.sql $MYSQL_HOST2 $MYSQL_PORT2 $MYSQL_PASSWORD2 $ha_test @@ -211,33 +211,33 @@ function test_kill_and_isolate_worker() { isolate_worker 4 "isolate" run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT1" \ "query-status test" \ - "\"stage\": \"Running\"" 2 + "\"stage\": \"Running\"" 4 echo "isolate dm-worker3" isolate_worker 3 "isolate" run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT1" \ "query-status test" \ - "\"stage\": \"Running\"" 1 \ + "\"stage\": \"Running\"" 2 \ "\"result\": false" 1 echo "disable isolate dm-worker4" isolate_worker 4 "disable_isolate" run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT1" \ "query-status test" \ - "\"stage\": \"Running\"" 2 + "\"stage\": \"Running\"" 4 echo "query-status from all dm-master" run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT1" \ "query-status test" \ - "\"stage\": \"Running\"" 2 + "\"stage\": \"Running\"" 4 run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT2" \ "query-status test" \ - "\"stage\": \"Running\"" 2 + "\"stage\": \"Running\"" 4 run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT3" \ "query-status test" \ - "\"stage\": \"Running\"" 2 + "\"stage\": \"Running\"" 4 run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \ "pause-task test"\ @@ -281,7 +281,7 @@ function test_kill_master_in_sync() { check_http_alive 127.0.0.1:$MASTER_PORT2/apis/${API_VERSION}/status/test '"stage": "Running"' 10 run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT2" \ "query-status test" \ - "\"stage\": \"Running\"" 2 + "\"stage\": \"Running\"" 4 # waiting for syncing wait @@ -319,15 +319,15 @@ function test_kill_worker_in_sync() { echo "query-status from all dm-master" run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT1" \ "query-status test" \ - "\"stage\": \"Running\"" 2 + "\"stage\": \"Running\"" 4 run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT2" \ "query-status test" \ - "\"stage\": \"Running\"" 2 + "\"stage\": \"Running\"" 4 run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT3" \ "query-status test" \ - "\"stage\": \"Running\"" 2 + "\"stage\": \"Running\"" 4 # waiting for syncing wait @@ -404,7 +404,7 @@ function test_standalone_running() { # test should still running run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \ "query-status test"\ - "\"stage\": \"Running\"" 1 + "\"stage\": \"Running\"" 2 echo "[$(date)] <<<<<< finish test_standalone_running >>>>>>" } @@ -451,7 +451,7 @@ function test_pause_task() { run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \ "query-status $name"\ - "\"stage\": \"Running\"" 2 + "\"stage\": \"Running\"" 4 done # waiting for syncing @@ -499,7 +499,7 @@ function test_stop_task() { run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \ "query-status ${task_name[$idx]}"\ - "\"stage\": \"Running\"" 2 + "\"stage\": \"Running\"" 4 done # waiting for syncing @@ -542,7 +542,7 @@ function test_multi_task_reduce_and_restart_worker() { echo "try to kill worker port ${worker_ports[$[ $idx - 1 ] ]}" ps aux | grep dm-worker${idx} |awk '{print $2}'|xargs kill || true - check_port_offline ${worker_ports[$[ $idx - 1] ]} 20 + run_dm_ctl_with_retry $WORK_DIR 127.0.0.1:$MASTER_PORT2 "list-member --worker --name=worker$idx" '"stage": "offline"' 1 echo "start dm-worker${idx}" run_dm_worker $WORK_DIR/worker${idx} ${worker_ports[$[ $idx - 1] ]} $cur/conf/dm-worker${idx}.toml @@ -561,7 +561,7 @@ function test_multi_task_reduce_and_restart_worker() { for name in ${task_name[@]}; do run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \ "query-status $name"\ - "\"stage\": \"Running\"" 2 + "\"stage\": \"Running\"" 4 done # waiting for syncing @@ -578,7 +578,7 @@ function test_multi_task_reduce_and_restart_worker() { done search_str="\"stage\": \"Running\"" running_count=$(echo $status_str | sed "s/$search_str/$search_str\n/g" | grep -c "$search_str") - if [ $running_count != 4 ]; then + if [ $running_count != 8 ]; then echo "error running worker" echo $status_str exit 1 diff --git a/tests/ha_master/conf/source2.yaml b/tests/ha_master/conf/source2.yaml index 92d2ee77b6..a3d54dbfc6 100644 --- a/tests/ha_master/conf/source2.yaml +++ b/tests/ha_master/conf/source2.yaml @@ -3,7 +3,7 @@ flavor: '' enable-gtid: false relay-binlog-name: '' relay-binlog-gtid: '' -enable-relay: false +enable-relay: true from: host: 127.0.0.1 user: root diff --git a/tests/ha_master/run.sh b/tests/ha_master/run.sh index 8112be6558..cba066a069 100755 --- a/tests/ha_master/run.sh +++ b/tests/ha_master/run.sh @@ -327,7 +327,7 @@ function run() { check_http_alive 127.0.0.1:$MASTER_PORT3/apis/${API_VERSION}/status/test '"stage": "Running"' 10 run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT3" \ "query-status test" \ - "\"stage\": \"Running\"" 2 + "\"stage\": \"Running\"" 3 run_sql_file $cur/data/db1.increment.sql $MYSQL_HOST1 $MYSQL_PORT1 $MYSQL_PASSWORD1 run_sql_file $cur/data/db2.increment.sql $MYSQL_HOST2 $MYSQL_PORT2 $MYSQL_PASSWORD2 @@ -351,7 +351,7 @@ function run() { # the last two masters should elect a new leader and serve service run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT4" \ "query-status test" \ - "\"stage\": \"Running\"" 2 + "\"stage\": \"Running\"" 3 # run master3 again run_dm_master $WORK_DIR/master3 $MASTER_PORT3 $cur/conf/dm-master3.toml diff --git a/tests/http_apis/conf/source1.yaml b/tests/http_apis/conf/source1.yaml index 1894302d70..fae9f5bd7e 100644 --- a/tests/http_apis/conf/source1.yaml +++ b/tests/http_apis/conf/source1.yaml @@ -1,6 +1,7 @@ source-id: mysql-replica-01 flavor: '' enable-gtid: true +enable-relay: true relay-binlog-name: '' relay-binlog-gtid: '' from: diff --git a/tests/http_apis/run.sh b/tests/http_apis/run.sh index 1f4980deab..0b574026d2 100644 --- a/tests/http_apis/run.sh +++ b/tests/http_apis/run.sh @@ -50,7 +50,7 @@ function run() { sleep 1 curl -X GET 127.0.0.1:$MASTER_PORT/apis/${API_VERSION}/status/test > $WORK_DIR/status.log - check_log_contains $WORK_DIR/status.log "\"stage\": \"Running\"" 1 + check_log_contains $WORK_DIR/status.log "\"stage\": \"Running\"" 2 check_log_contains $WORK_DIR/status.log "\"name\": \"test\"" 1 echo "get sub task configs" @@ -72,7 +72,7 @@ function run() { sleep 1 curl -X GET 127.0.0.1:$MASTER_PORT/apis/${API_VERSION}/status/test > $WORK_DIR/status.log - check_log_contains $WORK_DIR/status.log "\"stage\": \"Running\"" 1 + check_log_contains $WORK_DIR/status.log "\"stage\": \"Running\"" 2 check_log_contains $WORK_DIR/status.log "\"name\": \"test\"" 1 sleep 1 diff --git a/tests/import_goroutine_leak/conf/source1.yaml b/tests/import_goroutine_leak/conf/source1.yaml index 810b0af3d1..7d67feb8cf 100644 --- a/tests/import_goroutine_leak/conf/source1.yaml +++ b/tests/import_goroutine_leak/conf/source1.yaml @@ -1,6 +1,7 @@ source-id: mysql-replica-01 flavor: '' enable-gtid: false +enable-relay: true relay-binlog-name: '' relay-binlog-gtid: '' from: diff --git a/tests/import_goroutine_leak/run.sh b/tests/import_goroutine_leak/run.sh index a192007b96..a38aaf3dca 100644 --- a/tests/import_goroutine_leak/run.sh +++ b/tests/import_goroutine_leak/run.sh @@ -111,7 +111,7 @@ function run() { # wait until the task running run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \ "query-status test" \ - '"stage": "Running"' 1 + '"stage": "Running"' 2 sleep 2 # wait to be blocked # check to be blocked diff --git a/tests/import_v10x/conf/source1.yaml b/tests/import_v10x/conf/source1.yaml index 6d078d2523..4bb3139236 100644 --- a/tests/import_v10x/conf/source1.yaml +++ b/tests/import_v10x/conf/source1.yaml @@ -3,6 +3,7 @@ source-id: mysql-replica-01 flavor: '' enable-gtid: false +enable-relay: false from: host: 127.0.0.1 user: root diff --git a/tests/import_v10x/conf/source2.yaml b/tests/import_v10x/conf/source2.yaml index 0a99dec60b..d0f4e7bc6f 100644 --- a/tests/import_v10x/conf/source2.yaml +++ b/tests/import_v10x/conf/source2.yaml @@ -3,6 +3,7 @@ source-id: mysql-replica-02 flavor: '' enable-gtid: true +enable-relay: true from: host: 127.0.0.1 user: root diff --git a/tests/import_v10x/run.sh b/tests/import_v10x/run.sh index 918b8ed163..f5455337dc 100644 --- a/tests/import_v10x/run.sh +++ b/tests/import_v10x/run.sh @@ -71,7 +71,7 @@ function run() { # check task running. run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \ "query-status test" \ - "\"stage\": \"Running\"" 2 + "\"stage\": \"Running\"" 3 # check task config, just a simple match run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \ diff --git a/tests/incremental_mode/conf/source1.yaml b/tests/incremental_mode/conf/source1.yaml index 6e0aa87543..2bc3933f51 100644 --- a/tests/incremental_mode/conf/source1.yaml +++ b/tests/incremental_mode/conf/source1.yaml @@ -3,6 +3,7 @@ flavor: '' enable-gtid: true relay-binlog-name: '' relay-binlog-gtid: '' +enable-relay: false from: host: 127.0.0.1 user: root diff --git a/tests/incremental_mode/conf/source2.yaml b/tests/incremental_mode/conf/source2.yaml index eb3f32f965..e21304db8e 100644 --- a/tests/incremental_mode/conf/source2.yaml +++ b/tests/incremental_mode/conf/source2.yaml @@ -3,6 +3,7 @@ flavor: '' enable-gtid: false relay-binlog-name: '' relay-binlog-gtid: '' +enable-relay: true from: host: 127.0.0.1 user: root diff --git a/tests/incremental_mode/run.sh b/tests/incremental_mode/run.sh index 70b7b07dc7..05dedd05e0 100755 --- a/tests/incremental_mode/run.sh +++ b/tests/incremental_mode/run.sh @@ -110,7 +110,7 @@ function run() { run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \ "query-status test" \ - "Running" 2 + "Running" 3 # we use failpoint to let worker sleep 8 second when executeSQLs, to increase possibility of # meeting an error of context cancel. # when below check pass, it means we filter out that error, or that error doesn't happen. diff --git a/tests/load_interrupt/conf/source1.yaml b/tests/load_interrupt/conf/source1.yaml index 810b0af3d1..7d67feb8cf 100644 --- a/tests/load_interrupt/conf/source1.yaml +++ b/tests/load_interrupt/conf/source1.yaml @@ -1,6 +1,7 @@ source-id: mysql-replica-01 flavor: '' enable-gtid: false +enable-relay: true relay-binlog-name: '' relay-binlog-gtid: '' from: diff --git a/tests/online_ddl/conf/source1.yaml b/tests/online_ddl/conf/source1.yaml index 1894302d70..49390fefa4 100644 --- a/tests/online_ddl/conf/source1.yaml +++ b/tests/online_ddl/conf/source1.yaml @@ -1,6 +1,7 @@ source-id: mysql-replica-01 flavor: '' enable-gtid: true +enable-relay: false relay-binlog-name: '' relay-binlog-gtid: '' from: diff --git a/tests/online_ddl/conf/source2.yaml b/tests/online_ddl/conf/source2.yaml index bd68886439..3df6bdf955 100644 --- a/tests/online_ddl/conf/source2.yaml +++ b/tests/online_ddl/conf/source2.yaml @@ -1,6 +1,7 @@ source-id: mysql-replica-02 flavor: '' enable-gtid: false +enable-relay: true relay-binlog-name: '' relay-binlog-gtid: '' from: diff --git a/tests/online_ddl/run.sh b/tests/online_ddl/run.sh index d970a23d43..59f4a42546 100755 --- a/tests/online_ddl/run.sh +++ b/tests/online_ddl/run.sh @@ -52,7 +52,7 @@ function real_run() { echo "wait and check task running" run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \ "query-status test" \ - "\"stage\": \"Running\"" 2 + "\"stage\": \"Running\"" 3 run_sql_file $cur/data/db1.increment2.sql $MYSQL_HOST1 $MYSQL_PORT1 $MYSQL_PASSWORD1 run_sql_file $cur/data/db2.increment2.sql $MYSQL_HOST2 $MYSQL_PORT2 $MYSQL_PASSWORD2 diff --git a/tests/print_status/conf/source1.yaml b/tests/print_status/conf/source1.yaml index 810b0af3d1..7d67feb8cf 100644 --- a/tests/print_status/conf/source1.yaml +++ b/tests/print_status/conf/source1.yaml @@ -1,6 +1,7 @@ source-id: mysql-replica-01 flavor: '' enable-gtid: false +enable-relay: true relay-binlog-name: '' relay-binlog-gtid: '' from: diff --git a/tests/safe_mode/conf/source1.yaml b/tests/safe_mode/conf/source1.yaml index 1894302d70..fae9f5bd7e 100644 --- a/tests/safe_mode/conf/source1.yaml +++ b/tests/safe_mode/conf/source1.yaml @@ -1,6 +1,7 @@ source-id: mysql-replica-01 flavor: '' enable-gtid: true +enable-relay: true relay-binlog-name: '' relay-binlog-gtid: '' from: diff --git a/tests/safe_mode/conf/source2.yaml b/tests/safe_mode/conf/source2.yaml index bd68886439..cc24bfedbc 100644 --- a/tests/safe_mode/conf/source2.yaml +++ b/tests/safe_mode/conf/source2.yaml @@ -1,6 +1,7 @@ source-id: mysql-replica-02 flavor: '' enable-gtid: false +enable-relay: false relay-binlog-name: '' relay-binlog-gtid: '' from: diff --git a/tests/sequence_safe_mode/conf/source1.yaml b/tests/sequence_safe_mode/conf/source1.yaml index 1894302d70..49390fefa4 100644 --- a/tests/sequence_safe_mode/conf/source1.yaml +++ b/tests/sequence_safe_mode/conf/source1.yaml @@ -1,6 +1,7 @@ source-id: mysql-replica-01 flavor: '' enable-gtid: true +enable-relay: false relay-binlog-name: '' relay-binlog-gtid: '' from: diff --git a/tests/sequence_safe_mode/conf/source2.yaml b/tests/sequence_safe_mode/conf/source2.yaml index bd68886439..3df6bdf955 100644 --- a/tests/sequence_safe_mode/conf/source2.yaml +++ b/tests/sequence_safe_mode/conf/source2.yaml @@ -1,6 +1,7 @@ source-id: mysql-replica-02 flavor: '' enable-gtid: false +enable-relay: true relay-binlog-name: '' relay-binlog-gtid: '' from: diff --git a/tests/sequence_sharding/conf/source1.yaml b/tests/sequence_sharding/conf/source1.yaml index 1894302d70..fae9f5bd7e 100644 --- a/tests/sequence_sharding/conf/source1.yaml +++ b/tests/sequence_sharding/conf/source1.yaml @@ -1,6 +1,7 @@ source-id: mysql-replica-01 flavor: '' enable-gtid: true +enable-relay: true relay-binlog-name: '' relay-binlog-gtid: '' from: diff --git a/tests/sequence_sharding/conf/source2.yaml b/tests/sequence_sharding/conf/source2.yaml index bd68886439..cc24bfedbc 100644 --- a/tests/sequence_sharding/conf/source2.yaml +++ b/tests/sequence_sharding/conf/source2.yaml @@ -1,6 +1,7 @@ source-id: mysql-replica-02 flavor: '' enable-gtid: false +enable-relay: false relay-binlog-name: '' relay-binlog-gtid: '' from: diff --git a/tests/sequence_sharding_optimistic/conf/source1.yaml b/tests/sequence_sharding_optimistic/conf/source1.yaml index 082837b755..bca70d80c0 100644 --- a/tests/sequence_sharding_optimistic/conf/source1.yaml +++ b/tests/sequence_sharding_optimistic/conf/source1.yaml @@ -1,6 +1,7 @@ source-id: mysql-replica-01 flavor: '' enable-gtid: false +enable-relay: false relay-binlog-name: '' relay-binlog-gtid: '' from: diff --git a/tests/sequence_sharding_optimistic/conf/source2.yaml b/tests/sequence_sharding_optimistic/conf/source2.yaml index eb3f32f965..17e3479acb 100644 --- a/tests/sequence_sharding_optimistic/conf/source2.yaml +++ b/tests/sequence_sharding_optimistic/conf/source2.yaml @@ -1,6 +1,7 @@ source-id: mysql-replica-02 flavor: '' -enable-gtid: false +enable-gtid: true +enable-relay: true relay-binlog-name: '' relay-binlog-gtid: '' from: diff --git a/tests/sequence_sharding_optimistic/run.sh b/tests/sequence_sharding_optimistic/run.sh index fd382bde38..998d2b31b5 100755 --- a/tests/sequence_sharding_optimistic/run.sh +++ b/tests/sequence_sharding_optimistic/run.sh @@ -74,7 +74,7 @@ run() { "\"result\": true" 3 run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \ "query-status $task_name" \ - "\"stage\": \"Running\"" 2 + "\"stage\": \"Running\"" 3 # now, for optimistic shard DDL, different sources will reach a stage often not at the same time, # in order to simply the check and resume flow, only enable the failpoint for one DM-worker. @@ -198,7 +198,7 @@ run() { "\"result\": true" 3 run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \ "query-status $task_name" \ - "\"stage\": \"Running\"" 2 + "\"stage\": \"Running\"" 3 # use sync_diff_inspector to check data now! check_sync_diff $WORK_DIR $cur/conf/diff_config.toml diff --git a/tests/sequence_sharding_removemeta/conf/source1.yaml b/tests/sequence_sharding_removemeta/conf/source1.yaml index 1894302d70..49390fefa4 100644 --- a/tests/sequence_sharding_removemeta/conf/source1.yaml +++ b/tests/sequence_sharding_removemeta/conf/source1.yaml @@ -1,6 +1,7 @@ source-id: mysql-replica-01 flavor: '' enable-gtid: true +enable-relay: false relay-binlog-name: '' relay-binlog-gtid: '' from: diff --git a/tests/sequence_sharding_removemeta/conf/source2.yaml b/tests/sequence_sharding_removemeta/conf/source2.yaml index bd68886439..3df6bdf955 100644 --- a/tests/sequence_sharding_removemeta/conf/source2.yaml +++ b/tests/sequence_sharding_removemeta/conf/source2.yaml @@ -1,6 +1,7 @@ source-id: mysql-replica-02 flavor: '' enable-gtid: false +enable-relay: true relay-binlog-name: '' relay-binlog-gtid: '' from: diff --git a/tests/sharding/conf/source1.yaml b/tests/sharding/conf/source1.yaml index 6e0aa87543..a3b176d0da 100644 --- a/tests/sharding/conf/source1.yaml +++ b/tests/sharding/conf/source1.yaml @@ -1,6 +1,7 @@ source-id: mysql-replica-01 flavor: '' enable-gtid: true +enable-relay: false relay-binlog-name: '' relay-binlog-gtid: '' from: diff --git a/tests/sharding/conf/source2.yaml b/tests/sharding/conf/source2.yaml index eb3f32f965..4271c2b9f2 100644 --- a/tests/sharding/conf/source2.yaml +++ b/tests/sharding/conf/source2.yaml @@ -1,6 +1,7 @@ source-id: mysql-replica-02 flavor: '' enable-gtid: false +enable-relay: true relay-binlog-name: '' relay-binlog-gtid: '' from: diff --git a/tests/sharding2/conf/source1.yaml b/tests/sharding2/conf/source1.yaml index 1894302d70..49390fefa4 100644 --- a/tests/sharding2/conf/source1.yaml +++ b/tests/sharding2/conf/source1.yaml @@ -1,6 +1,7 @@ source-id: mysql-replica-01 flavor: '' enable-gtid: true +enable-relay: false relay-binlog-name: '' relay-binlog-gtid: '' from: diff --git a/tests/sharding2/conf/source2.yaml b/tests/sharding2/conf/source2.yaml index 31f2a552d2..cc4bc2f3a4 100644 --- a/tests/sharding2/conf/source2.yaml +++ b/tests/sharding2/conf/source2.yaml @@ -1,6 +1,7 @@ source-id: mysql-replica-02 flavor: '' enable-gtid: true +enable-relay: true relay-binlog-name: '' relay-binlog-gtid: '' from: diff --git a/tests/sharding2/run.sh b/tests/sharding2/run.sh index 99b0ae7c6c..f0b95343b5 100755 --- a/tests/sharding2/run.sh +++ b/tests/sharding2/run.sh @@ -81,7 +81,7 @@ function run() { run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \ "query-status test" \ - "\"stage\": \"Running\"" 2 \ + "\"stage\": \"Running\"" 3 \ "\"unit\": \"Sync\"" 2 } diff --git a/tests/start_task/conf/source1.yaml b/tests/start_task/conf/source1.yaml index 1894302d70..fae9f5bd7e 100644 --- a/tests/start_task/conf/source1.yaml +++ b/tests/start_task/conf/source1.yaml @@ -1,6 +1,7 @@ source-id: mysql-replica-01 flavor: '' enable-gtid: true +enable-relay: true relay-binlog-name: '' relay-binlog-gtid: '' from: diff --git a/tests/tiup/conf/source1.yaml b/tests/tiup/conf/source1.yaml index 999c5decff..c49cc78ae2 100644 --- a/tests/tiup/conf/source1.yaml +++ b/tests/tiup/conf/source1.yaml @@ -1,5 +1,6 @@ source-id: mysql-replica-01 -enable-gtid: true +enable-gtid: false +enable-relay: false from: host: mysql1 user: root diff --git a/tests/tiup/conf/source2.yaml b/tests/tiup/conf/source2.yaml index 0cdcfefe87..cf0a47d14e 100644 --- a/tests/tiup/conf/source2.yaml +++ b/tests/tiup/conf/source2.yaml @@ -1,5 +1,6 @@ source-id: mysql-replica-02 enable-gtid: true +enable-relay: true from: host: mysql2 user: root diff --git a/tests/tls/conf/source1.yaml b/tests/tls/conf/source1.yaml index 1894302d70..fae9f5bd7e 100644 --- a/tests/tls/conf/source1.yaml +++ b/tests/tls/conf/source1.yaml @@ -1,6 +1,7 @@ source-id: mysql-replica-01 flavor: '' enable-gtid: true +enable-relay: true relay-binlog-name: '' relay-binlog-gtid: '' from: