diff --git a/.github/workflows/chaos-mesh.yml b/.github/workflows/chaos-mesh.yml
index f22e7fa630..37a4e18a8a 100644
--- a/.github/workflows/chaos-mesh.yml
+++ b/.github/workflows/chaos-mesh.yml
@@ -111,9 +111,9 @@ jobs:
           kubectl describe -f $GITHUB_WORKSPACE/chaos/manifests/mysql.yaml
       - name: Wait for MySQL ready # kubectl wait --all not working
         run: |
-          kubectl wait --for=condition=Ready pod/mysql-0 --timeout=120s || true
+          kubectl wait --for=condition=Ready pod/mysql-0 --timeout=300s || true
           sleep 10
-          kubectl wait --for=condition=Ready pod/mysql-1 --timeout=120s || true
+          kubectl wait --for=condition=Ready pod/mysql-1 --timeout=300s || true
           echo show pvc
           kubectl get pvc -l app=mysql -o wide
           echo show pv
@@ -139,7 +139,7 @@ jobs:
           kubectl describe -f $GITHUB_WORKSPACE/chaos/manifests/tidb.yaml
       - name: Wait for TiDB ready
         run: |
-          kubectl wait --for=condition=Ready pod/tidb-0 --timeout=120s || true
+          kubectl wait --for=condition=Ready pod/tidb-0 --timeout=300s || true
           echo show pvc
           kubectl get pvc -l app=tidb -o wide
           echo show pv
@@ -165,7 +165,7 @@ jobs:
       - name: Wait for DM-master ready
         run: |
           sleep 10
-          kubectl wait --for=condition=Ready pod -l app=dm-master --all --timeout=120s || true
+          kubectl wait --for=condition=Ready pod -l app=dm-master --all --timeout=300s || true
           echo "<<<<< show pvc >>>>>"
           kubectl get pvc -l app=dm-master -o wide
           echo "<<<<< show pv >>>>>"
@@ -202,7 +202,7 @@ jobs:
       - name: Wait for DM-worker ready
         run: |
           sleep 10
-          kubectl wait --for=condition=Ready pod -l app=dm-worker --all --timeout=120s || true
+          kubectl wait --for=condition=Ready pod -l app=dm-worker --all --timeout=300s || true
           echo "<<<<< show pvc >>>>>"
           kubectl get pvc -l app=dm-worker -o wide
           echo "<<<<< show pv >>>>>"
diff --git a/_utils/terror_gen/errors_release.txt b/_utils/terror_gen/errors_release.txt
index bc4d0b51d3..cd004450f1 100644
--- a/_utils/terror_gen/errors_release.txt
+++ b/_utils/terror_gen/errors_release.txt
@@ -410,9 +410,7 @@ ErrWorkerGetVersionFromKV,[code=40045:class=dm-worker:scope=internal:level=high]
 ErrWorkerSaveVersionToKV,[code=40046:class=dm-worker:scope=internal:level=high], "Message: save version %v into levelDB with key %v"
 ErrWorkerVerAutoDowngrade,[code=40047:class=dm-worker:scope=internal:level=high], "Message: the previous version %s is newer than current %s, automatic downgrade is not supported now, please handle it manually"
 ErrWorkerStartService,[code=40048:class=dm-worker:scope=internal:level=high], "Message: start server"
-ErrWorkerNoStart,[code=40070:class=dm-worker:scope=internal:level=high], "Message: worker has not started"
-ErrWorkerAlreadyClosed,[code=40049:class=dm-worker:scope=internal:level=high], "Message: worker already closed"
-ErrWorkerAlreadyStart,[code=40071:class=dm-worker:scope=internal:level=high], "Message: worker already started"
+ErrWorkerAlreadyClosed,[code=40049:class=dm-worker:scope=internal:level=high], "Message: mysql source handler worker already closed"
 ErrWorkerNotRunningStage,[code=40050:class=dm-worker:scope=internal:level=high], "Message: current stage is %s but not running, invalid"
 ErrWorkerNotPausedStage,[code=40051:class=dm-worker:scope=internal:level=high], "Message: current stage is %s but not paused, invalid"
 ErrWorkerUpdateTaskStage,[code=40052:class=dm-worker:scope=internal:level=high], "Message: can only update task on Paused stage, but current stage is %s, Workaround: Please use `pause-task` command to pause the task."
@@ -433,6 +431,8 @@ ErrWorkerExecDDLTimeout,[code=40066:class=dm-worker:scope=internal:level=high],
 ErrWorkerWaitRelayCatchupTimeout,[code=40067:class=dm-worker:scope=internal:level=high], "Message: waiting for relay binlog pos to catch up with loader end binlog pos is timeout (exceeding %s), loader end binlog pos: %s, relay binlog pos: %s"
 ErrWorkerRelayIsPurging,[code=40068:class=dm-worker:scope=internal:level=high], "Message: relay log purger is purging, cannot start sub task %s, Workaround: Please try again later."
 ErrWorkerHostPortNotValid,[code=40069:class=dm-worker:scope=internal:level=high], "Message: host:port '%s' not valid, Workaround: Please check configs in worker configuration file."
+ErrWorkerNoStart,[code=40070:class=dm-worker:scope=internal:level=high], "Message: no mysql source is being handled in the worker"
+ErrWorkerAlreadyStart,[code=40071:class=dm-worker:scope=internal:level=high], "Message: mysql source handler worker already started"
 ErrWorkerSourceNotMatch,[code=40072:class=dm-worker:scope=internal:level=high], "Message: source of request does not match with source in worker"
 ErrWorkerFailToGetSubtaskConfigFromEtcd,[code=40073:class=dm-worker:scope=internal:level=medium], "Message: there is no relative subtask config for task %s in etcd"
 ErrWorkerFailToGetSourceConfigFromEtcd,[code=40074:class=dm-worker:scope=internal:level=medium], "Message: there is no relative source config for source %s in etcd"
diff --git a/chaos/cases/task.go b/chaos/cases/task.go
index e3b7de5251..519815b68e 100644
--- a/chaos/cases/task.go
+++ b/chaos/cases/task.go
@@ -76,13 +76,13 @@ func newTask(ctx context.Context, cli pb.MasterClient, taskFile string, schema s
 	)
 	for i := range taskCfg.MySQLInstances { // only use necessary part of sources.
 		cfg := sourcesCfg[i]
-		db, err := conn.DefaultDBProvider.Apply(cfg)
-		if err != nil {
-			return nil, err
+		db, err2 := conn.DefaultDBProvider.Apply(cfg)
+		if err2 != nil {
+			return nil, err2
 		}
-		conn, err := createDBConn(ctx, db, schema)
-		if err != nil {
-			return nil, err
+		conn, err2 := createDBConn(ctx, db, schema)
+		if err2 != nil {
+			return nil, err2
 		}
 		sourceDBs = append(sourceDBs, db)
 		sourceConns = append(sourceConns, conn)
diff --git a/dm/worker/config.go b/dm/worker/config.go
index 86dfe22303..8de442423b 100644
--- a/dm/worker/config.go
+++ b/dm/worker/config.go
@@ -217,17 +217,3 @@ func (c *Config) configFromFile(path string) error {
 	}
 	return nil
 }
-
-// Reload reload configure from ConfigFile
-func (c *Config) Reload() error {
-	if c.ConfigFile == "" {
-		c.ConfigFile = "dm-worker-config.bak"
-	}
-
-	err := c.configFromFile(c.ConfigFile)
-	if err != nil {
-		return err
-	}
-
-	return nil
-}
diff --git a/dm/worker/join.go b/dm/worker/join.go
index d00645b7e4..1a9d1cc719 100644
--- a/dm/worker/join.go
+++ b/dm/worker/join.go
@@ -61,7 +61,7 @@ func (s *Server) JoinMaster(endpoints []string) error {
 			if conn != nil {
 				conn.Close()
 			}
-			log.L().Error("fail to dial dm-master", zap.Error(err))
+			log.L().Error("fail to dial dm-master", zap.String("endpoint", endpoint), zap.Error(err))
 			continue
 		}
 		client := pb.NewMasterClient(conn)
@@ -70,11 +70,11 @@ func (s *Server) JoinMaster(endpoints []string) error {
 		cancel1()
 		conn.Close()
 		if err != nil {
-			log.L().Error("fail to register worker", zap.Error(err))
+			log.L().Error("fail to register worker", zap.String("endpoint", endpoint), zap.Error(err))
 			continue
 		}
 		if !resp.GetResult() {
-			log.L().Error("fail to register worker", zap.String("error", resp.Msg))
+			log.L().Error("fail to register worker", zap.String("endpoint", endpoint), zap.String("error", resp.Msg))
 			continue
 		}
 		return nil
diff --git a/dm/worker/metrics.go b/dm/worker/metrics.go
index 63789ab5b1..7c44569555 100644
--- a/dm/worker/metrics.go
+++ b/dm/worker/metrics.go
@@ -135,7 +135,7 @@ func InitStatus(lis net.Listener) {
 	}
 	err := httpS.Serve(lis)
 	if err != nil && !common.IsErrNetClosing(err) && err != http.ErrServerClosed {
-		log.L().Error("fail to start status server return", log.ShortError(err))
+		log.L().Error("status server returned", log.ShortError(err))
 	}
 }
diff --git a/dm/worker/server.go b/dm/worker/server.go
index 18c2ec9874..0565cd81a3 100644
--- a/dm/worker/server.go
+++ b/dm/worker/server.go
@@ -84,6 +84,9 @@ func NewServer(cfg *Config) *Server {
 
 // Start starts to serving
 func (s *Server) Start() error {
+	log.L().Info("starting dm-worker server")
+	RegistryMetrics()
+	s.ctx, s.cancel = context.WithCancel(context.Background())
 	tls, err := toolutils.NewTLS(s.cfg.SSLCA, s.cfg.SSLCert, s.cfg.SSLKey, s.cfg.AdvertiseAddr, s.cfg.CertAllowedCN)
 	if err != nil {
 		return terror.ErrWorkerTLSConfigNotValid.Delegate(err)
@@ -95,9 +98,6 @@ func (s *Server) Start() error {
 	}
 	s.rootLis = tls.WrapListener(rootLis)
 
-	log.L().Info("Start Server")
-	s.setWorker(nil, true)
-	s.ctx, s.cancel = context.WithCancel(context.Background())
 	s.etcdClient, err = clientv3.New(clientv3.Config{
 		Endpoints:   GetJoinURLs(s.cfg.Join),
 		DialTimeout: dialTimeout,
@@ -109,24 +109,19 @@ func (s *Server) Start() error {
 		return err
 	}
 
-	s.wg.Add(1)
-	go func() {
-		s.syncMasterEndpoints(s.ctx)
-		s.wg.Done()
-	}()
-
+	s.setWorker(nil, true)
 	bound, sourceCfg, revBound, err := ha.GetSourceBoundConfig(s.etcdClient, s.cfg.Name)
 	if err != nil {
-		// TODO: need retry
 		return err
 	}
 	if !bound.IsEmpty() {
-		log.L().Warn("worker has been assigned source before keepalive")
+		log.L().Warn("worker has been assigned source before keepalive", zap.Stringer("bound", bound), zap.Bool("is deleted", bound.IsDeleted))
 		err = s.startWorker(&sourceCfg)
 		s.setSourceStatus(bound.Source, err, true)
 		if err != nil {
-			log.L().Error("fail to operate sourceBound on worker", zap.String("worker", s.cfg.Name),
-				zap.Stringer("bound", bound), zap.Error(err))
+			// if DM-worker can't handle pre-assigned source before keepalive, it simply exits with the error,
+			// because no re-assigned mechanism exists for keepalived DM-worker yet.
+			return err
 		}
 	}
 
@@ -136,12 +131,18 @@ func (s *Server) Start() error {
 		s.wg.Done()
 	}()
 
+	s.wg.Add(1)
+	go func() {
+		s.syncMasterEndpoints(s.ctx)
+		s.wg.Done()
+	}()
+
 	s.wg.Add(1)
 	go func() {
 		defer s.wg.Done()
 		// TODO: handle fatal error from observeSourceBound
 		//nolint:errcheck
-		s.observeSourceBound(s.ctx, s.etcdClient, revBound)
+		s.observeSourceBound(s.ctx, revBound)
 	}()
 
 	s.wg.Add(1)
@@ -165,18 +166,23 @@ func (s *Server) Start() error {
 	// NOTE: don't need to set tls config, because rootLis already use tls
 	s.svr = grpc.NewServer()
 	pb.RegisterWorkerServer(s.svr, s)
+	s.wg.Add(1)
 	go func() {
+		defer s.wg.Done()
 		err2 := s.svr.Serve(grpcL)
 		if err2 != nil && !common.IsErrNetClosing(err2) && err2 != cmux.ErrListenerClosed {
-			log.L().Error("fail to start gRPC server", log.ShortError(err2))
+			log.L().Error("gRPC server returned", log.ShortError(err2))
 		}
 	}()
 
-	RegistryMetrics()
-	go InitStatus(httpL) // serve status
+	s.wg.Add(1)
+	go func() {
+		defer s.wg.Done()
+		InitStatus(httpL) // serve status
+	}()
 
 	s.closed.Set(false)
-	log.L().Info("start gRPC API", zap.String("listened address", s.cfg.WorkerAddr))
+	log.L().Info("listening gRPC API and status request", zap.String("address", s.cfg.WorkerAddr))
 	err = m.Serve()
 	if err != nil && common.IsErrNetClosing(err) {
 		err = nil
@@ -220,7 +226,7 @@ func (s *Server) syncMasterEndpoints(ctx context.Context) {
 	}
 }
 
-func (s *Server) observeSourceBound(ctx context.Context, etcdCli *clientv3.Client, rev int64) error {
+func (s *Server) observeSourceBound(ctx context.Context, rev int64) error {
 	var wg sync.WaitGroup
 	for {
 		sourceBoundCh := make(chan ha.SourceBound, 10)
@@ -234,7 +240,7 @@ func (s *Server) observeSourceBound(ctx context.Context, etcdCli *clientv3.Clien
 				close(sourceBoundErrCh)
 				wg.Done()
 			}()
-			ha.WatchSourceBound(ctx1, etcdCli, s.cfg.Name, rev+1, sourceBoundCh, sourceBoundErrCh)
+			ha.WatchSourceBound(ctx1, s.etcdClient, s.cfg.Name, rev+1, sourceBoundCh, sourceBoundErrCh)
 		}()
 		err := s.handleSourceBound(ctx1, sourceBoundCh, sourceBoundErrCh)
 		cancel1()
@@ -272,8 +278,7 @@ func (s *Server) observeSourceBound(ctx context.Context, etcdCli *clientv3.Clien
 					err1 = s.startWorker(&cfg)
 					s.setSourceStatus(bound.Source, err1, true)
 					if err1 != nil {
-						log.L().Error("fail to operate sourceBound on worker", zap.String("worker", s.cfg.Name),
-							zap.Stringer("bound", bound), zap.Error(err1))
+						log.L().Error("fail to operate sourceBound on worker", zap.Stringer("bound", bound), zap.Bool("is deleted", bound.IsDeleted), zap.Error(err1))
 					}
 				}
 			}
@@ -396,13 +401,12 @@ OUTER:
 			if !ok {
 				break OUTER
 			}
+			log.L().Info("receive source bound", zap.Stringer("bound", bound), zap.Bool("is deleted", bound.IsDeleted))
 			err := s.operateSourceBound(bound)
 			s.setSourceStatus(bound.Source, err, true)
 			if err != nil {
-				// record the reason for operating source bound
 				opErrCounter.WithLabelValues(s.cfg.Name, opErrTypeSourceBound).Inc()
-				log.L().Error("fail to operate sourceBound on worker", zap.String("worker", s.cfg.Name),
-					zap.Stringer("bound", bound), zap.Error(err))
+				log.L().Error("fail to operate sourceBound on worker", zap.Stringer("bound", bound), zap.Bool("is deleted", bound.IsDeleted), zap.Error(err))
 				if etcdutil.IsRetryableError(err) {
 					return err
 				}
@@ -451,7 +455,7 @@ func (s *Server) QueryStatus(ctx context.Context, req *pb.QueryStatusRequest) (*
 
 	w := s.getWorker(true)
 	if w == nil {
-		log.L().Error("fail to call QueryStatus, because mysql worker has not been started")
+		log.L().Warn("fail to call QueryStatus, because no mysql source is being handled in the worker")
 		resp.Result = false
 		resp.Msg = terror.ErrWorkerNoStart.Error()
 		return resp, nil
@@ -475,7 +479,7 @@ func (s *Server) PurgeRelay(ctx context.Context, req *pb.PurgeRelayRequest) (*pb
 	log.L().Info("", zap.String("request", "PurgeRelay"), zap.Stringer("payload", req))
 	w := s.getWorker(true)
 	if w == nil {
-		log.L().Error("fail to call StartSubTask, because mysql worker has not been started")
+		log.L().Warn("fail to call StartSubTask, because no mysql source is being handled in the worker")
 		return makeCommonWorkerResponse(terror.ErrWorkerNoStart.Generate()), nil
 	}
 
@@ -483,7 +487,6 @@ func (s *Server) PurgeRelay(ctx context.Context, req *pb.PurgeRelayRequest) (*pb
 	if err != nil {
 		log.L().Error("fail to purge relay", zap.String("request", "PurgeRelay"), zap.Stringer("payload", req), zap.Error(err))
 	}
-	// TODO: check whether this interface need to store message in ETCD
 	return makeCommonWorkerResponse(err), nil
 }
 
@@ -493,10 +496,10 @@ func (s *Server) OperateSchema(ctx context.Context, req *pb.OperateWorkerSchemaR
 
 	w := s.getWorker(true)
 	if w == nil {
-		log.L().Error("fail to call OperateSchema, because mysql worker has not been started")
+		log.L().Warn("fail to call OperateSchema, because no mysql source is being handled in the worker")
 		return makeCommonWorkerResponse(terror.ErrWorkerNoStart.Generate()), nil
 	} else if req.Source != w.cfg.SourceID {
-		log.L().Error("fail to call OperateSchema, because source mismatch")
+		log.L().Error("fail to call OperateSchema, because source mismatch", zap.String("request", req.Source), zap.String("current", w.cfg.SourceID))
 		return makeCommonWorkerResponse(terror.ErrWorkerSourceNotMatch.Generate()), nil
 	}
 
@@ -517,8 +520,7 @@ func (s *Server) startWorker(cfg *config.SourceConfig) error {
 	defer s.Unlock()
 	if w := s.getWorker(false); w != nil {
 		if w.cfg.SourceID == cfg.SourceID {
-			// This mysql task has started. It may be a repeated request. Just return true
-			log.L().Info("This mysql task has started. It may be a repeated request. Just return true", zap.String("sourceID", s.worker.cfg.SourceID))
+			log.L().Info("mysql source is being handled", zap.String("sourceID", s.worker.cfg.SourceID))
 			return nil
 		}
 		return terror.ErrWorkerAlreadyStart.Generate()
@@ -528,7 +530,6 @@ func (s *Server) startWorker(cfg *config.SourceConfig) error {
 	// because triggering these events is useless now
 	subTaskStages, subTaskCfgm, revSubTask, err := ha.GetSubTaskStageConfig(s.etcdClient, cfg.SourceID)
 	if err != nil {
-		// TODO: need retry
 		return err
 	}
 
@@ -558,8 +559,7 @@ func (s *Server) startWorker(cfg *config.SourceConfig) error {
 		}
 	}
 
-	log.L().Info("start worker", zap.String("sourceCfg", cfg.String()), zap.Reflect("subTasks", subTaskCfgs))
-
+	log.L().Info("starting to handle mysql source", zap.String("sourceCfg", cfg.String()), zap.Reflect("subTasks", subTaskCfgs))
 	w, err := NewWorker(cfg, s.etcdClient, s.cfg.Name)
 	if err != nil {
 		return err
@@ -587,17 +587,17 @@ func (s *Server) startWorker(cfg *config.SourceConfig) error {
 		return w.closed.Get() == closedFalse
 	})
 	if !isStarted {
-		// TODO: add more mechanism to wait
+		// TODO: add more mechanism to wait or un-bound the source
 		return terror.ErrWorkerNoStart
 	}
 
 	for _, subTaskCfg := range subTaskCfgs {
 		expectStage := subTaskStages[subTaskCfg.Name]
-		if expectStage.IsDeleted || expectStage.Expect != pb.Stage_Running {
+		if expectStage.IsDeleted {
 			continue
 		}
-		w.StartSubTask(subTaskCfg)
-		log.L().Info("load subtask", zap.String("sourceID", subTaskCfg.SourceID), zap.String("task", subTaskCfg.Name))
+		log.L().Info("start to create subtask", zap.String("sourceID", subTaskCfg.SourceID), zap.String("task", subTaskCfg.Name))
+		w.StartSubTask(subTaskCfg, expectStage.Expect)
 	}
 
 	w.wg.Add(1)
@@ -618,6 +618,7 @@ func (s *Server) startWorker(cfg *config.SourceConfig) error {
 		}()
 	}
 
+	log.L().Info("started to handle mysql source", zap.String("sourceCfg", cfg.String()))
 	return nil
 }
 
@@ -762,7 +763,7 @@ func (s *Server) HandleError(ctx context.Context, req *pb.HandleWorkerErrorReque
 
 	w := s.getWorker(true)
 	if w == nil {
-		log.L().Error("fail to call HandleError, because mysql worker has not been started")
+		log.L().Warn("fail to call HandleError, because no mysql source is being handled in the worker")
 		return makeCommonWorkerResponse(terror.ErrWorkerNoStart.Generate()), nil
 	}
diff --git a/dm/worker/server_test.go b/dm/worker/server_test.go
index e58066e9b0..9c5f1a255c 100644
--- a/dm/worker/server_test.go
+++ b/dm/worker/server_test.go
@@ -281,7 +281,7 @@ func (t *testServer) TestWatchSourceBoundEtcdCompact(c *C) {
 	wg.Add(1)
 	go func() {
 		defer wg.Done()
-		c.Assert(s.observeSourceBound(ctx1, etcdCli, startRev), IsNil)
+		c.Assert(s.observeSourceBound(ctx1, startRev), IsNil)
 	}()
 	// step 4.1: should stop the running worker, source bound has been deleted, should stop this worker
 	time.Sleep(time.Second)
@@ -302,7 +302,7 @@ func (t *testServer) TestWatchSourceBoundEtcdCompact(c *C) {
 	wg.Add(1)
 	go func() {
 		defer wg.Done()
-		c.Assert(s.observeSourceBound(ctx2, etcdCli, startRev), IsNil)
+		c.Assert(s.observeSourceBound(ctx2, startRev), IsNil)
 	}()
 	c.Assert(utils.WaitSomething(30, 100*time.Millisecond, func() bool {
 		return s.getWorker(true) != nil
diff --git a/dm/worker/subtask.go b/dm/worker/subtask.go
index b7ac72fac8..6e146e62da 100644
--- a/dm/worker/subtask.go
+++ b/dm/worker/subtask.go
@@ -176,9 +176,9 @@ func (st *SubTask) Init() error {
 }
 
 // Run runs the sub task
-func (st *SubTask) Run() {
+func (st *SubTask) Run(expectStage pb.Stage) {
 	if st.Stage() == pb.Stage_Finished || st.Stage() == pb.Stage_Running {
-		st.l.Warn("prepare to run", zap.Stringer("stage", st.Stage()))
+		st.l.Warn("prepare to run a subtask with invalid stage", zap.Stringer("current stage", st.Stage()))
 		return
 	}
 
@@ -189,7 +189,12 @@ func (st *SubTask) Run() {
 		return
 	}
 
-	st.run()
+	if expectStage == pb.Stage_Running {
+		st.run()
+	} else {
+		// if not want to run, still need to set the stage.
+		st.setStage(expectStage)
+	}
 }
 
 func (st *SubTask) run() {
@@ -455,7 +460,7 @@ func (st *SubTask) Pause() error {
 // similar to Run
 func (st *SubTask) Resume() error {
 	if !st.initialized.Get() {
-		st.Run()
+		st.Run(pb.Stage_Running)
 		return nil
 	}
diff --git a/dm/worker/subtask_test.go b/dm/worker/subtask_test.go
index cc013bb3a6..a259dc4da8 100644
--- a/dm/worker/subtask_test.go
+++ b/dm/worker/subtask_test.go
@@ -176,7 +176,7 @@ func (t *testSubTask) TestSubTaskNormalUsage(c *C) {
 	createUnits = func(cfg *config.SubTaskConfig, etcdClient *clientv3.Client) []unit.Unit {
 		return nil
 	}
-	st.Run()
+	st.Run(pb.Stage_Running)
 	c.Assert(st.Stage(), Equals, pb.Stage_Paused)
 	c.Assert(strings.Contains(st.Result().Errors[0].String(), "has no dm units for mode"), IsTrue)
 
@@ -186,7 +186,7 @@ func (t *testSubTask) TestSubTaskNormalUsage(c *C) {
 		return []unit.Unit{mockDumper, mockLoader}
 	}
 
-	st.Run()
+	st.Run(pb.Stage_Running)
 	c.Assert(st.Stage(), Equals, pb.Stage_Running)
 	c.Assert(st.CurrUnit(), Equals, mockDumper)
 	c.Assert(st.Result(), IsNil)
@@ -297,7 +297,7 @@ func (t *testSubTask) TestPauseAndResumeSubtask(c *C) {
 		return []unit.Unit{mockDumper, mockLoader}
 	}
 
-	st.Run()
+	st.Run(pb.Stage_Running)
 	c.Assert(st.Stage(), Equals, pb.Stage_Running)
 	c.Assert(st.CurrUnit(), Equals, mockDumper)
 	c.Assert(st.Result(), IsNil)
@@ -411,7 +411,7 @@ func (t *testSubTask) TestPauseAndResumeSubtask(c *C) {
 	c.Assert(st.Stage(), Equals, pb.Stage_Finished)
 	c.Assert(st.Result().Errors, HasLen, 0)
 
-	st.Run()
+	st.Run(pb.Stage_Finished)
 	c.Assert(st.CurrUnit(), Equals, mockLoader)
 	c.Assert(st.Stage(), Equals, pb.Stage_Finished)
 	c.Assert(st.Result().Errors, HasLen, 0)
@@ -466,7 +466,7 @@ func (t *testSubTask) TestSubtaskWithStage(c *C) {
 	c.Assert(st.CurrUnit(), Equals, nil)
 	c.Assert(st.Result(), IsNil)
 
-	st.Run()
+	st.Run(pb.Stage_Finished)
 	c.Assert(st.Stage(), Equals, pb.Stage_Finished)
 	c.Assert(st.CurrUnit(), Equals, nil)
 	c.Assert(st.Result(), IsNil)
diff --git a/dm/worker/worker.go b/dm/worker/worker.go
index a96a472828..46b61e2eff 100644
--- a/dm/worker/worker.go
+++ b/dm/worker/worker.go
@@ -188,7 +188,7 @@ func (w *Worker) Close() {
 }
 
 // StartSubTask creates a sub task an run it
-func (w *Worker) StartSubTask(cfg *config.SubTaskConfig) {
+func (w *Worker) StartSubTask(cfg *config.SubTaskConfig, expectStage pb.Stage) {
 	w.Lock()
 	defer w.Unlock()
 
@@ -211,12 +211,13 @@ func (w *Worker) StartSubTask(cfg *config.SubTaskConfig) {
 	st.cfg = cfg2
 
 	if w.relayPurger != nil && w.relayPurger.Purging() {
+		// TODO: retry until purged finished
 		st.fail(terror.ErrWorkerRelayIsPurging.Generate(cfg.Name))
 		return
 	}
 
-	w.l.Info("started sub task", zap.Stringer("config", cfg2))
-	st.Run()
+	w.l.Info("subtask created", zap.Stringer("config", cfg2))
+	st.Run(expectStage)
 }
 
 // UpdateSubTask update config for a sub task
@@ -378,10 +379,11 @@ func (w *Worker) handleSubTaskStage(ctx context.Context, stageCh chan ha.Stage,
 				closed = true
 				break
 			}
+			log.L().Info("receive subtask stage change", zap.Stringer("stage", stage), zap.Bool("is deleted", stage.IsDeleted))
 			opType, err := w.operateSubTaskStageWithoutConfig(stage)
 			if err != nil {
 				opErrCounter.WithLabelValues(w.name, opType).Inc()
-				log.L().Error("fail to operate subtask stage", zap.Stringer("stage", stage), zap.Error(err))
+				log.L().Error("fail to operate subtask stage", zap.Stringer("stage", stage), zap.Bool("is deleted", stage.IsDeleted), zap.Error(err))
 				if etcdutil.IsRetryableError(err) {
 					return err
 				}
@@ -408,16 +410,19 @@ func (w *Worker) handleSubTaskStage(ctx context.Context, stageCh chan ha.Stage,
 func (w *Worker) operateSubTaskStage(stage ha.Stage, subTaskCfg config.SubTaskConfig) (string, error) {
 	var op pb.TaskOp
 	switch {
-	case stage.Expect == pb.Stage_Running:
+	case stage.Expect == pb.Stage_Running, stage.Expect == pb.Stage_Paused:
 		if st := w.subTaskHolder.findSubTask(stage.Task); st == nil {
-			w.StartSubTask(&subTaskCfg)
-			log.L().Info("load subtask", zap.String("sourceID", subTaskCfg.SourceID), zap.String("task", subTaskCfg.Name))
+			// create the subtask for expected running and paused stage.
+			log.L().Info("start to create subtask", zap.String("sourceID", subTaskCfg.SourceID), zap.String("task", subTaskCfg.Name))
+			w.StartSubTask(&subTaskCfg, stage.Expect)
 			// error is nil, opErrTypeBeforeOp will be ignored
 			return opErrTypeBeforeOp, nil
 		}
-		op = pb.TaskOp_Resume
-	case stage.Expect == pb.Stage_Paused:
-		op = pb.TaskOp_Pause
+		if stage.Expect == pb.Stage_Running {
+			op = pb.TaskOp_Resume
+		} else if stage.Expect == pb.Stage_Paused {
+			op = pb.TaskOp_Pause
+		}
 	case stage.IsDeleted:
 		op = pb.TaskOp_Stop
 	}
@@ -483,7 +488,7 @@ func (w *Worker) observeRelayStage(ctx context.Context, etcdCli *clientv3.Client
 					opType, err1 := w.operateRelayStage(ctx, stage)
 					if err1 != nil {
 						opErrCounter.WithLabelValues(w.name, opType).Inc()
-						log.L().Error("fail to operate relay", zap.Stringer("stage", stage), zap.Error(err1))
+						log.L().Error("fail to operate relay", zap.Stringer("stage", stage), zap.Bool("is deleted", stage.IsDeleted), zap.Error(err1))
 					}
 				}
 				retryNum++
@@ -509,10 +514,11 @@ OUTER:
 			if !ok {
 				break OUTER
 			}
+			log.L().Info("receive relay stage change", zap.Stringer("stage", stage), zap.Bool("is deleted", stage.IsDeleted))
 			opType, err := w.operateRelayStage(ctx, stage)
 			if err != nil {
 				opErrCounter.WithLabelValues(w.name, opType).Inc()
-				log.L().Error("fail to operate relay", zap.Stringer("stage", stage), zap.Error(err))
+				log.L().Error("fail to operate relay", zap.Stringer("stage", stage), zap.Bool("is deleted", stage.IsDeleted), zap.Error(err))
 			}
 		case err, ok := <-errCh:
 			if !ok {
diff --git a/dm/worker/worker_test.go b/dm/worker/worker_test.go
index 186c003a5c..8d6ad889d9 100644
--- a/dm/worker/worker_test.go
+++ b/dm/worker/worker_test.go
@@ -70,7 +70,7 @@ func (t *testServer) testWorker(c *C) {
 
 	w.StartSubTask(&config.SubTaskConfig{
 		Name: "testStartTask",
-	})
+	}, pb.Stage_Running)
 	task := w.subTaskHolder.findSubTask("testStartTask")
 	c.Assert(task, NotNil)
 	c.Assert(task.Result().String(), Matches, ".*worker already closed.*")
@@ -160,7 +160,7 @@ func (t *testServer2) TestTaskAutoResume(c *C) {
 	var subtaskCfg config.SubTaskConfig
 	c.Assert(subtaskCfg.DecodeFile("./subtask.toml", true), IsNil)
 	c.Assert(err, IsNil)
-	s.getWorker(true).StartSubTask(&subtaskCfg)
+	s.getWorker(true).StartSubTask(&subtaskCfg, pb.Stage_Running)
 
 	// check task in paused state
 	c.Assert(utils.WaitSomething(100, 100*time.Millisecond, func() bool {
@@ -269,7 +269,7 @@ func (t *testWorkerEtcdCompact) TestWatchSubtaskStageEtcdCompact(c *C) {
 		[]ha.Stage{ha.NewSubTaskStage(pb.Stage_Stopped, sourceCfg.SourceID, subtaskCfg.Name)})
 	c.Assert(err, IsNil)
 	// step 2.1: start a subtask manually
-	w.StartSubTask(&subtaskCfg)
+	w.StartSubTask(&subtaskCfg, pb.Stage_Running)
 	// step 3: trigger etcd compaction and check whether we can receive it through watcher
 	_, err = etcdCli.Compact(ctx, rev)
 	c.Assert(err, IsNil)
diff --git a/errors.toml b/errors.toml
index c64a8aa717..cc5762d951 100644
--- a/errors.toml
+++ b/errors.toml
@@ -2471,7 +2471,7 @@ workaround = ""
 tags = ["internal", "high"]
 
 [error.DM-dm-worker-40049]
-message = "worker already closed"
+message = "mysql source handler worker already closed"
 description = ""
 workaround = ""
 tags = ["internal", "high"]
@@ -2597,13 +2597,13 @@ workaround = "Please check configs in worker configuration file."
 tags = ["internal", "high"]
 
 [error.DM-dm-worker-40070]
-message = "worker has not started"
+message = "no mysql source is being handled in the worker"
 description = ""
 workaround = ""
 tags = ["internal", "high"]
 
 [error.DM-dm-worker-40071]
-message = "worker already started"
+message = "mysql source handler worker already started"
 description = ""
 workaround = ""
 tags = ["internal", "high"]
diff --git a/pkg/terror/error_list.go b/pkg/terror/error_list.go
index 4e9a2df1d8..f63f7f2564 100644
--- a/pkg/terror/error_list.go
+++ b/pkg/terror/error_list.go
@@ -1074,9 +1074,7 @@ var (
 	ErrWorkerSaveVersionToKV = New(codeWorkerSaveVersionToKV, ClassDMWorker, ScopeInternal, LevelHigh, "save version %v into levelDB with key %v", "")
 	ErrWorkerVerAutoDowngrade = New(codeWorkerVerAutoDowngrade, ClassDMWorker, ScopeInternal, LevelHigh, "the previous version %s is newer than current %s, automatic downgrade is not supported now, please handle it manually", "")
 	ErrWorkerStartService = New(codeWorkerStartService, ClassDMWorker, ScopeInternal, LevelHigh, "start server", "")
-	ErrWorkerNoStart = New(codeWorkerNoStart, ClassDMWorker, ScopeInternal, LevelHigh, "worker has not started", "")
-	ErrWorkerAlreadyClosed = New(codeWorkerAlreadyClosed, ClassDMWorker, ScopeInternal, LevelHigh, "worker already closed", "")
-	ErrWorkerAlreadyStart = New(codeWorkerAlreadyStarted, ClassDMWorker, ScopeInternal, LevelHigh, "worker already started", "")
+	ErrWorkerAlreadyClosed = New(codeWorkerAlreadyClosed, ClassDMWorker, ScopeInternal, LevelHigh, "mysql source handler worker already closed", "")
 	ErrWorkerNotRunningStage = New(codeWorkerNotRunningStage, ClassDMWorker, ScopeInternal, LevelHigh, "current stage is %s but not running, invalid", "")
 	ErrWorkerNotPausedStage = New(codeWorkerNotPausedStage, ClassDMWorker, ScopeInternal, LevelHigh, "current stage is %s but not paused, invalid", "")
 	ErrWorkerUpdateTaskStage = New(codeWorkerUpdateTaskStage, ClassDMWorker, ScopeInternal, LevelHigh, "can only update task on Paused stage, but current stage is %s", "Please use `pause-task` command to pause the task.")
@@ -1097,6 +1095,8 @@ var (
 	ErrWorkerWaitRelayCatchupTimeout = New(codeWorkerWaitRelayCatchupTimeout, ClassDMWorker, ScopeInternal, LevelHigh, "waiting for relay binlog pos to catch up with loader end binlog pos is timeout (exceeding %s), loader end binlog pos: %s, relay binlog pos: %s", "")
 	ErrWorkerRelayIsPurging = New(codeWorkerRelayIsPurging, ClassDMWorker, ScopeInternal, LevelHigh, "relay log purger is purging, cannot start sub task %s", "Please try again later.")
 	ErrWorkerHostPortNotValid = New(codeWorkerHostPortNotValid, ClassDMWorker, ScopeInternal, LevelHigh, "host:port '%s' not valid", "Please check configs in worker configuration file.")
+	ErrWorkerNoStart = New(codeWorkerNoStart, ClassDMWorker, ScopeInternal, LevelHigh, "no mysql source is being handled in the worker", "")
+	ErrWorkerAlreadyStart = New(codeWorkerAlreadyStarted, ClassDMWorker, ScopeInternal, LevelHigh, "mysql source handler worker already started", "")
 	ErrWorkerSourceNotMatch = New(codeWorkerSourceNotMatch, ClassDMWorker, ScopeInternal, LevelHigh, "source of request does not match with source in worker", "")
 
 	ErrWorkerFailToGetSubtaskConfigFromEtcd = New(codeWorkerFailToGetSubtaskConfigFromEtcd, ClassDMWorker, ScopeInternal, LevelMedium, "there is no relative subtask config for task %s in etcd", "")
diff --git a/tests/ha/run.sh b/tests/ha/run.sh
index 86b03313d0..8e195ae70b 100755
--- a/tests/ha/run.sh
+++ b/tests/ha/run.sh
@@ -54,6 +54,11 @@ function run() {
     check_sync_diff $WORK_DIR $cur/conf/diff_config.toml
 
     sleep 2
+    echo "pause task before kill and restart dm-worker"
+    run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \
+        "pause-task test" \
+        "\"result\": true" 3
+
     echo "start dm-worker3 and kill dm-worker2"
     ps aux | grep dm-worker2 |awk '{print $2}'|xargs kill || true
     check_port_offline $WORKER2_PORT 20
@@ -62,6 +67,23 @@ function run() {
     run_dm_worker $WORK_DIR/worker3 $WORKER3_PORT $cur/conf/dm-worker3.toml
     check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER3_PORT
 
+    sleep 8
+    echo "wait for the task to be scheduled and keep paused"
+    check_http_alive 127.0.0.1:$MASTER_PORT/apis/${API_VERSION}/status/test '"stage": "Paused"' 10
+
+    echo "resume task before kill and restart dm-worker"
+    run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \
+        "resume-task test" \
+        "\"result\": true" 3
+
+    echo "start dm-worker2 and kill dm-worker3"
+    ps aux | grep dm-worker3 |awk '{print $2}'|xargs kill || true
+    check_port_offline $WORKER3_PORT 20
+    rm -rf $WORK_DIR/worker3/relay_log
+
+    run_dm_worker $WORK_DIR/worker2 $WORKER2_PORT $cur/conf/dm-worker2.toml
+    check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER2_PORT
+
     sleep 8
     echo "wait and check task running"
     check_http_alive 127.0.0.1:$MASTER_PORT/apis/${API_VERSION}/status/test '"stage": "Running"' 10