diff --git a/dm/common/common.go b/dm/common/common.go index e8d1264973..53f47d8040 100644 --- a/dm/common/common.go +++ b/dm/common/common.go @@ -38,6 +38,10 @@ var ( // UpstreamBoundWorkerKeyAdapter used to store address of worker in which MySQL-tasks which are running. // k/v: Encode(name) -> the bound relationship. UpstreamBoundWorkerKeyAdapter KeyAdapter = keyHexEncoderDecoder("/dm-master/bound-worker/") + // UpstreamLastBoundWorkerKeyAdapter used to store address of worker in which MySQL-tasks which are running. + // different with UpstreamBoundWorkerKeyAdapter, this kv should not be deleted when unbound, to provide a priority + // k/v: Encode(name) -> the bound relationship. + UpstreamLastBoundWorkerKeyAdapter KeyAdapter = keyHexEncoderDecoder("/dm-master/last-bound-worker/") // TaskConfigKeyAdapter used to store task config string. // k/v: Encode(task-name) -> task-config-string TaskConfigKeyAdapter KeyAdapter = keyHexEncoderDecoder("/dm-master/task/") @@ -77,7 +81,8 @@ var ( func keyAdapterKeysLen(s KeyAdapter) int { switch s { case WorkerRegisterKeyAdapter, UpstreamConfigKeyAdapter, UpstreamBoundWorkerKeyAdapter, - WorkerKeepAliveKeyAdapter, StageRelayKeyAdapter, TaskConfigKeyAdapter: + WorkerKeepAliveKeyAdapter, StageRelayKeyAdapter, TaskConfigKeyAdapter, + UpstreamLastBoundWorkerKeyAdapter: return 1 case UpstreamSubTaskKeyAdapter, StageSubTaskKeyAdapter, ShardDDLPessimismInfoKeyAdapter, ShardDDLPessimismOperationKeyAdapter, diff --git a/dm/master/scheduler/scheduler.go b/dm/master/scheduler/scheduler.go index 445fc250f3..72e047f45c 100644 --- a/dm/master/scheduler/scheduler.go +++ b/dm/master/scheduler/scheduler.go @@ -111,6 +111,9 @@ type Scheduler struct { // - when bounded the source to a worker. unbounds map[string]struct{} + // a mirror of bounds whose element is not deleted when worker unbound. worker -> SourceBound + lastBound map[string]ha.SourceBound + // expectant relay stages for sources, source ID -> stage. // add: // - bound the source to a worker (at first time). @@ -143,6 +146,7 @@ func NewScheduler(pLogger *log.Logger, securityCfg config.Security) *Scheduler { workers: make(map[string]*Worker), bounds: make(map[string]*Worker), unbounds: make(map[string]struct{}), + lastBound: make(map[string]ha.SourceBound), expectRelayStages: make(map[string]ha.Stage), expectSubTaskStages: make(map[string]map[string]ha.Stage), securityCfg: securityCfg, @@ -906,6 +910,11 @@ func (s *Scheduler) recoverWorkersBounds(cli *clientv3.Client) (int64, error) { if err != nil { return 0, err } + lastSourceBoundM, _, err := ha.GetLastSourceBounds(cli) + if err != nil { + return 0, err + } + s.lastBound = lastSourceBoundM // 3. get all history offline status. kam, rev, err := ha.GetKeepAliveWorkers(cli) @@ -1164,14 +1173,22 @@ func (s *Scheduler) handleWorkerOffline(ev ha.WorkerEvent, toLock bool) error { return nil } -// tryBoundForWorker tries to bound a random unbounded source to the worker. +// tryBoundForWorker tries to bound a source to the worker. first try last source of this worker, then randomly pick one // returns (true, nil) after bounded. func (s *Scheduler) tryBoundForWorker(w *Worker) (bounded bool, err error) { - // 1. check whether any unbound source exists. - var source string - for source = range s.unbounds { - break // got a source. + // 1. check if last bound is still available. + // if lastBound not found, or this source has been bounded to another worker (we also check that source still exists + // here), randomly pick one from unbounds. + // NOTE: if worker isn't in lastBound, we'll get "zero" SourceBound and it's OK, because "zero" string is not in + // unbounds + source := s.lastBound[w.baseInfo.Name].Source + if _, ok := s.unbounds[source]; !ok { + source = "" + for source = range s.unbounds { + break // got a source. + } } + if source == "" { s.logger.Info("no unbound sources need to bound", zap.Stringer("worker", w.BaseInfo())) return false, nil @@ -1198,14 +1215,31 @@ func (s *Scheduler) tryBoundForWorker(w *Worker) (bounded bool, err error) { // tryBoundForSource tries to bound a source to a random Free worker. // returns (true, nil) after bounded. func (s *Scheduler) tryBoundForSource(source string) (bool, error) { - // 1. try to find a random Free worker. + // 1. try to find history workers, then random Free worker. var worker *Worker - for _, w := range s.workers { - if w.Stage() == WorkerFree { - worker = w - break + for workerName, bound := range s.lastBound { + if bound.Source == source { + w, ok := s.workers[workerName] + if !ok { + // a not found worker + continue + } + if w.Stage() == WorkerFree { + worker = w + break + } } } + + if worker == nil { + for _, w := range s.workers { + if w.Stage() == WorkerFree { + worker = w + break + } + } + } + if worker == nil { s.logger.Info("no free worker exists for bound", zap.String("source", source)) return false, nil @@ -1280,7 +1314,7 @@ func (s *Scheduler) deleteWorker(name string) { // updateStatusForBound updates the in-memory status for bound, including: // - update the stage of worker to `Bound`. -// - record the bound relationship in the scheduler. +// - record the bound relationship and last bound relationship in the scheduler. // this func is called after the bound relationship existed in etcd. func (s *Scheduler) updateStatusForBound(w *Worker, b ha.SourceBound) error { err := w.ToBound(b) @@ -1288,6 +1322,7 @@ func (s *Scheduler) updateStatusForBound(w *Worker, b ha.SourceBound) error { return err } s.bounds[b.Source] = w + s.lastBound[b.Worker] = b return nil } diff --git a/dm/master/scheduler/scheduler_test.go b/dm/master/scheduler/scheduler_test.go index ac0ee66384..5ced30a348 100644 --- a/dm/master/scheduler/scheduler_test.go +++ b/dm/master/scheduler/scheduler_test.go @@ -944,3 +944,77 @@ func (t *testScheduler) TestWatchWorkerEventEtcdCompact(c *C) { cancel3() wg.Wait() } + +func (t *testScheduler) TestLastBound(c *C) { + defer clearTestInfoOperation(c) + + var ( + logger = log.L() + s = NewScheduler(&logger, config.Security{}) + sourceID1 = "mysql-replica-1" + sourceID2 = "mysql-replica-2" + workerName1 = "dm-worker-1" + workerName2 = "dm-worker-2" + workerName3 = "dm-worker-3" + workerName4 = "dm-worker-4" + sourceCfg1 config.SourceConfig + ) + + c.Assert(sourceCfg1.LoadFromFile(sourceSampleFile), IsNil) + sourceCfg1.SourceID = sourceID1 + sourceCfg2 := sourceCfg1 + sourceCfg2.SourceID = sourceID2 + worker1 := &Worker{baseInfo: ha.WorkerInfo{Name: workerName1}} + worker2 := &Worker{baseInfo: ha.WorkerInfo{Name: workerName2}} + worker3 := &Worker{baseInfo: ha.WorkerInfo{Name: workerName3}} + worker4 := &Worker{baseInfo: ha.WorkerInfo{Name: workerName4}} + + // step 1: start an empty scheduler without listening the worker event + s.started = true + s.etcdCli = etcdTestCli + s.workers[workerName1] = worker1 + s.workers[workerName2] = worker2 + s.workers[workerName3] = worker3 + s.workers[workerName4] = worker4 + + s.lastBound[workerName1] = ha.SourceBound{Source: sourceID1} + s.lastBound[workerName2] = ha.SourceBound{Source: sourceID2} + s.unbounds[sourceID1] = struct{}{} + s.unbounds[sourceID2] = struct{}{} + + // worker1 goes to last bounded source + worker1.ToFree() + bounded, err := s.tryBoundForWorker(worker1) + c.Assert(err, IsNil) + c.Assert(bounded, IsTrue) + c.Assert(s.bounds[sourceID1], DeepEquals, worker1) + + // worker3 has to bounded to source2 + worker3.ToFree() + bounded, err = s.tryBoundForWorker(worker3) + c.Assert(err, IsNil) + c.Assert(bounded, IsTrue) + c.Assert(s.bounds[sourceID2], DeepEquals, worker3) + + // though worker2 has a previous source, that source is not available, so not bound + worker2.ToFree() + bounded, err = s.tryBoundForWorker(worker2) + c.Assert(err, IsNil) + c.Assert(bounded, IsFalse) + + // worker4 is used to test whether source2 should be bounded to worker2 rather than a new worker + worker4.ToFree() + bounded, err = s.tryBoundForWorker(worker4) + c.Assert(err, IsNil) + c.Assert(bounded, IsFalse) + + // after worker3 become offline, worker2 should be bounded to worker2 + s.updateStatusForUnbound(sourceID2) + _, ok := s.bounds[sourceID2] + c.Assert(ok, IsFalse) + worker3.ToOffline() + bounded, err = s.tryBoundForSource(sourceID2) + c.Assert(err, IsNil) + c.Assert(bounded, IsTrue) + c.Assert(s.bounds[sourceID2], DeepEquals, worker2) +} diff --git a/pkg/ha/bound.go b/pkg/ha/bound.go index a36589fc4a..e6be3e3369 100644 --- a/pkg/ha/bound.go +++ b/pkg/ha/bound.go @@ -86,11 +86,11 @@ func sourceBoundFromJSON(s string) (b SourceBound, err error) { func PutSourceBound(cli *clientv3.Client, bounds ...SourceBound) (int64, error) { ops := make([]clientv3.Op, 0, len(bounds)) for _, bound := range bounds { - op, err := putSourceBoundOp(bound) + boundOps, err := putSourceBoundOp(bound) if err != nil { return 0, err } - ops = append(ops, op) + ops = append(ops, boundOps...) } _, rev, err := etcdutil.DoOpsInOneTxnWithRetry(cli, ops...) return rev, err @@ -137,6 +137,29 @@ func GetSourceBound(cli *clientv3.Client, worker string) (map[string]SourceBound return sbm, resp.Header.Revision, nil } +// GetLastSourceBounds gets all last source bound relationship. Different with GetSourceBound, "last source bound" will +// not be deleted when worker offline +func GetLastSourceBounds(cli *clientv3.Client) (map[string]SourceBound, int64, error) { + ctx, cancel := context.WithTimeout(cli.Ctx(), etcdutil.DefaultRequestTimeout) + defer cancel() + + var ( + sbm = make(map[string]SourceBound) + ) + resp, err := cli.Get(ctx, common.UpstreamLastBoundWorkerKeyAdapter.Path(), clientv3.WithPrefix()) + + if err != nil { + return sbm, 0, err + } + + sbm, err = sourceBoundFromResp("", resp) + if err != nil { + return sbm, 0, err + } + + return sbm, resp.Header.Revision, nil +} + // GetSourceBoundConfig gets the source bound relationship and relative source config at the same time // for the specified DM-worker. The index worker **must not be empty**: // if source bound is empty, will return an empty sourceBound and an empty source config @@ -310,14 +333,22 @@ func deleteSourceBoundOp(worker string) clientv3.Op { return clientv3.OpDelete(common.UpstreamBoundWorkerKeyAdapter.Encode(worker)) } -// putSourceBoundOp returns a PUT etcd operation for the bound relationship. +// deleteLastSourceBoundOp returns a DELETE ectd operation for the last bound relationship of the specified DM-worker. +func deleteLastSourceBoundOp(worker string) clientv3.Op { + return clientv3.OpDelete(common.UpstreamLastBoundWorkerKeyAdapter.Encode(worker)) +} + +// putSourceBoundOp returns PUT etcd operations for the bound relationship. // k/v: worker-name -> bound relationship. -func putSourceBoundOp(bound SourceBound) (clientv3.Op, error) { +func putSourceBoundOp(bound SourceBound) ([]clientv3.Op, error) { value, err := bound.toJSON() if err != nil { - return clientv3.Op{}, err + return []clientv3.Op{}, err } - key := common.UpstreamBoundWorkerKeyAdapter.Encode(bound.Worker) + key1 := common.UpstreamBoundWorkerKeyAdapter.Encode(bound.Worker) + op1 := clientv3.OpPut(key1, value) + key2 := common.UpstreamLastBoundWorkerKeyAdapter.Encode(bound.Worker) + op2 := clientv3.OpPut(key2, value) - return clientv3.OpPut(key, value), nil + return []clientv3.Op{op1, op2}, nil } diff --git a/pkg/ha/ops.go b/pkg/ha/ops.go index d57e549f8c..03cced27b8 100644 --- a/pkg/ha/ops.go +++ b/pkg/ha/ops.go @@ -33,9 +33,9 @@ func PutRelayStageSourceBound(cli *clientv3.Client, stage Stage, bound SourceBou if err != nil { return 0, err } - ops := make([]clientv3.Op, 0, 2) + ops := make([]clientv3.Op, 0, 3) ops = append(ops, ops1...) - ops = append(ops, op2) + ops = append(ops, op2...) _, rev, err := etcdutil.DoOpsInOneTxnWithRetry(cli, ops...) return rev, err } @@ -48,7 +48,8 @@ func DeleteSourceCfgRelayStageSourceBound(cli *clientv3.Client, source, worker s sourceCfgOp := deleteSourceCfgOp(source) relayStageOp := deleteRelayStageOp(source) sourceBoundOp := deleteSourceBoundOp(worker) - _, rev, err := etcdutil.DoOpsInOneTxnWithRetry(cli, sourceCfgOp, relayStageOp, sourceBoundOp) + lastBoundOp := deleteLastSourceBoundOp(worker) + _, rev, err := etcdutil.DoOpsInOneTxnWithRetry(cli, sourceCfgOp, relayStageOp, sourceBoundOp, lastBoundOp) return rev, err } diff --git a/pkg/ha/source.go b/pkg/ha/source.go index b473298c79..12a3dedbb3 100644 --- a/pkg/ha/source.go +++ b/pkg/ha/source.go @@ -102,9 +102,10 @@ func ClearTestInfoOperation(cli *clientv3.Client) error { clearWorkerInfo := clientv3.OpDelete(common.WorkerRegisterKeyAdapter.Path(), clientv3.WithPrefix()) clearWorkerKeepAlive := clientv3.OpDelete(common.WorkerKeepAliveKeyAdapter.Path(), clientv3.WithPrefix()) clearBound := clientv3.OpDelete(common.UpstreamBoundWorkerKeyAdapter.Path(), clientv3.WithPrefix()) + clearLastBound := clientv3.OpDelete(common.UpstreamLastBoundWorkerKeyAdapter.Path(), clientv3.WithPrefix()) clearRelayStage := clientv3.OpDelete(common.StageRelayKeyAdapter.Path(), clientv3.WithPrefix()) clearSubTaskStage := clientv3.OpDelete(common.StageSubTaskKeyAdapter.Path(), clientv3.WithPrefix()) _, _, err := etcdutil.DoOpsInOneTxnWithRetry(cli, clearSource, clearTask, clearSubTask, clearWorkerInfo, clearBound, - clearWorkerKeepAlive, clearRelayStage, clearSubTaskStage) + clearLastBound, clearWorkerKeepAlive, clearRelayStage, clearSubTaskStage) return err } diff --git a/tests/ha_cases/lib.sh b/tests/ha_cases/lib.sh index e5afc4bb3d..b8300eb3c6 100755 --- a/tests/ha_cases/lib.sh +++ b/tests/ha_cases/lib.sh @@ -186,3 +186,40 @@ function isolate_worker() { run_dm_worker $WORK_DIR/worker$1 $port $cur/conf/dm-worker$1.toml export GO_FAILPOINTS='' } + +function check_bound() { + bound1=$($PWD/bin/dmctl.test DEVEL --master-addr "127.0.0.1:$MASTER_PORT1" list-member --name worker1 \ + | grep 'source' | awk -F: '{print $2}') + bound2=$($PWD/bin/dmctl.test DEVEL --master-addr "127.0.0.1:$MASTER_PORT1" list-member --name worker2 \ + | grep 'source' | awk -F: '{print $2}') + if [[ $worker1bound != $bound1 || $worker2bound != $bound2 ]]; then + echo "worker1bound $worker1bound bound1 $bound1" + echo "worker2bound $worker2bound bound2 $bound2" + exit 1 + fi +} + +function start_2_worker_ensure_bound() { + worker_ports_2=(0 $WORKER1_PORT $WORKER2_PORT $WORKER3_PORT $WORKER4_PORT $WORKER5_PORT) + + echo "start worker$1" + run_dm_worker $WORK_DIR/worker$1 ${worker_ports_2[$1]} $cur/conf/dm-worker$1.toml + check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:${worker_ports_2[$1]} + echo "start worker$2" + run_dm_worker $WORK_DIR/worker$2 ${worker_ports_2[$2]} $cur/conf/dm-worker$2.toml + check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:${worker_ports_2[$2]} + run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT1" \ + "list-member --name worker$1 --name worker$2" \ + "\"source\": \"mysql-replica-01\"" 1 \ + "\"source\": \"mysql-replica-02\"" 1 +} + +function kill_2_worker_ensure_unbound() { + echo "kill dm-worker$1" + ps aux | grep dm-worker$1 |awk '{print $2}'|xargs kill || true + echo "kill dm-worker$2" + ps aux | grep dm-worker$2 |awk '{print $2}'|xargs kill || true + run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT1" \ + "list-member --name worker$1 --name worker$2" \ + "\"source\": \"\"" 2 +} \ No newline at end of file diff --git a/tests/ha_cases/run.sh b/tests/ha_cases/run.sh index 0dbdb20a33..f8cdcfc73a 100755 --- a/tests/ha_cases/run.sh +++ b/tests/ha_cases/run.sh @@ -703,7 +703,47 @@ function test_config_name() { echo "[$(date)] <<<<<< finish test_config_name >>>>>>" } +function test_last_bound() { + echo "[$(date)] <<<<<< start test_last_bound >>>>>>" + test_running + + worker1bound=$($PWD/bin/dmctl.test DEVEL --master-addr "127.0.0.1:$MASTER_PORT1" list-member --name worker1 \ + | grep 'source' | awk -F: '{print $2}') + echo "worker1bound $worker1bound" + worker2bound=$($PWD/bin/dmctl.test DEVEL --master-addr "127.0.0.1:$MASTER_PORT1" list-member --name worker2 \ + | grep 'source' | awk -F: '{print $2}') + echo "worker2bound $worker2bound" + + kill_2_worker_ensure_unbound 1 2 + + # start 1 then 2 + start_2_worker_ensure_bound 1 2 + + check_bound + + kill_2_worker_ensure_unbound 1 2 + + # start 2 then 1 + start_2_worker_ensure_bound 2 1 + + check_bound + + # kill 12, start 34, kill 34 + kill_2_worker_ensure_unbound 1 2 + start_2_worker_ensure_bound 3 4 + kill_2_worker_ensure_unbound 3 4 + + # start 1 then 2 + start_2_worker_ensure_bound 1 2 + + # check + check_bound + + echo "[$(date)] <<<<<< finish test_last_bound >>>>>>" +} + function run() { + test_last_bound test_config_name # TICASE-915, 916, 954, 955 test_join_masters_and_worker # TICASE-928, 930, 931, 961, 932, 957 test_kill_master # TICASE-996, 958