This repository has been archived by the owner on Nov 24, 2023. It is now read-only.

*: last bound source is preferred over other sources when picking for bound #1373

Merged
merged 14 commits on Jan 23, 2021
7 changes: 6 additions & 1 deletion dm/common/common.go
@@ -38,6 +38,10 @@ var (
// UpstreamBoundWorkerKeyAdapter used to store the bound relationship between a DM-worker and the upstream source whose tasks it runs.
// k/v: Encode(worker-name) -> the bound relationship.
UpstreamBoundWorkerKeyAdapter KeyAdapter = keyHexEncoderDecoder("/dm-master/bound-worker/")
// UpstreamLastBoundWorkerKeyAdapter used to store the last bound relationship of a DM-worker.
// different from UpstreamBoundWorkerKeyAdapter, this kv is not deleted when the source is unbound, so it can
// serve as a priority hint when binding again.
// k/v: Encode(worker-name) -> the bound relationship.
UpstreamLastBoundWorkerKeyAdapter KeyAdapter = keyHexEncoderDecoder("/dm-master/last-bound-worker/")
// TaskConfigKeyAdapter used to store task config string.
// k/v: Encode(task-name) -> task-config-string
TaskConfigKeyAdapter KeyAdapter = keyHexEncoderDecoder("/dm-master/task/")
@@ -77,7 +81,8 @@ var (
func keyAdapterKeysLen(s KeyAdapter) int {
switch s {
case WorkerRegisterKeyAdapter, UpstreamConfigKeyAdapter, UpstreamBoundWorkerKeyAdapter,
WorkerKeepAliveKeyAdapter, StageRelayKeyAdapter, TaskConfigKeyAdapter:
WorkerKeepAliveKeyAdapter, StageRelayKeyAdapter, TaskConfigKeyAdapter,
UpstreamLastBoundWorkerKeyAdapter:
return 1
case UpstreamSubTaskKeyAdapter, StageSubTaskKeyAdapter,
ShardDDLPessimismInfoKeyAdapter, ShardDDLPessimismOperationKeyAdapter,
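For reviewers unfamiliar with the key adapters: below is a minimal sketch of how a single-segment hex key adapter behaves, which is why `UpstreamLastBoundWorkerKeyAdapter` joins the `return 1` case of `keyAdapterKeysLen`. This is an illustration only, assuming hex encoding of each key segment; the project's real implementation is `keyHexEncoderDecoder` in dm/common.

```go
package main

import (
	"encoding/hex"
	"fmt"
	"strings"
)

// prefix mirrors common.UpstreamLastBoundWorkerKeyAdapter; the encoding
// below is an assumption for illustration, not the project's exact code.
const lastBoundPrefix = "/dm-master/last-bound-worker/"

// encode hex-encodes each key segment and appends it to the prefix.
func encode(keys ...string) string {
	parts := make([]string, 0, len(keys))
	for _, k := range keys {
		parts = append(parts, hex.EncodeToString([]byte(k)))
	}
	return lastBoundPrefix + strings.Join(parts, "/")
}

// decode reverses encode for a single-segment key (keysLen == 1).
func decode(key string) (string, error) {
	seg := strings.TrimPrefix(key, lastBoundPrefix)
	b, err := hex.DecodeString(seg)
	if err != nil {
		return "", err
	}
	return string(b), nil
}

func main() {
	k := encode("dm-worker-1")
	fmt.Println(k) // /dm-master/last-bound-worker/646d2d776f726b65722d31
	name, _ := decode(k)
	fmt.Println(name) // dm-worker-1
}
```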
57 changes: 46 additions & 11 deletions dm/master/scheduler/scheduler.go
@@ -111,6 +111,9 @@ type Scheduler struct {
// - when the source is bounded to a worker.
unbounds map[string]struct{}

// a mirror of bounds whose elements are not deleted when the worker is unbound. worker-name -> SourceBound
lastBound map[string]ha.SourceBound

// expectant relay stages for sources, source ID -> stage.
// add:
// - bound the source to a worker (for the first time).
@@ -143,6 +146,7 @@ func NewScheduler(pLogger *log.Logger, securityCfg config.Security) *Scheduler {
workers: make(map[string]*Worker),
bounds: make(map[string]*Worker),
unbounds: make(map[string]struct{}),
lastBound: make(map[string]ha.SourceBound),
expectRelayStages: make(map[string]ha.Stage),
expectSubTaskStages: make(map[string]map[string]ha.Stage),
securityCfg: securityCfg,
@@ -906,6 +910,11 @@ func (s *Scheduler) recoverWorkersBounds(cli *clientv3.Client) (int64, error) {
if err != nil {
return 0, err
}
lastSourceBoundM, _, err := ha.GetLastSourceBounds(cli)
if err != nil {
return 0, err
}
s.lastBound = lastSourceBoundM

// 3. get all history offline status.
kam, rev, err := ha.GetKeepAliveWorkers(cli)
@@ -1164,14 +1173,22 @@ func (s *Scheduler) handleWorkerOffline(ev ha.WorkerEvent, toLock bool) error {
return nil
}

// tryBoundForWorker tries to bound a random unbounded source to the worker.
// tryBoundForWorker tries to bound a source to the worker: first try the last source bound to this worker, then randomly pick one from unbounds.
// returns (true, nil) after bounded.
func (s *Scheduler) tryBoundForWorker(w *Worker) (bounded bool, err error) {
// 1. check whether any unbound source exists.
var source string
for source = range s.unbounds {
break // got a source.
// 1. check whether the worker's last bound source is still available.
// if lastBound is not found, or this source has already been bound to another worker (we also check that the
// source still exists here), randomly pick one from unbounds.
// NOTE: if the worker isn't in lastBound, we get a zero-value SourceBound, which is fine: the empty source
// name is never in unbounds.
source := s.lastBound[w.baseInfo.Name].Source
if _, ok := s.unbounds[source]; !ok {
source = ""
for source = range s.unbounds {
break // got a source.
}
}

if source == "" {
s.logger.Info("no unbound sources need to bound", zap.Stringer("worker", w.BaseInfo()))
return false, nil
@@ -1198,14 +1215,31 @@ func (s *Scheduler) tryBoundForWorker(w *Worker) (bounded bool, err error) {
// tryBoundForSource tries to bound a source to a random Free worker.
// returns (true, nil) after bounded.
func (s *Scheduler) tryBoundForSource(source string) (bool, error) {
// 1. try to find a random Free worker.
// 1. try to find the history worker of this source, then a random Free worker.
var worker *Worker
for _, w := range s.workers {
if w.Stage() == WorkerFree {
worker = w
break
for workerName, bound := range s.lastBound {
if bound.Source == source {
w, ok := s.workers[workerName]
if !ok {
// the worker has been removed, skip it
continue
}
if w.Stage() == WorkerFree {
worker = w
break
}
}
}

if worker == nil {
for _, w := range s.workers {
if w.Stage() == WorkerFree {
worker = w
break
}
}
}

if worker == nil {
s.logger.Info("no free worker exists for bound", zap.String("source", source))
return false, nil
@@ -1280,14 +1314,15 @@ func (s *Scheduler) deleteWorker(name string) {

// updateStatusForBound updates the in-memory status for bound, including:
// - update the stage of worker to `Bound`.
// - record the bound relationship in the scheduler.
// - record the bound relationship and last bound relationship in the scheduler.
// this func is called after the bound relationship existed in etcd.
func (s *Scheduler) updateStatusForBound(w *Worker, b ha.SourceBound) error {
err := w.ToBound(b)
if err != nil {
return err
}
s.bounds[b.Source] = w
s.lastBound[b.Worker] = b
return nil
}

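Taken together, the scheduler changes implement one priority rule: a worker first reclaims its last bound source, and a source first returns to its last worker. Here is a self-contained sketch of just that selection step, with trimmed stand-in types (the real code also persists bounds to etcd and manages worker stages):

```go
package main

import "fmt"

// SourceBound is a trimmed stand-in for ha.SourceBound.
type SourceBound struct {
	Source string
	Worker string
}

// pickSourceForWorker models tryBoundForWorker's selection: prefer the
// worker's last bound source if it is still unbound, otherwise pick any
// unbound source ("" means nothing to bind).
func pickSourceForWorker(worker string, lastBound map[string]SourceBound, unbounds map[string]struct{}) string {
	source := lastBound[worker].Source // zero value "" is never in unbounds
	if _, ok := unbounds[source]; !ok {
		source = ""
		for source = range unbounds {
			break // any unbound source
		}
	}
	return source
}

func main() {
	lastBound := map[string]SourceBound{
		"dm-worker-1": {Source: "mysql-replica-1", Worker: "dm-worker-1"},
	}
	unbounds := map[string]struct{}{
		"mysql-replica-1": {},
		"mysql-replica-2": {},
	}
	// dm-worker-1 gets its previous source back.
	fmt.Println(pickSourceForWorker("dm-worker-1", lastBound, unbounds)) // mysql-replica-1
	delete(unbounds, "mysql-replica-1")
	// dm-worker-3 has no history, so it takes whatever is left.
	fmt.Println(pickSourceForWorker("dm-worker-3", lastBound, unbounds)) // mysql-replica-2
}
```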
74 changes: 74 additions & 0 deletions dm/master/scheduler/scheduler_test.go
@@ -944,3 +944,77 @@ func (t *testScheduler) TestWatchWorkerEventEtcdCompact(c *C) {
cancel3()
wg.Wait()
}

func (t *testScheduler) TestLastBound(c *C) {
defer clearTestInfoOperation(c)

var (
logger = log.L()
s = NewScheduler(&logger, config.Security{})
sourceID1 = "mysql-replica-1"
sourceID2 = "mysql-replica-2"
workerName1 = "dm-worker-1"
workerName2 = "dm-worker-2"
workerName3 = "dm-worker-3"
workerName4 = "dm-worker-4"
sourceCfg1 config.SourceConfig
)

c.Assert(sourceCfg1.LoadFromFile(sourceSampleFile), IsNil)
sourceCfg1.SourceID = sourceID1
sourceCfg2 := sourceCfg1
sourceCfg2.SourceID = sourceID2
worker1 := &Worker{baseInfo: ha.WorkerInfo{Name: workerName1}}
worker2 := &Worker{baseInfo: ha.WorkerInfo{Name: workerName2}}
worker3 := &Worker{baseInfo: ha.WorkerInfo{Name: workerName3}}
worker4 := &Worker{baseInfo: ha.WorkerInfo{Name: workerName4}}

// step 1: start an empty scheduler without listening to worker events
s.started = true
s.etcdCli = etcdTestCli
s.workers[workerName1] = worker1
s.workers[workerName2] = worker2
s.workers[workerName3] = worker3
s.workers[workerName4] = worker4

s.lastBound[workerName1] = ha.SourceBound{Source: sourceID1}
s.lastBound[workerName2] = ha.SourceBound{Source: sourceID2}
s.unbounds[sourceID1] = struct{}{}
s.unbounds[sourceID2] = struct{}{}

// worker1 is bound to its last bound source
worker1.ToFree()
bounded, err := s.tryBoundForWorker(worker1)
c.Assert(err, IsNil)
c.Assert(bounded, IsTrue)
c.Assert(s.bounds[sourceID1], DeepEquals, worker1)

// worker3 has no last bound source, so it is bound to the only remaining source, source2
worker3.ToFree()
bounded, err = s.tryBoundForWorker(worker3)
c.Assert(err, IsNil)
c.Assert(bounded, IsTrue)
c.Assert(s.bounds[sourceID2], DeepEquals, worker3)

// though worker2 has a last bound source, that source is already bound to worker3, so worker2 is not bound
worker2.ToFree()
bounded, err = s.tryBoundForWorker(worker2)
c.Assert(err, IsNil)
c.Assert(bounded, IsFalse)

// worker4 is used to check that source2 is later bound to worker2 rather than to another free worker
worker4.ToFree()
bounded, err = s.tryBoundForWorker(worker4)
c.Assert(err, IsNil)
c.Assert(bounded, IsFalse)

// after worker3 becomes offline, source2 should be bound to worker2
s.updateStatusForUnbound(sourceID2)
_, ok := s.bounds[sourceID2]
c.Assert(ok, IsFalse)
worker3.ToOffline()
bounded, err = s.tryBoundForSource(sourceID2)
c.Assert(err, IsNil)
c.Assert(bounded, IsTrue)
c.Assert(s.bounds[sourceID2], DeepEquals, worker2)
}
45 changes: 38 additions & 7 deletions pkg/ha/bound.go
@@ -86,11 +86,11 @@ func sourceBoundFromJSON(s string) (b SourceBound, err error) {
func PutSourceBound(cli *clientv3.Client, bounds ...SourceBound) (int64, error) {
ops := make([]clientv3.Op, 0, len(bounds))
for _, bound := range bounds {
op, err := putSourceBoundOp(bound)
boundOps, err := putSourceBoundOp(bound)
if err != nil {
return 0, err
}
ops = append(ops, op)
ops = append(ops, boundOps...)
}
_, rev, err := etcdutil.DoOpsInOneTxnWithRetry(cli, ops...)
return rev, err
@@ -137,6 +137,29 @@ func GetSourceBound(cli *clientv3.Client, worker string) (map[string]SourceBound
return sbm, resp.Header.Revision, nil
}

// GetLastSourceBounds gets all last source bound relationships. Different from GetSourceBound, a "last source
// bound" is not deleted when its worker goes offline.
func GetLastSourceBounds(cli *clientv3.Client) (map[string]SourceBound, int64, error) {
ctx, cancel := context.WithTimeout(cli.Ctx(), etcdutil.DefaultRequestTimeout)
defer cancel()

var (
sbm = make(map[string]SourceBound)
)
resp, err := cli.Get(ctx, common.UpstreamLastBoundWorkerKeyAdapter.Path(), clientv3.WithPrefix())

if err != nil {
return sbm, 0, err
}

sbm, err = sourceBoundFromResp("", resp)
if err != nil {
return sbm, 0, err
}

return sbm, resp.Header.Revision, nil
}

// GetSourceBoundConfig gets the source bound relationship and relative source config at the same time
// for the specified DM-worker. The index worker **must not be empty**:
// if source bound is empty, will return an empty sourceBound and an empty source config
@@ -310,14 +333,22 @@ func deleteSourceBoundOp(worker string) clientv3.Op {
return clientv3.OpDelete(common.UpstreamBoundWorkerKeyAdapter.Encode(worker))
}

// putSourceBoundOp returns a PUT etcd operation for the bound relationship.
// deleteLastSourceBoundOp returns a DELETE etcd operation for the last bound relationship of the specified DM-worker.
func deleteLastSourceBoundOp(worker string) clientv3.Op {
return clientv3.OpDelete(common.UpstreamLastBoundWorkerKeyAdapter.Encode(worker))
}

// putSourceBoundOp returns PUT etcd operations for the bound relationship.
// k/v: worker-name -> bound relationship.
func putSourceBoundOp(bound SourceBound) (clientv3.Op, error) {
func putSourceBoundOp(bound SourceBound) ([]clientv3.Op, error) {
value, err := bound.toJSON()
if err != nil {
return clientv3.Op{}, err
return []clientv3.Op{}, err
}
key := common.UpstreamBoundWorkerKeyAdapter.Encode(bound.Worker)
key1 := common.UpstreamBoundWorkerKeyAdapter.Encode(bound.Worker)
op1 := clientv3.OpPut(key1, value)
key2 := common.UpstreamLastBoundWorkerKeyAdapter.Encode(bound.Worker)
op2 := clientv3.OpPut(key2, value)

return clientv3.OpPut(key, value), nil
return []clientv3.Op{op1, op2}, nil
}
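The key behavioral change in bound.go: `putSourceBoundOp` now emits two PUTs, so the bound key and the last-bound key are always written atomically in one etcd transaction and can never diverge. A hedged sketch of the equivalent raw clientv3 call follows; the endpoint, keys, and value are illustrative placeholders, and the project actually wraps this in `etcdutil.DoOpsInOneTxnWithRetry`:

```go
package main

import (
	"context"
	"time"

	clientv3 "go.etcd.io/etcd/clientv3"
)

func main() {
	// Endpoint is an assumption for illustration.
	cli, err := clientv3.New(clientv3.Config{
		Endpoints:   []string{"127.0.0.1:2379"},
		DialTimeout: 5 * time.Second,
	})
	if err != nil {
		panic(err)
	}
	defer cli.Close()

	// Mirror of what putSourceBoundOp now produces: both keys get the same
	// serialized bound, and both PUTs commit in a single transaction.
	// Keys and value are simplified stand-ins for the hex-encoded forms.
	value := `{"source":"mysql-replica-1","worker":"dm-worker-1"}`
	ops := []clientv3.Op{
		clientv3.OpPut("/dm-master/bound-worker/<encoded-name>", value),
		clientv3.OpPut("/dm-master/last-bound-worker/<encoded-name>", value),
	}

	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()
	if _, err := cli.Txn(ctx).Then(ops...).Commit(); err != nil {
		panic(err)
	}
}
```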
7 changes: 4 additions & 3 deletions pkg/ha/ops.go
@@ -33,9 +33,9 @@ func PutRelayStageSourceBound(cli *clientv3.Client, stage Stage, bound SourceBou
if err != nil {
return 0, err
}
ops := make([]clientv3.Op, 0, 2)
ops := make([]clientv3.Op, 0, 3)
ops = append(ops, ops1...)
ops = append(ops, op2)
ops = append(ops, op2...)
_, rev, err := etcdutil.DoOpsInOneTxnWithRetry(cli, ops...)
return rev, err
}
@@ -48,7 +48,8 @@ func DeleteSourceCfgRelayStageSourceBound(cli *clientv3.Client, source, worker s
sourceCfgOp := deleteSourceCfgOp(source)
relayStageOp := deleteRelayStageOp(source)
sourceBoundOp := deleteSourceBoundOp(worker)
_, rev, err := etcdutil.DoOpsInOneTxnWithRetry(cli, sourceCfgOp, relayStageOp, sourceBoundOp)
lastBoundOp := deleteLastSourceBoundOp(worker)
_, rev, err := etcdutil.DoOpsInOneTxnWithRetry(cli, sourceCfgOp, relayStageOp, sourceBoundOp, lastBoundOp)
return rev, err
}

3 changes: 2 additions & 1 deletion pkg/ha/source.go
@@ -102,9 +102,10 @@ func ClearTestInfoOperation(cli *clientv3.Client) error {
clearWorkerInfo := clientv3.OpDelete(common.WorkerRegisterKeyAdapter.Path(), clientv3.WithPrefix())
clearWorkerKeepAlive := clientv3.OpDelete(common.WorkerKeepAliveKeyAdapter.Path(), clientv3.WithPrefix())
clearBound := clientv3.OpDelete(common.UpstreamBoundWorkerKeyAdapter.Path(), clientv3.WithPrefix())
clearLastBound := clientv3.OpDelete(common.UpstreamLastBoundWorkerKeyAdapter.Path(), clientv3.WithPrefix())
clearRelayStage := clientv3.OpDelete(common.StageRelayKeyAdapter.Path(), clientv3.WithPrefix())
clearSubTaskStage := clientv3.OpDelete(common.StageSubTaskKeyAdapter.Path(), clientv3.WithPrefix())
_, _, err := etcdutil.DoOpsInOneTxnWithRetry(cli, clearSource, clearTask, clearSubTask, clearWorkerInfo, clearBound,
clearWorkerKeepAlive, clearRelayStage, clearSubTaskStage)
clearLastBound, clearWorkerKeepAlive, clearRelayStage, clearSubTaskStage)
return err
}
37 changes: 37 additions & 0 deletions tests/ha_cases/lib.sh
@@ -186,3 +186,40 @@ function isolate_worker() {
run_dm_worker $WORK_DIR/worker$1 $port $cur/conf/dm-worker$1.toml
export GO_FAILPOINTS=''
}

function check_bound() {
bound1=$($PWD/bin/dmctl.test DEVEL --master-addr "127.0.0.1:$MASTER_PORT1" list-member --name worker1 \
| grep 'source' | awk -F: '{print $2}')
bound2=$($PWD/bin/dmctl.test DEVEL --master-addr "127.0.0.1:$MASTER_PORT1" list-member --name worker2 \
| grep 'source' | awk -F: '{print $2}')
if [[ $worker1bound != $bound1 || $worker2bound != $bound2 ]]; then
echo "worker1bound $worker1bound bound1 $bound1"
echo "worker2bound $worker2bound bound2 $bound2"
exit 1
fi
}

function start_2_worker_ensure_bound() {
worker_ports_2=(0 $WORKER1_PORT $WORKER2_PORT $WORKER3_PORT $WORKER4_PORT $WORKER5_PORT)

echo "start worker$1"
run_dm_worker $WORK_DIR/worker$1 ${worker_ports_2[$1]} $cur/conf/dm-worker$1.toml
check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:${worker_ports_2[$1]}
echo "start worker$2"
run_dm_worker $WORK_DIR/worker$2 ${worker_ports_2[$2]} $cur/conf/dm-worker$2.toml
check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:${worker_ports_2[$2]}
run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT1" \
"list-member --name worker$1 --name worker$2" \
"\"source\": \"mysql-replica-01\"" 1 \
"\"source\": \"mysql-replica-02\"" 1
}

function kill_2_worker_ensure_unbound() {
echo "kill dm-worker$1"
ps aux | grep dm-worker$1 | awk '{print $2}' | xargs kill || true
echo "kill dm-worker$2"
ps aux | grep dm-worker$2 | awk '{print $2}' | xargs kill || true
run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT1" \
"list-member --name worker$1 --name worker$2" \
"\"source\": \"\"" 2
}