diff --git a/_utils/terror_gen/errors_release.txt b/_utils/terror_gen/errors_release.txt
index 0924415254..1c8f20f246 100644
--- a/_utils/terror_gen/errors_release.txt
+++ b/_utils/terror_gen/errors_release.txt
@@ -442,7 +442,7 @@ ErrWorkerWaitRelayCatchupTimeout,[code=40067:class=dm-worker:scope=internal:leve
 ErrWorkerRelayIsPurging,[code=40068:class=dm-worker:scope=internal:level=high], "Message: relay log purger is purging, cannot start sub task %s, Workaround: Please try again later."
 ErrWorkerHostPortNotValid,[code=40069:class=dm-worker:scope=internal:level=high], "Message: host:port '%s' not valid, Workaround: Please check configs in worker configuration file."
 ErrWorkerNoStart,[code=40070:class=dm-worker:scope=internal:level=high], "Message: no mysql source is being handled in the worker"
-ErrWorkerAlreadyStart,[code=40071:class=dm-worker:scope=internal:level=high], "Message: mysql source handler worker already started"
+ErrWorkerAlreadyStart,[code=40071:class=dm-worker:scope=internal:level=high], "Message: mysql source worker %s has already started with source %s, but got a request with source %s, Workaround: Please try restarting this DM-worker"
 ErrWorkerSourceNotMatch,[code=40072:class=dm-worker:scope=internal:level=high], "Message: source of request does not match with source in worker"
 ErrWorkerWaitRelayCatchupGTID,[code=40078:class=dm-worker:scope=internal:level=high], "Message: cannot compare gtid between loader and relay, loader gtid: %s, relay gtid: %s"
 ErrWorkerFailToGetSubtaskConfigFromEtcd,[code=40073:class=dm-worker:scope=internal:level=medium], "Message: there is no relative subtask config for task %s in etcd"
diff --git a/dm/config/subtask.go b/dm/config/subtask.go
index 79ba918c8c..b3c5a70f69 100644
--- a/dm/config/subtask.go
+++ b/dm/config/subtask.go
@@ -158,7 +158,7 @@ type SubTaskConfig struct {
 	// RelayDir get value from dm-worker config
 	RelayDir string `toml:"relay-dir" json:"relay-dir"`
-	// UseRelay get value from dm-worker config
+	// UseRelay gets its value from dm-worker's relayEnabled
 	UseRelay bool     `toml:"use-relay" json:"use-relay"`
 	From     DBConfig `toml:"from" json:"from"`
 	To       DBConfig `toml:"to" json:"to"`
diff --git a/dm/master/bootstrap.go b/dm/master/bootstrap.go
index 5b5d32186a..2cc2db7801 100644
--- a/dm/master/bootstrap.go
+++ b/dm/master/bootstrap.go
@@ -56,17 +56,17 @@ func (s *Server) bootstrap(ctx context.Context) error {
 		if err != nil {
 			return terror.ErrMasterFailToImportFromV10x.Delegate(err)
 		}
+	} else {
+		uctx := upgrade.Context{
+			Context:        ctx,
+			SubTaskConfigs: s.scheduler.GetSubTaskCfgs(),
+		}
+		err := upgrade.TryUpgrade(s.etcdClient, uctx)
+		if err != nil {
+			return err
+		}
 	}
-	uctx := upgrade.Context{
-		Context:        ctx,
-		SubTaskConfigs: s.scheduler.GetSubTaskCfgs(),
-	}
-	err := upgrade.TryUpgrade(s.etcdClient, uctx)
-
-	if err != nil {
-		return err
-	}
 	return nil
 }
@@ -104,16 +104,26 @@ func (s *Server) importFromV10x(ctx context.Context) error {
 		return err
 	}
 
-	// 5. create sources.
-	logger.Info("add source config into cluster")
-	err = s.addSourcesV1Import(tctx, sourceCfgs)
+	// 5. upgrade v1.0.x downstream metadata tables and run v2.0 upgrading routines.
+	// some v2.0 upgrading routines also alter the schema; if we ran them after adding sources, the DM-worker
+	// would hit an error.
+ logger.Info("upgrading downstream metadata tables") + err = s.upgradeDBSchemaV1Import(tctx, subtaskCfgs) + if err != nil { + return err + } + uctx := upgrade.Context{ + Context: ctx, + SubTaskConfigs: subtaskCfgs, + } + err = upgrade.UntouchVersionUpgrade(s.etcdClient, uctx) if err != nil { return err } - // 6. upgrade v1.0.x downstream metadata table. - logger.Info("upgrading downstream metadata tables") - err = s.upgradeDBSchemaV1Import(tctx, subtaskCfgs) + // 6. create sources. + logger.Info("add source config into cluster") + err = s.addSourcesV1Import(tctx, sourceCfgs) if err != nil { return err } @@ -127,7 +137,7 @@ func (s *Server) importFromV10x(ctx context.Context) error { // 8. mark the upgrade operation as done. logger.Info("marking upgrade from v1.0.x as done") - _, err = upgrade.PutVersion(s.etcdClient, upgrade.MinVersion) + _, err = upgrade.PutVersion(s.etcdClient, upgrade.CurrentVersion) if err != nil { return err } diff --git a/dm/worker/join.go b/dm/worker/join.go index f1bd067500..b6d366df20 100644 --- a/dm/worker/join.go +++ b/dm/worker/join.go @@ -16,6 +16,7 @@ package worker import ( "context" "strings" + "sync" "time" "github.com/pingcap/failpoint" @@ -119,9 +120,13 @@ func (s *Server) KeepAlive() { } } +// TODO: a channel is enough to avoid data race, check TTL not changed at receiving end of channel +var keepAliveLock sync.Mutex + // UpdateKeepAliveTTL updates keepalive key with new lease TTL in place, to avoid watcher observe a DELETE event -// this function should not be concurrently called func (s *Server) UpdateKeepAliveTTL(newTTL int64) { + keepAliveLock.Lock() + defer keepAliveLock.Unlock() if ha.CurrentKeepAliveTTL == newTTL { log.L().Info("not changing keepalive TTL, skip", zap.Int64("ttl", newTTL)) return diff --git a/dm/worker/relay.go b/dm/worker/relay.go index ebd9ea37ae..44b593909e 100644 --- a/dm/worker/relay.go +++ b/dm/worker/relay.go @@ -23,7 +23,6 @@ import ( "github.com/pingcap/dm/dm/config" "github.com/pingcap/dm/dm/pb" - "github.com/pingcap/dm/dm/unit" "github.com/pingcap/dm/pkg/log" "github.com/pingcap/dm/pkg/streamer" "github.com/pingcap/dm/pkg/terror" @@ -34,7 +33,7 @@ import ( // RelayHolder for relay unit type RelayHolder interface { // Init initializes the holder - Init(interceptors []purger.PurgeInterceptor) (purger.Purger, error) + Init(ctx context.Context, interceptors []purger.PurgeInterceptor) (purger.Purger, error) // Start starts run the relay Start() // Close closes the holder @@ -70,7 +69,7 @@ type realRelayHolder struct { l log.Logger - closed sync2.AtomicInt32 + closed sync2.AtomicBool stage pb.Stage result *pb.ProcessResult // the process result, nil when is processing } @@ -85,13 +84,13 @@ func NewRealRelayHolder(sourceCfg *config.SourceConfig) RelayHolder { relay: relay.NewRelay(cfg), l: log.With(zap.String("component", "relay holder")), } - h.closed.Set(closedTrue) + h.closed.Set(true) return h } // Init initializes the holder -func (h *realRelayHolder) Init(interceptors []purger.PurgeInterceptor) (purger.Purger, error) { - h.closed.Set(closedFalse) +func (h *realRelayHolder) Init(ctx context.Context, interceptors []purger.PurgeInterceptor) (purger.Purger, error) { + h.closed.Set(false) // initial relay purger operators := []purger.RelayOperator{ @@ -99,9 +98,6 @@ func (h *realRelayHolder) Init(interceptors []purger.PurgeInterceptor) (purger.P streamer.GetReaderHub(), } - // TODO: refine the context usage of relay, and it may need to be initialized before handle any subtasks. 
- ctx, cancel := context.WithTimeout(context.Background(), unit.DefaultInitTimeout) - defer cancel() if err := h.relay.Init(ctx); err != nil { return nil, terror.Annotate(err, "initial relay unit") } @@ -120,7 +116,7 @@ func (h *realRelayHolder) Start() { // Close closes the holder func (h *realRelayHolder) Close() { - if !h.closed.CompareAndSwap(closedFalse, closedTrue) { + if !h.closed.CompareAndSwap(false, true) { return } @@ -153,7 +149,7 @@ func (h *realRelayHolder) run() { // Status returns relay unit's status func (h *realRelayHolder) Status(ctx context.Context) *pb.RelayStatus { - if h.closed.Get() == closedTrue || h.relay.IsClosed() { + if h.closed.Get() || h.relay.IsClosed() { return &pb.RelayStatus{ Stage: pb.Stage_Stopped, } @@ -168,7 +164,7 @@ func (h *realRelayHolder) Status(ctx context.Context) *pb.RelayStatus { // Error returns relay unit's status func (h *realRelayHolder) Error() *pb.RelayError { - if h.closed.Get() == closedTrue || h.relay.IsClosed() { + if h.closed.Get() || h.relay.IsClosed() { return &pb.RelayError{ Msg: "relay stopped", } @@ -356,7 +352,7 @@ func NewDummyRelayHolderWithInitError(cfg *config.SourceConfig) RelayHolder { } // Init implements interface of RelayHolder -func (d *dummyRelayHolder) Init(interceptors []purger.PurgeInterceptor) (purger.Purger, error) { +func (d *dummyRelayHolder) Init(ctx context.Context, interceptors []purger.PurgeInterceptor) (purger.Purger, error) { // initial relay purger operators := []purger.RelayOperator{ d, diff --git a/dm/worker/relay_test.go b/dm/worker/relay_test.go index 9fc6480a3c..9535414389 100644 --- a/dm/worker/relay_test.go +++ b/dm/worker/relay_test.go @@ -159,7 +159,8 @@ func (t *testRelay) TestRelay(c *C) { } func (t *testRelay) testInit(c *C, holder *realRelayHolder) { - _, err := holder.Init(nil) + ctx := context.Background() + _, err := holder.Init(ctx, nil) c.Assert(err, IsNil) r, ok := holder.relay.(*DummyRelay) @@ -169,19 +170,19 @@ func (t *testRelay) testInit(c *C, holder *realRelayHolder) { r.InjectInitError(initErr) defer r.InjectInitError(nil) - _, err = holder.Init(nil) + _, err = holder.Init(ctx, nil) c.Assert(err, ErrorMatches, ".*"+initErr.Error()+".*") } func (t *testRelay) testStart(c *C, holder *realRelayHolder) { c.Assert(holder.Stage(), Equals, pb.Stage_New) - c.Assert(holder.closed.Get(), Equals, closedFalse) + c.Assert(holder.closed.Get(), IsFalse) c.Assert(holder.Result(), IsNil) holder.Start() c.Assert(waitRelayStage(holder, pb.Stage_Running, 10), IsTrue) c.Assert(holder.Result(), IsNil) - c.Assert(holder.closed.Get(), Equals, closedFalse) + c.Assert(holder.closed.Get(), IsFalse) // test status status := holder.Status(context.Background()) @@ -193,13 +194,13 @@ func (t *testRelay) testStart(c *C, holder *realRelayHolder) { // test update and pause -> resume t.testUpdate(c, holder) c.Assert(holder.Stage(), Equals, pb.Stage_Paused) - c.Assert(holder.closed.Get(), Equals, closedFalse) + c.Assert(holder.closed.Get(), IsFalse) err := holder.Operate(context.Background(), pb.RelayOp_ResumeRelay) c.Assert(err, IsNil) c.Assert(waitRelayStage(holder, pb.Stage_Running, 10), IsTrue) c.Assert(holder.Result(), IsNil) - c.Assert(holder.closed.Get(), Equals, closedFalse) + c.Assert(holder.closed.Get(), IsFalse) } func (t *testRelay) testClose(c *C, holder *realRelayHolder) { @@ -217,12 +218,12 @@ func (t *testRelay) testClose(c *C, holder *realRelayHolder) { holder.Close() c.Assert(waitRelayStage(holder, pb.Stage_Paused, 10), IsTrue) c.Assert(holder.Result(), DeepEquals, processResult) - 
c.Assert(holder.closed.Get(), Equals, closedTrue) + c.Assert(holder.closed.Get(), IsTrue) holder.Close() c.Assert(holder.Stage(), Equals, pb.Stage_Paused) c.Assert(holder.Result(), DeepEquals, processResult) - c.Assert(holder.closed.Get(), Equals, closedTrue) + c.Assert(holder.closed.Get(), IsTrue) // todo: very strange, and can't resume status := holder.Status(context.Background()) @@ -237,7 +238,7 @@ func (t *testRelay) testPauseAndResume(c *C, holder *realRelayHolder) { err := holder.Operate(context.Background(), pb.RelayOp_PauseRelay) c.Assert(err, IsNil) c.Assert(holder.Stage(), Equals, pb.Stage_Paused) - c.Assert(holder.closed.Get(), Equals, closedFalse) + c.Assert(holder.closed.Get(), IsFalse) err = holder.pauseRelay(context.Background(), pb.RelayOp_PauseRelay) c.Assert(err, ErrorMatches, ".*current stage is Paused.*") @@ -253,7 +254,7 @@ func (t *testRelay) testPauseAndResume(c *C, holder *realRelayHolder) { c.Assert(err, IsNil) c.Assert(waitRelayStage(holder, pb.Stage_Running, 10), IsTrue) c.Assert(holder.Result(), IsNil) - c.Assert(holder.closed.Get(), Equals, closedFalse) + c.Assert(holder.closed.Get(), IsFalse) err = holder.Operate(context.Background(), pb.RelayOp_ResumeRelay) c.Assert(err, ErrorMatches, ".*current stage is Running.*") @@ -281,7 +282,7 @@ func (t *testRelay) testUpdate(c *C, holder *realRelayHolder) { originStage := holder.Stage() c.Assert(holder.Update(context.Background(), cfg), IsNil) c.Assert(waitRelayStage(holder, originStage, 10), IsTrue) - c.Assert(holder.closed.Get(), Equals, closedFalse) + c.Assert(holder.closed.Get(), IsFalse) r, ok := holder.relay.(*DummyRelay) c.Assert(ok, IsTrue) @@ -296,7 +297,7 @@ func (t *testRelay) testStop(c *C, holder *realRelayHolder) { err := holder.Operate(context.Background(), pb.RelayOp_StopRelay) c.Assert(err, IsNil) c.Assert(holder.Stage(), Equals, pb.Stage_Stopped) - c.Assert(holder.closed.Get(), Equals, closedTrue) + c.Assert(holder.closed.Get(), IsTrue) err = holder.Operate(context.Background(), pb.RelayOp_StopRelay) c.Assert(err, ErrorMatches, ".*current stage is already stopped.*") diff --git a/dm/worker/server.go b/dm/worker/server.go index 1134674c02..b79ec8062a 100644 --- a/dm/worker/server.go +++ b/dm/worker/server.go @@ -115,20 +115,6 @@ func (s *Server) Start() error { } s.setWorker(nil, true) - bound, sourceCfg, revBound, err := ha.GetSourceBoundConfig(s.etcdClient, s.cfg.Name) - if err != nil { - return err - } - if !bound.IsEmpty() { - log.L().Warn("worker has been assigned source before keepalive", zap.Stringer("bound", bound), zap.Bool("is deleted", bound.IsDeleted)) - err = s.startWorker(&sourceCfg) - s.setSourceStatus(bound.Source, err, true) - if err != nil { - // if DM-worker can't handle pre-assigned source before keepalive, it simply exits with the error, - // because no re-assigned mechanism exists for keepalived DM-worker yet. 
-			return err
-		}
-	}
 
 	s.wg.Add(1)
 	go func() {
@@ -144,10 +130,36 @@ func (s *Server) Start() error {
 
 	s.startKeepAlive()
 
+	bound, sourceCfg, revBound, err := ha.GetSourceBoundConfig(s.etcdClient, s.cfg.Name)
+	if err != nil {
+		return err
+	}
+	if !bound.IsEmpty() {
+		log.L().Warn("worker has been assigned source before keepalive", zap.Stringer("bound", bound), zap.Bool("is deleted", bound.IsDeleted))
+		w, err2 := s.getOrStartWorker(&sourceCfg)
+		s.setSourceStatus(bound.Source, err2, true)
+		if err2 != nil {
+			// if DM-worker can't handle the pre-assigned source before keepalive, it simply exits with the error,
+			// because no re-assignment mechanism exists for a keepalived DM-worker yet.
+			return err2
+		}
+		if sourceCfg.EnableRelay {
+			s.UpdateKeepAliveTTL(s.cfg.RelayKeepAliveTTL)
+			if err2 = w.EnableRelay(); err2 != nil {
+				return err2
+			}
+		}
+		if err2 = w.EnableHandleSubtasks(); err2 != nil {
+			return err2
+		}
+		w.l.Info("started to handle mysql source", zap.String("sourceCfg", sourceCfg.String()))
+	}
+
 	s.wg.Add(1)
 	go func(ctx context.Context) {
 		defer s.wg.Done()
 		for {
+			// TODO: ObserveRelayConfig?
 			err1 := s.observeSourceBound(ctx, revBound)
 			if err1 == nil {
 				return
@@ -305,10 +317,20 @@ func (s *Server) observeSourceBound(ctx context.Context, rev int64) error {
 					log.L().Error("fail to stop worker", zap.Error(err))
 					return err // return if failed to stop the worker.
 				}
-				err1 = s.startWorker(&cfg)
-				s.setSourceStatus(bound.Source, err1, true)
-				if err1 != nil {
-					log.L().Error("fail to operate sourceBound on worker", zap.Stringer("bound", bound), zap.Bool("is deleted", bound.IsDeleted), zap.Error(err1))
+				w, err2 := s.getOrStartWorker(&cfg)
+				if err2 == nil {
+					if cfg.EnableRelay {
+						s.UpdateKeepAliveTTL(s.cfg.RelayKeepAliveTTL)
+						if err2 = w.EnableRelay(); err2 != nil {
+							return err2
+						}
+					}
+					err2 = w.EnableHandleSubtasks()
+				}
+				s.setSourceStatus(bound.Source, err2, true)
+				if err2 != nil {
+					// use log.L() here: `w` is nil when getOrStartWorker fails, so `w.l` would panic
+					log.L().Error("fail to operate sourceBound on worker", zap.Stringer("bound", bound), zap.Bool("is deleted", bound.IsDeleted), zap.Error(err2))
 				}
 			}
 		}
@@ -415,8 +436,10 @@ func (s *Server) stopWorker(sourceID string) error {
 		s.Unlock()
 		return terror.ErrWorkerSourceNotMatch
 	}
+	// TODO: also restore the keepalive TTL like this when relay is disabled
 	s.UpdateKeepAliveTTL(s.cfg.KeepAliveTTL)
 	s.setWorker(nil, false)
+	s.setSourceStatus("", nil, false)
 	s.Unlock()
 	w.Close()
 	return nil
@@ -470,7 +493,17 @@ func (s *Server) operateSourceBound(bound ha.SourceBound) error {
 	if !ok {
 		return terror.ErrWorkerFailToGetSourceConfigFromEtcd.Generate(bound.Source)
 	}
-	return s.startWorker(&sourceCfg)
+	w, err := s.getOrStartWorker(&sourceCfg)
+	if err != nil {
+		return err
+	}
+	if sourceCfg.EnableRelay {
+		s.UpdateKeepAliveTTL(s.cfg.RelayKeepAliveTTL)
+		if err = w.EnableRelay(); err != nil {
+			return err
+		}
+	}
+	return w.EnableHandleSubtasks()
 }
 
 // QueryStatus implements WorkerServer.QueryStatus
@@ -492,11 +525,7 @@ func (s *Server) QueryStatus(ctx context.Context, req *pb.QueryStatusRequest) (*
 		return resp, nil
 	}
 
-	resp.SubTaskStatus = w.QueryStatus(ctx, req.Name)
-	if w.relayHolder != nil {
-		sourceStatus.RelayStatus = w.relayHolder.Status(ctx)
-	}
-
+	resp.SubTaskStatus, sourceStatus.RelayStatus = w.QueryStatus(ctx, req.Name)
 	unifyMasterBinlogPos(resp, w.cfg.EnableGTID)
 
 	if len(resp.SubTaskStatus) == 0 {
@@ -546,42 +575,33 @@ func (s *Server) OperateSchema(ctx context.Context, req *pb.OperateWorkerSchemaR
 	}, nil
 }
 
-func (s *Server) startWorker(cfg *config.SourceConfig) error {
+func (s *Server) getOrStartWorker(cfg *config.SourceConfig) (*Worker, error) {
 	s.Lock()
defer s.Unlock() if w := s.getWorker(false); w != nil { if w.cfg.SourceID == cfg.SourceID { log.L().Info("mysql source is being handled", zap.String("sourceID", s.worker.cfg.SourceID)) - return nil + return w, nil } - return terror.ErrWorkerAlreadyStart.Generate() + return nil, terror.ErrWorkerAlreadyStart.Generate(w.name, w.cfg.SourceID, cfg.SourceID) } w, err := NewWorker(cfg, s.etcdClient, s.cfg.Name) if err != nil { - return err + return nil, err } s.setWorker(w, false) - if cfg.EnableRelay { - s.UpdateKeepAliveTTL(s.cfg.RelayKeepAliveTTL) - if err2 := w.EnableRelay(); err2 != nil { - return err2 - } - } go w.Start() isStarted := utils.WaitSomething(50, 100*time.Millisecond, func() bool { - return w.closed.Get() == closedFalse + return !w.closed.Get() }) if !isStarted { // TODO: add more mechanism to wait or un-bound the source - return terror.ErrWorkerNoStart + return nil, terror.ErrWorkerNoStart } - - err = w.EnableHandleSubtasks() - log.L().Info("started to handle mysql source", zap.String("sourceCfg", cfg.String())) - return err + return w, nil } func makeCommonWorkerResponse(reqErr error) *pb.CommonWorkerResponse { @@ -650,10 +670,10 @@ func getMinLocForSubTask(ctx context.Context, subTaskCfg config.SubTaskConfig) ( // see https://github.com/pingcap/dm/issues/727 func unifyMasterBinlogPos(resp *pb.QueryStatusResponse, enableGTID bool) { var ( - syncStatus []*pb.SubTaskStatus_Sync - syncMasterBinlog []*mysql.Position - lastestMasterBinlog mysql.Position // not pointer, to make use of zero value and avoid nil check - relayMasterBinlog *mysql.Position + syncStatus []*pb.SubTaskStatus_Sync + syncMasterBinlog []*mysql.Position + latestMasterBinlog mysql.Position // not pointer, to make use of zero value and avoid nil check + relayMasterBinlog *mysql.Position ) // uninitialized mysql.Position is less than any initialized mysql.Position @@ -661,10 +681,10 @@ func unifyMasterBinlogPos(resp *pb.QueryStatusResponse, enableGTID bool) { var err error relayMasterBinlog, err = utils.DecodeBinlogPosition(resp.SourceStatus.RelayStatus.MasterBinlog) if err != nil { - log.L().Error("failed to decode relay's master binlog position", zap.Stringer("response", resp), zap.Error(err)) + log.L().Warn("failed to decode relay's master binlog position", zap.Stringer("response", resp), zap.Error(err)) return } - lastestMasterBinlog = *relayMasterBinlog + latestMasterBinlog = *relayMasterBinlog } for _, stStatus := range resp.SubTaskStatus { @@ -674,11 +694,11 @@ func unifyMasterBinlogPos(resp *pb.QueryStatusResponse, enableGTID bool) { position, err := utils.DecodeBinlogPosition(s.Sync.MasterBinlog) if err != nil { - log.L().Error("failed to decode sync's master binlog position", zap.Stringer("response", resp), zap.Error(err)) + log.L().Warn("failed to decode sync's master binlog position", zap.Stringer("response", resp), zap.Error(err)) return } - if lastestMasterBinlog.Compare(*position) < 0 { - lastestMasterBinlog = *position + if latestMasterBinlog.Compare(*position) < 0 { + latestMasterBinlog = *position } syncMasterBinlog = append(syncMasterBinlog, position) } @@ -686,33 +706,33 @@ func unifyMasterBinlogPos(resp *pb.QueryStatusResponse, enableGTID bool) { // re-check relay if resp.SourceStatus.RelayStatus != nil && resp.SourceStatus.RelayStatus.Stage != pb.Stage_Stopped && - lastestMasterBinlog.Compare(*relayMasterBinlog) != 0 { + latestMasterBinlog.Compare(*relayMasterBinlog) != 0 { - resp.SourceStatus.RelayStatus.MasterBinlog = lastestMasterBinlog.String() + resp.SourceStatus.RelayStatus.MasterBinlog 
= latestMasterBinlog.String() // if enableGTID, modify output binlog position doesn't affect RelayCatchUpMaster, skip check if !enableGTID { relayPos, err := utils.DecodeBinlogPosition(resp.SourceStatus.RelayStatus.RelayBinlog) if err != nil { - log.L().Error("failed to decode relay binlog position", zap.Stringer("response", resp), zap.Error(err)) + log.L().Warn("failed to decode relay binlog position", zap.Stringer("response", resp), zap.Error(err)) return } - catchUp := lastestMasterBinlog.Compare(*relayPos) == 0 + catchUp := latestMasterBinlog.Compare(*relayPos) == 0 resp.SourceStatus.RelayStatus.RelayCatchUpMaster = catchUp } } // re-check syncer for i, sStatus := range syncStatus { - if lastestMasterBinlog.Compare(*syncMasterBinlog[i]) != 0 { + if latestMasterBinlog.Compare(*syncMasterBinlog[i]) != 0 { syncerPos, err := utils.DecodeBinlogPosition(sStatus.Sync.SyncerBinlog) if err != nil { - log.L().Error("failed to decode syncer binlog position", zap.Stringer("response", resp), zap.Error(err)) + log.L().Warn("failed to decode syncer binlog position", zap.Stringer("response", resp), zap.Error(err)) return } - synced := lastestMasterBinlog.Compare(*syncerPos) == 0 + synced := latestMasterBinlog.Compare(*syncerPos) == 0 - sStatus.Sync.MasterBinlog = lastestMasterBinlog.String() + sStatus.Sync.MasterBinlog = latestMasterBinlog.String() sStatus.Sync.Synced = synced } } diff --git a/dm/worker/server_test.go b/dm/worker/server_test.go index a040770a9f..787da5782b 100644 --- a/dm/worker/server_test.go +++ b/dm/worker/server_test.go @@ -167,8 +167,6 @@ func (t *testServer) TestServer(c *C) { subtaskCfg.MydumperPath = mydumperPath sourceCfg := loadSourceConfigWithoutPassword(c) - _, err = ha.PutSubTaskCfg(s.etcdClient, subtaskCfg) - c.Assert(err, IsNil) _, err = ha.PutSubTaskCfgStage(s.etcdClient, []config.SubTaskConfig{subtaskCfg}, []ha.Stage{ha.NewSubTaskStage(pb.Stage_Running, sourceCfg.SourceID, subtaskCfg.Name)}) c.Assert(err, IsNil) @@ -389,7 +387,9 @@ func (t *testServer) TestWatchSourceBoundEtcdCompact(c *C) { rev, err := ha.DeleteSourceBound(etcdCli, cfg.Name) c.Assert(err, IsNil) // step 2: start source at this worker - c.Assert(s.startWorker(&sourceCfg), IsNil) + w, err := s.getOrStartWorker(&sourceCfg) + c.Assert(err, IsNil) + c.Assert(w.EnableHandleSubtasks(), IsNil) // step 3: trigger etcd compaction and check whether we can receive it through watcher _, err = etcdCli.Compact(ctx, rev) c.Assert(err, IsNil) @@ -478,20 +478,20 @@ func (t *testServer) testOperateWorker(c *C, s *Server, dir string, start bool) // worker should be started and without error c.Assert(utils.WaitSomething(30, 100*time.Millisecond, func() bool { w := s.getWorker(true) - return w != nil && w.closed.Get() == closedFalse + return w != nil && !w.closed.Get() }), IsTrue) c.Assert(s.getSourceStatus(true).Result, IsNil) } else { // worker should be started before stopped w := s.getWorker(true) c.Assert(w, NotNil) - c.Assert(w.closed.Get() == closedFalse, IsTrue) + c.Assert(w.closed.Get(), IsFalse) _, err := ha.DeleteSourceCfgRelayStageSourceBound(s.etcdClient, sourceCfg.SourceID, s.cfg.Name) c.Assert(err, IsNil) // worker should be started and without error c.Assert(utils.WaitSomething(30, 100*time.Millisecond, func() bool { currentWorker := s.getWorker(true) - return currentWorker == nil && w.closed.Get() == closedTrue + return currentWorker == nil && w.closed.Get() }), IsTrue) c.Assert(s.getSourceStatus(true).Result, IsNil) } @@ -519,9 +519,27 @@ func (t *testServer) testSubTaskRecover(c *C, s *Server, dir 
string) {
 	c.Assert(status.Msg, Equals, terror.ErrWorkerNoStart.Error())
 
 	t.testOperateWorker(c, s, dir, true)
+
+	// because we split starting the worker and enabling subtask handling into two parts, a query-status may arrive
+	// between them and thus report no running subtask
+	utils.WaitSomething(30, 100*time.Millisecond, func() bool {
+		status, err = workerCli.QueryStatus(context.Background(), &pb.QueryStatusRequest{Name: "sub-task-name"})
+		if err != nil {
+			return false
+		}
+		if status.Result == false {
+			return false
+		}
+		if len(status.SubTaskStatus) == 0 || status.SubTaskStatus[0].Stage != pb.Stage_Running {
+			return false
+		}
+		return true
+	})
+
 	status, err = workerCli.QueryStatus(context.Background(), &pb.QueryStatusRequest{Name: "sub-task-name"})
 	c.Assert(err, IsNil)
 	c.Assert(status.Result, IsTrue)
+	c.Assert(status.SubTaskStatus, HasLen, 1)
 	c.Assert(status.SubTaskStatus[0].Stage, Equals, pb.Stage_Running)
 }
diff --git a/dm/worker/subtask.go b/dm/worker/subtask.go
index 14ead538bd..e798591f5f 100644
--- a/dm/worker/subtask.go
+++ b/dm/worker/subtask.go
@@ -39,8 +39,8 @@ import (
 )
 
 const (
-	// the timout to wait for relay catchup when switching from load unit to sync unit.
-	waitRelayCatchupTimeout = 5 * time.Minute
+	// the timeout to wait for relay catchup when switching from load unit to sync unit.
+	waitRelayCatchupTimeout = 30 * time.Second
 )
 
 // createRealUnits is subtask units initializer
@@ -598,7 +598,7 @@ func (st *SubTask) unitTransWaitCondition(subTaskCtx context.Context) error {
 		st.l.Info("wait condition between two units", zap.Stringer("previous unit", pu.Type()), zap.Stringer("unit", cu.Type()))
 		hub := GetConditionHub()
 
-		if hub.w.relayHolder == nil {
+		if !hub.w.relayEnabled.Get() {
 			return nil
 		}
diff --git a/dm/worker/subtask_holder.go b/dm/worker/subtask_holder.go
index 0123bafa90..e8754ca6fa 100644
--- a/dm/worker/subtask_holder.go
+++ b/dm/worker/subtask_holder.go
@@ -46,6 +46,18 @@ func (h *subTaskHolder) removeSubTask(name string) {
 	delete(h.subTasks, name)
 }
 
+// resetAllSubTasks closes all subtasks, switches their cfg.UseRelay, then restarts them at their previous stage
+func (h *subTaskHolder) resetAllSubTasks(useRelay bool) {
+	h.mu.Lock()
+	defer h.mu.Unlock()
+	for _, st := range h.subTasks {
+		stage := st.Stage()
+		st.Close()
+		st.cfg.UseRelay = useRelay
+		st.Run(stage)
+	}
+}
+
 // closeAllSubTasks closes all subtask instances.
 func (h *subTaskHolder) closeAllSubTasks() {
 	h.mu.Lock()
diff --git a/dm/worker/task_checker.go b/dm/worker/task_checker.go
index 69c150ea4d..243b96ce5d 100644
--- a/dm/worker/task_checker.go
+++ b/dm/worker/task_checker.go
@@ -141,7 +141,7 @@ type realTaskStatusChecker struct {
 	ctx    context.Context
 	cancel context.CancelFunc
 	wg     sync.WaitGroup
-	closed sync2.AtomicInt32
+	closed sync2.AtomicBool
 
 	cfg config.CheckerConfig
 	l   log.Logger
@@ -157,7 +157,7 @@ func NewRealTaskStatusChecker(cfg config.CheckerConfig, w *Worker) TaskStatusChe
 		w:  w,
 		bc: newBackoffController(),
 	}
-	tsc.closed.Set(closedTrue)
+	tsc.closed.Set(true)
 	return tsc
 }
 
@@ -180,7 +180,7 @@ func (tsc *realTaskStatusChecker) Start() {
 
 // Close implements TaskStatusChecker.Close
 func (tsc *realTaskStatusChecker) Close() {
-	if !tsc.closed.CompareAndSwap(closedFalse, closedTrue) {
+	if !tsc.closed.CompareAndSwap(false, true) {
 		return
 	}
 
@@ -193,7 +193,7 @@ func (tsc *realTaskStatusChecker) run() {
 	// keep running until canceled in `Close`.
 	tsc.ctx, tsc.cancel = context.WithCancel(context.Background())
-	tsc.closed.Set(closedFalse)
+	tsc.closed.Set(false)
 
 	failpoint.Inject("TaskCheckInterval", func(val failpoint.Value) {
 		interval, err := time.ParseDuration(val.(string))
@@ -406,7 +406,7 @@ func (tsc *realTaskStatusChecker) checkTaskStatus() {
 }
 
 func (tsc *realTaskStatusChecker) check() {
-	if tsc.w.cfg.EnableRelay {
+	if tsc.w.relayEnabled.Get() {
 		tsc.checkRelayStatus()
 	}
 	tsc.checkTaskStatus()
diff --git a/dm/worker/task_checker_test.go b/dm/worker/task_checker_test.go
index fb12fb86ac..bb2d95f5a0 100644
--- a/dm/worker/task_checker_test.go
+++ b/dm/worker/task_checker_test.go
@@ -91,7 +91,7 @@ func (s *testTaskCheckerSuite) TestCheck(c *check.C) {
 	cfg.MetaDir = dir
 	w, err := NewWorker(&cfg, nil, "")
 	c.Assert(err, check.IsNil)
-	w.closed.Set(closedFalse)
+	w.closed.Set(false)
 
 	tsc := NewRealTaskStatusChecker(config.CheckerConfig{
 		CheckEnable: true,
@@ -208,7 +208,7 @@ func (s *testTaskCheckerSuite) TestCheckTaskIndependent(c *check.C) {
 	cfg.MetaDir = dir
 	w, err := NewWorker(&cfg, nil, "")
 	c.Assert(err, check.IsNil)
-	w.closed.Set(closedFalse)
+	w.closed.Set(false)
 
 	tsc := NewRealTaskStatusChecker(config.CheckerConfig{
 		CheckEnable: true,
diff --git a/dm/worker/worker.go b/dm/worker/worker.go
index 4c66dd1404..06175d9cec 100644
--- a/dm/worker/worker.go
+++ b/dm/worker/worker.go
@@ -37,18 +37,15 @@ import (
 	"github.com/pingcap/dm/relay/purger"
 )
 
-var (
-	closedFalse int32
-	closedTrue  int32 = 1
-)
-
 // Worker manages sub tasks and process units for data migration
 type Worker struct {
 	// ensure no other operation can be done when closing (we can use `WaitGroup`/`Context` to achieve this)
+	// TODO: check what it guards. Now it's used to guard relayHolder and relayPurger (maybe subTaskHolder?) since
+	// query-status may access them when closing/disabling functionalities
 	sync.RWMutex
 
 	wg     sync.WaitGroup
-	closed sync2.AtomicInt32
+	closed sync2.AtomicBool
 
 	// context created when Worker created, and canceled when closing
 	ctx    context.Context
@@ -57,10 +54,21 @@ type Worker struct {
 	cfg *config.SourceConfig
 	l   log.Logger
 
-	subTaskHolder *subTaskHolder
-
-	relayHolder RelayHolder
-	relayPurger purger.Purger
+	// subtask functionality
+	subTaskEnabled sync2.AtomicBool
+	subTaskCtx     context.Context
+	subTaskCancel  context.CancelFunc
+	subTaskWg      sync.WaitGroup
+	subTaskHolder  *subTaskHolder
+
+	// relay functionality
+	// while relayEnabled == true, relayHolder and relayPurger must not be nil
+	relayEnabled sync2.AtomicBool
+	relayCtx     context.Context
+	relayCancel  context.CancelFunc
+	relayWg      sync.WaitGroup
+	relayHolder  RelayHolder
+	relayPurger  purger.Purger
 
 	taskStatusChecker TaskStatusChecker
 
@@ -81,7 +89,9 @@ func NewWorker(cfg *config.SourceConfig, etcdClient *clientv3.Client, name strin
 	}
 	// keep running until canceled in `Close`.
 	w.ctx, w.cancel = context.WithCancel(context.Background())
-	w.closed.Set(closedTrue)
+	w.closed.Set(true)
+	w.subTaskEnabled.Set(false)
+	w.relayEnabled.Set(false)
 
 	defer func(w2 *Worker) {
 		if err != nil { // when err != nil, `w` will become nil in this func, so we pass `w` in defer.
@@ -121,7 +131,7 @@ func (w *Worker) Start() { w.l.Info("start running") ticker := time.NewTicker(5 * time.Second) - w.closed.Set(closedFalse) + w.closed.Set(false) defer ticker.Stop() for { select { @@ -136,7 +146,7 @@ func (w *Worker) Start() { // Close stops working and releases resources func (w *Worker) Close() { - if w.closed.Get() == closedTrue { + if w.closed.Get() { w.l.Warn("already closed") return } @@ -152,12 +162,10 @@ func (w *Worker) Close() { w.subTaskHolder.closeAllSubTasks() if w.relayHolder != nil { - // close relay w.relayHolder.Close() } if w.relayPurger != nil { - // close purger w.relayPurger.Close() } @@ -166,14 +174,23 @@ func (w *Worker) Close() { w.taskStatusChecker.Close() } - w.closed.Set(closedTrue) + w.closed.Set(true) w.l.Info("Stop worker") } // EnableRelay enables the functionality of start/watch/handle relay -func (w *Worker) EnableRelay() error { +func (w *Worker) EnableRelay() (err error) { + w.Lock() + defer w.Unlock() + if w.relayEnabled.Get() { + w.l.Warn("already enabled relay") + return nil + } + w.relayCtx, w.relayCancel = context.WithCancel(w.ctx) + // 1. adjust relay starting position, to the earliest of subtasks - _, subTaskCfgs, _, err := w.fetchSubTasksAndAdjust() + var subTaskCfgs map[string]config.SubTaskConfig + _, subTaskCfgs, _, err = w.fetchSubTasksAndAdjust() if err != nil { return err } @@ -202,7 +219,7 @@ func (w *Worker) EnableRelay() error { // 2. initial relay holder, the cfg's password need decrypt w.relayHolder = NewRelayHolder(w.cfg) - relayPurger, err := w.relayHolder.Init([]purger.PurgeInterceptor{ + relayPurger, err := w.relayHolder.Init(w.relayCtx, []purger.PurgeInterceptor{ w, }) if err != nil { @@ -220,55 +237,123 @@ func (w *Worker) EnableRelay() error { } startImmediately := !relayStage.IsDeleted && relayStage.Expect == pb.Stage_Running if startImmediately { - log.L().Info("relay is started") + w.l.Info("relay is started") w.relayHolder.Start() w.relayPurger.Start() } // 4. 
watch relay stage
-	w.wg.Add(1)
+	w.relayWg.Add(1)
 	go func() {
-		defer w.wg.Done()
+		defer w.relayWg.Done()
 		// TODO: handle fatal error from observeRelayStage
 		//nolint:errcheck
-		w.observeRelayStage(w.ctx, w.etcdClient, revRelay)
+		w.observeRelayStage(w.relayCtx, w.etcdClient, revRelay)
 	}()
+
+	w.relayEnabled.Set(true)
+	w.l.Info("relay enabled")
+	w.subTaskHolder.resetAllSubTasks(true)
 	return nil
 }
 
+// DisableRelay disables the functionality of start/watch/handle relay
+func (w *Worker) DisableRelay() {
+	w.Lock()
+	defer w.Unlock()
+	if !w.relayEnabled.CompareAndSwap(true, false) {
+		w.l.Warn("already disabled relay")
+		return
+	}
+
+	w.relayCancel()
+	w.relayWg.Wait()
+
+	// refresh the task checker so it knows the latest relayEnabled, to avoid accessing relayHolder
+	if w.cfg.Checker.CheckEnable {
+		w.l.Info("refresh task checker")
+		w.taskStatusChecker.Close()
+		w.taskStatusChecker.Start()
+		w.l.Info("finish refreshing task checker")
+	}
+
+	w.subTaskHolder.resetAllSubTasks(false)
+
+	if w.relayHolder != nil {
+		r := w.relayHolder
+		w.relayHolder = nil
+		r.Close()
+	}
+	if w.relayPurger != nil {
+		r := w.relayPurger
+		w.relayPurger = nil
+		r.Close()
+	}
+}
+
 // EnableHandleSubtasks enables the functionality of start/watch/handle subtasks
 func (w *Worker) EnableHandleSubtasks() error {
+	w.Lock()
+	defer w.Unlock()
+	if w.subTaskEnabled.Get() {
+		w.l.Warn("already enabled handling subtasks")
+		return nil
+	}
+	w.subTaskCtx, w.subTaskCancel = context.WithCancel(w.ctx)
+
+	// we fetch the newest subtask stages directly, omitting subtask stage PUT/DELETE events,
+	// because reacting to those events is useless now
 	subTaskStages, subTaskCfgM, revSubTask, err := w.fetchSubTasksAndAdjust()
 	if err != nil {
 		return err
 	}
 
-	log.L().Info("starting to handle mysql source", zap.String("sourceCfg", w.cfg.String()), zap.Any("subTasks", subTaskCfgM))
+	w.l.Info("starting to handle mysql source", zap.String("sourceCfg", w.cfg.String()), zap.Any("subTasks", subTaskCfgM))
 
 	for _, subTaskCfg := range subTaskCfgM {
 		expectStage := subTaskStages[subTaskCfg.Name]
 		if expectStage.IsDeleted {
 			continue
 		}
-		log.L().Info("start to create subtask", zap.String("sourceID", subTaskCfg.SourceID), zap.String("task", subTaskCfg.Name))
-		// for range of a map will use a same value-address, so we'd better not pass value-address to other function
+		w.l.Info("start to create subtask", zap.String("sourceID", subTaskCfg.SourceID), zap.String("task", subTaskCfg.Name))
+		// "for range" over a map reuses the same value address, so we'd better not pass that address to another function
 		clone := subTaskCfg
-		if err := w.StartSubTask(&clone, expectStage.Expect); err != nil {
-			return err
+		if err2 := w.StartSubTask(&clone, expectStage.Expect, false); err2 != nil {
+			w.subTaskHolder.closeAllSubTasks()
+			return err2
 		}
 	}
 
-	w.wg.Add(1)
+	w.subTaskWg.Add(1)
 	go func() {
-		defer w.wg.Done()
+		defer w.subTaskWg.Done()
 		// TODO: handle fatal error from observeSubtaskStage
 		//nolint:errcheck
-		w.observeSubtaskStage(w.ctx, w.etcdClient, revSubTask)
+		w.observeSubtaskStage(w.subTaskCtx, w.etcdClient, revSubTask)
 	}()
 
+	w.subTaskEnabled.Set(true)
+	w.l.Info("handling subtask enabled")
 	return nil
 }
 
+// DisableHandleSubtasks disables the functionality of start/watch/handle subtasks
+func (w *Worker) DisableHandleSubtasks() {
+	if !w.subTaskEnabled.CompareAndSwap(true, false) {
+		w.l.Warn("already disabled handling subtasks")
+		return
+	}
+
+	w.subTaskCancel()
+	w.subTaskWg.Wait()
+
+	w.Lock()
+	defer w.Unlock()
+
+	// close all sub tasks
+	
w.subTaskHolder.closeAllSubTasks() +} + // fetchSubTasksAndAdjust gets source's subtask stages and configs, adjust some values by worker's config and status // source **must not be empty** // return map{task name -> subtask stage}, map{task name -> subtask config}, revision, error. @@ -280,19 +365,21 @@ func (w *Worker) fetchSubTasksAndAdjust() (map[string]ha.Stage, map[string]confi return nil, nil, 0, err } - if err = copyConfigFromSourceForEach(subTaskCfgM, w.cfg); err != nil { + if err = copyConfigFromSourceForEach(subTaskCfgM, w.cfg, w.relayEnabled.Get()); err != nil { return nil, nil, 0, err } return subTaskStages, subTaskCfgM, revSubTask, nil } // StartSubTask creates a sub task an run it -func (w *Worker) StartSubTask(cfg *config.SubTaskConfig, expectStage pb.Stage) error { - w.Lock() - defer w.Unlock() +func (w *Worker) StartSubTask(cfg *config.SubTaskConfig, expectStage pb.Stage, needLock bool) error { + if needLock { + w.Lock() + defer w.Unlock() + } // copy some config item from dm-worker's source config - err := copyConfigFromSource(cfg, w.cfg) + err := copyConfigFromSource(cfg, w.cfg, w.relayEnabled.Get()) if err != nil { return err } @@ -301,7 +388,7 @@ func (w *Worker) StartSubTask(cfg *config.SubTaskConfig, expectStage pb.Stage) e // the unique of subtask should be assured by etcd st := NewSubTask(cfg, w.etcdClient) w.subTaskHolder.recordSubTask(st) - if w.closed.Get() == closedTrue { + if w.closed.Get() { st.fail(terror.ErrWorkerAlreadyClosed.Generate()) return nil } @@ -313,7 +400,7 @@ func (w *Worker) StartSubTask(cfg *config.SubTaskConfig, expectStage pb.Stage) e } st.cfg = cfg2 - if w.relayPurger != nil && w.relayPurger.Purging() { + if w.relayEnabled.Get() && w.relayPurger.Purging() { // TODO: retry until purged finished st.fail(terror.ErrWorkerRelayIsPurging.Generate(cfg.Name)) return nil @@ -329,7 +416,7 @@ func (w *Worker) UpdateSubTask(cfg *config.SubTaskConfig) error { w.Lock() defer w.Unlock() - if w.closed.Get() == closedTrue { + if w.closed.Get() { return terror.ErrWorkerAlreadyClosed.Generate() } @@ -347,7 +434,7 @@ func (w *Worker) OperateSubTask(name string, op pb.TaskOp) error { w.Lock() defer w.Unlock() - if w.closed.Get() == closedTrue { + if w.closed.Get() { return terror.ErrWorkerAlreadyClosed.Generate() } @@ -378,20 +465,29 @@ func (w *Worker) OperateSubTask(name string, op pb.TaskOp) error { return err } -// QueryStatus query worker's sub tasks' status -func (w *Worker) QueryStatus(ctx context.Context, name string) []*pb.SubTaskStatus { +// QueryStatus query worker's sub tasks' status. If relay enabled, also return source status +// TODO: `ctx` is used to get upstream master status in every subtasks and source. +// reduce it to only call once and remove `unifyMasterBinlogPos`, also remove below ctx2 +func (w *Worker) QueryStatus(ctx context.Context, name string) ([]*pb.SubTaskStatus, *pb.RelayStatus) { w.RLock() defer w.RUnlock() - if w.closed.Get() == closedTrue { + if w.closed.Get() { w.l.Warn("querying status from a closed worker") - return nil + return nil, nil } // use one timeout for all tasks. increase this value if it's too short. 
ctx2, cancel2 := context.WithTimeout(ctx, utils.DefaultDBTimeout) defer cancel2() - return w.Status(ctx2, name) + var ( + subtaskStatus = w.Status(ctx2, name) + relayStatus *pb.RelayStatus + ) + if w.relayEnabled.Get() { + relayStatus = w.relayHolder.Status(ctx2) + } + return subtaskStatus, relayStatus } func (w *Worker) resetSubtaskStage() (int64, error) { @@ -521,7 +617,7 @@ func (w *Worker) operateSubTaskStage(stage ha.Stage, subTaskCfg config.SubTaskCo if st := w.subTaskHolder.findSubTask(stage.Task); st == nil { // create the subtask for expected running and paused stage. log.L().Info("start to create subtask", zap.String("sourceID", subTaskCfg.SourceID), zap.String("task", subTaskCfg.Name)) - err := w.StartSubTask(&subTaskCfg, stage.Expect) + err := w.StartSubTask(&subTaskCfg, stage.Expect, true) return opErrTypeBeforeOp, err } if stage.Expect == pb.Stage_Running { @@ -662,11 +758,11 @@ func (w *Worker) operateRelayStage(ctx context.Context, stage ha.Stage) (string, // OperateRelay operates relay unit func (w *Worker) operateRelay(ctx context.Context, op pb.RelayOp) error { - if w.closed.Get() == closedTrue { + if w.closed.Get() { return terror.ErrWorkerAlreadyClosed.Generate() } - if w.relayHolder != nil { + if w.relayEnabled.Get() { return w.relayHolder.Operate(ctx, op) } @@ -676,11 +772,11 @@ func (w *Worker) operateRelay(ctx context.Context, op pb.RelayOp) error { // PurgeRelay purges relay log files func (w *Worker) PurgeRelay(ctx context.Context, req *pb.PurgeRelayRequest) error { - if w.closed.Get() == closedTrue { + if w.closed.Get() { return terror.ErrWorkerAlreadyClosed.Generate() } - if w.relayPurger != nil { + if w.relayEnabled.Get() { return w.relayPurger.Do(ctx, req) } @@ -690,7 +786,7 @@ func (w *Worker) PurgeRelay(ctx context.Context, req *pb.PurgeRelayRequest) erro // ForbidPurge implements PurgeInterceptor.ForbidPurge func (w *Worker) ForbidPurge() (bool, string) { - if w.closed.Get() == closedTrue { + if w.closed.Get() { return false, "" } @@ -711,7 +807,7 @@ func (w *Worker) OperateSchema(ctx context.Context, req *pb.OperateWorkerSchemaR w.Lock() defer w.Unlock() - if w.closed.Get() == closedTrue { + if w.closed.Get() { return "", terror.ErrWorkerAlreadyClosed.Generate() } @@ -723,15 +819,15 @@ func (w *Worker) OperateSchema(ctx context.Context, req *pb.OperateWorkerSchemaR return st.OperateSchema(ctx, req) } -// copyConfigFromSource copies config items from source config to sub task -func copyConfigFromSource(cfg *config.SubTaskConfig, sourceCfg *config.SourceConfig) error { +// copyConfigFromSource copies config items from source config and worker's relayEnabled to sub task +func copyConfigFromSource(cfg *config.SubTaskConfig, sourceCfg *config.SourceConfig, enableRelay bool) error { cfg.From = sourceCfg.From cfg.Flavor = sourceCfg.Flavor cfg.ServerID = sourceCfg.ServerID cfg.RelayDir = sourceCfg.RelayDir cfg.EnableGTID = sourceCfg.EnableGTID - cfg.UseRelay = sourceCfg.EnableRelay + cfg.UseRelay = enableRelay // we can remove this from SubTaskConfig later, because syncer will always read from relay cfg.AutoFixGTID = sourceCfg.AutoFixGTID @@ -760,9 +856,13 @@ func copyConfigFromSource(cfg *config.SubTaskConfig, sourceCfg *config.SourceCon } // copyConfigFromSourceForEach do copyConfigFromSource for each value in subTaskCfgM and change subTaskCfgM in-place -func copyConfigFromSourceForEach(subTaskCfgM map[string]config.SubTaskConfig, sourceCfg *config.SourceConfig) error { +func copyConfigFromSourceForEach( + subTaskCfgM map[string]config.SubTaskConfig, + 
sourceCfg *config.SourceConfig, + enableRelay bool, +) error { for k, subTaskCfg := range subTaskCfgM { - if err2 := copyConfigFromSource(&subTaskCfg, sourceCfg); err2 != nil { + if err2 := copyConfigFromSource(&subTaskCfg, sourceCfg, enableRelay); err2 != nil { return err2 } subTaskCfgM[k] = subTaskCfg @@ -792,7 +892,7 @@ func (w *Worker) HandleError(ctx context.Context, req *pb.HandleWorkerErrorReque w.Lock() defer w.Unlock() - if w.closed.Get() == closedTrue { + if w.closed.Get() { return terror.ErrWorkerAlreadyClosed.Generate() } diff --git a/dm/worker/worker_test.go b/dm/worker/worker_test.go index 35226e18cc..1b0bc723ef 100644 --- a/dm/worker/worker_test.go +++ b/dm/worker/worker_test.go @@ -17,6 +17,7 @@ import ( "context" "fmt" "sync" + "sync/atomic" "time" . "github.com/pingcap/check" @@ -42,11 +43,6 @@ func (t *testServer) testWorker(c *C) { cfg.RelayDir = dir cfg.MetaDir = dir - NewRelayHolder = NewDummyRelayHolderWithInitError - defer func() { - NewRelayHolder = NewRealRelayHolder - }() - var ( masterAddr = tempurl.Alloc()[len("http://"):] keepAliveTTL = int64(1) @@ -69,6 +65,10 @@ func (t *testServer) testWorker(c *C) { }) c.Assert(err, IsNil) + NewRelayHolder = NewDummyRelayHolderWithInitError + defer func() { + NewRelayHolder = NewRealRelayHolder + }() w, err := NewWorker(&cfg, etcdCli, "") c.Assert(err, IsNil) c.Assert(w.EnableRelay(), ErrorMatches, "init error") @@ -77,23 +77,19 @@ func (t *testServer) testWorker(c *C) { w, err = NewWorker(&cfg, etcdCli, "") c.Assert(err, IsNil) c.Assert(w.StatusJSON(context.Background(), ""), HasLen, emptyWorkerStatusInfoJSONLength) - //c.Assert(w.closed.Get(), Equals, closedFalse) - //go func() { - // w.Start() - //}() // close twice w.Close() - c.Assert(w.closed.Get(), Equals, closedTrue) + c.Assert(w.closed.Get(), IsTrue) c.Assert(w.subTaskHolder.getAllSubTasks(), HasLen, 0) w.Close() - c.Assert(w.closed.Get(), Equals, closedTrue) + c.Assert(w.closed.Get(), IsTrue) c.Assert(w.subTaskHolder.getAllSubTasks(), HasLen, 0) - c.Assert(w.closed.Get(), Equals, closedTrue) + c.Assert(w.closed.Get(), IsTrue) c.Assert(w.StartSubTask(&config.SubTaskConfig{ Name: "testStartTask", - }, pb.Stage_Running), IsNil) + }, pb.Stage_Running, true), IsNil) task := w.subTaskHolder.findSubTask("testStartTask") c.Assert(task, NotNil) c.Assert(task.Result().String(), Matches, ".*worker already closed.*") @@ -176,18 +172,23 @@ func (t *testServer2) TestTaskAutoResume(c *C) { if s.closed.Get() { return false } - c.Assert(s.startWorker(&sourceConfig), IsNil) + w, err2 := s.getOrStartWorker(&sourceConfig) + c.Assert(err2, IsNil) + // we set sourceConfig.EnableRelay = true above + c.Assert(w.EnableRelay(), IsNil) + c.Assert(w.EnableHandleSubtasks(), IsNil) return true }), IsTrue) // start task var subtaskCfg config.SubTaskConfig c.Assert(subtaskCfg.DecodeFile("./subtask.toml", true), IsNil) c.Assert(err, IsNil) - c.Assert(s.getWorker(true).StartSubTask(&subtaskCfg, pb.Stage_Running), IsNil) + c.Assert(s.getWorker(true).StartSubTask(&subtaskCfg, pb.Stage_Running, true), IsNil) // check task in paused state c.Assert(utils.WaitSomething(100, 100*time.Millisecond, func() bool { - for _, st := range s.getWorker(true).QueryStatus(context.Background(), taskName) { + subtaskStatus, _ := s.getWorker(true).QueryStatus(context.Background(), taskName) + for _, st := range subtaskStatus { if st.Name == taskName && st.Stage == pb.Stage_Paused { return true } @@ -207,7 +208,7 @@ func (t *testServer2) TestTaskAutoResume(c *C) { // check task will be auto resumed 
c.Assert(utils.WaitSomething(10, 100*time.Millisecond, func() bool { - sts := s.getWorker(true).QueryStatus(context.Background(), taskName) + sts, _ := s.getWorker(true).QueryStatus(context.Background(), taskName) for _, st := range sts { if st.Name == taskName && st.Stage == pb.Stage_Running { return true @@ -218,6 +219,181 @@ func (t *testServer2) TestTaskAutoResume(c *C) { }), IsTrue) } +type testWorkerFunctionalities struct { + createUnitCount int32 + expectedCreateUnitCount int32 +} + +var _ = Suite(&testWorkerFunctionalities{}) + +func (t *testWorkerFunctionalities) SetUpSuite(c *C) { + NewRelayHolder = NewDummyRelayHolder + NewSubTask = func(cfg *config.SubTaskConfig, etcdClient *clientv3.Client) *SubTask { + return NewRealSubTask(cfg, etcdClient) + } + createUnits = func(cfg *config.SubTaskConfig, etcdClient *clientv3.Client) []unit.Unit { + atomic.AddInt32(&t.createUnitCount, 1) + mockDumper := NewMockUnit(pb.UnitType_Dump) + mockLoader := NewMockUnit(pb.UnitType_Load) + mockSync := NewMockUnit(pb.UnitType_Sync) + return []unit.Unit{mockDumper, mockLoader, mockSync} + } + getMinLocForSubTaskFunc = getFakeLocForSubTask +} + +func (t *testWorkerFunctionalities) TearDownSuite(c *C) { + NewRelayHolder = NewRealRelayHolder + NewSubTask = NewRealSubTask + createUnits = createRealUnits + getMinLocForSubTaskFunc = getMinLocForSubTask +} + +func (t *testWorkerFunctionalities) TestWorkerFunctionalities(c *C) { + var ( + masterAddr = tempurl.Alloc()[len("http://"):] + keepAliveTTL = int64(1) + ) + etcdDir := c.MkDir() + ETCD, err := createMockETCD(etcdDir, "http://"+masterAddr) + c.Assert(err, IsNil) + defer ETCD.Close() + cfg := NewConfig() + c.Assert(cfg.Parse([]string{"-config=./dm-worker.toml"}), IsNil) + cfg.Join = masterAddr + cfg.KeepAliveTTL = keepAliveTTL + cfg.RelayKeepAliveTTL = keepAliveTTL + + etcdCli, err := clientv3.New(clientv3.Config{ + Endpoints: GetJoinURLs(cfg.Join), + DialTimeout: dialTimeout, + DialKeepAliveTime: keepaliveTime, + DialKeepAliveTimeout: keepaliveTimeout, + }) + c.Assert(err, IsNil) + sourceCfg := loadSourceConfigWithoutPassword(c) + sourceCfg.EnableRelay = false + + subtaskCfg := config.SubTaskConfig{} + err = subtaskCfg.DecodeFile(subtaskSampleFile, true) + c.Assert(err, IsNil) + + // start worker + w, err := NewWorker(&sourceCfg, etcdCli, "") + c.Assert(err, IsNil) + defer w.Close() + go func() { + w.Start() + }() + c.Assert(utils.WaitSomething(50, 100*time.Millisecond, func() bool { + return !w.closed.Get() + }), IsTrue) + + // test 1: when subTaskEnabled is false, switch on relay + c.Assert(w.subTaskEnabled.Get(), IsFalse) + t.testEnableRelay(c, w, etcdCli, sourceCfg, cfg) + + // test2: when subTaskEnabled is false, switch off relay + c.Assert(w.subTaskEnabled.Get(), IsFalse) + t.testDisableRelay(c, w) + + // test3: when relayEnabled is false, switch on subtask + c.Assert(w.relayEnabled.Get(), IsFalse) + + t.testEnableHandleSubtasks(c, w, etcdCli, subtaskCfg, sourceCfg) + + // test4: when subTaskEnabled is true, switch on relay + c.Assert(w.subTaskEnabled.Get(), IsTrue) + + t.testEnableRelay(c, w, etcdCli, sourceCfg, cfg) + c.Assert(w.subTaskHolder.findSubTask(subtaskCfg.Name).cfg.UseRelay, IsTrue) + t.expectedCreateUnitCount++ + c.Assert(utils.WaitSomething(30, 100*time.Millisecond, func() bool { + return atomic.LoadInt32(&t.createUnitCount) == t.expectedCreateUnitCount + }), IsTrue) + + // test5: when subTaskEnabled is true, switch off relay + c.Assert(w.subTaskEnabled.Get(), IsTrue) + t.testDisableRelay(c, w) + + 
c.Assert(w.subTaskHolder.findSubTask(subtaskCfg.Name).cfg.UseRelay, IsFalse) + t.expectedCreateUnitCount++ + c.Assert(utils.WaitSomething(30, 100*time.Millisecond, func() bool { + return atomic.LoadInt32(&t.createUnitCount) == t.expectedCreateUnitCount + }), IsTrue) + + // test6: when relayEnabled is false, switch off subtask + c.Assert(w.relayEnabled.Get(), IsFalse) + + w.DisableHandleSubtasks() + c.Assert(w.subTaskEnabled.Get(), IsFalse) + + // prepare for test7 & 8 + t.testEnableRelay(c, w, etcdCli, sourceCfg, cfg) + // test7: when relayEnabled is true, switch on subtask + c.Assert(w.relayEnabled.Get(), IsTrue) + + subtaskCfg2 := subtaskCfg + subtaskCfg2.Name = "sub-task-name-2" + // we already added subtaskCfg, so below EnableHandleSubtasks will find an extra subtask + t.expectedCreateUnitCount++ + t.testEnableHandleSubtasks(c, w, etcdCli, subtaskCfg2, sourceCfg) + c.Assert(w.subTaskHolder.findSubTask(subtaskCfg.Name).cfg.UseRelay, IsTrue) + c.Assert(w.subTaskHolder.findSubTask(subtaskCfg2.Name).cfg.UseRelay, IsTrue) + + // test8: when relayEnabled is true, switch off subtask + c.Assert(w.relayEnabled.Get(), IsTrue) + + w.DisableHandleSubtasks() + c.Assert(w.subTaskEnabled.Get(), IsFalse) +} + +func (t *testWorkerFunctionalities) testEnableRelay(c *C, w *Worker, etcdCli *clientv3.Client, + sourceCfg config.SourceConfig, cfg *Config) { + c.Assert(w.EnableRelay(), IsNil) + + c.Assert(w.relayEnabled.Get(), IsTrue) + c.Assert(w.relayHolder.Stage(), Equals, pb.Stage_New) + + _, err := ha.PutSourceCfg(etcdCli, sourceCfg) + c.Assert(err, IsNil) + _, err = ha.PutRelayStageSourceBound(etcdCli, ha.NewRelayStage(pb.Stage_Running, sourceCfg.SourceID), + ha.NewSourceBound(sourceCfg.SourceID, cfg.Name)) + c.Assert(err, IsNil) + c.Assert(utils.WaitSomething(30, 100*time.Millisecond, func() bool { + return w.relayHolder.Stage() == pb.Stage_Running + }), IsTrue) + + _, err = ha.DeleteSourceCfgRelayStageSourceBound(etcdCli, sourceCfg.SourceID, cfg.Name) + c.Assert(err, IsNil) + c.Assert(utils.WaitSomething(30, 100*time.Millisecond, func() bool { + return w.relayHolder.Stage() == pb.Stage_Stopped + }), IsTrue) +} + +func (t *testWorkerFunctionalities) testDisableRelay(c *C, w *Worker) { + w.DisableRelay() + + c.Assert(w.relayEnabled.Get(), IsFalse) + c.Assert(w.relayHolder, IsNil) +} + +func (t *testWorkerFunctionalities) testEnableHandleSubtasks(c *C, w *Worker, etcdCli *clientv3.Client, + subtaskCfg config.SubTaskConfig, sourceCfg config.SourceConfig) { + c.Assert(w.EnableHandleSubtasks(), IsNil) + c.Assert(w.subTaskEnabled.Get(), IsTrue) + + _, err := ha.PutSubTaskCfgStage(etcdCli, []config.SubTaskConfig{subtaskCfg}, + []ha.Stage{ha.NewSubTaskStage(pb.Stage_Running, sourceCfg.SourceID, subtaskCfg.Name)}) + c.Assert(err, IsNil) + c.Assert(utils.WaitSomething(30, 100*time.Millisecond, func() bool { + return w.subTaskHolder.findSubTask(subtaskCfg.Name) != nil + }), IsTrue) + t.expectedCreateUnitCount++ + c.Assert(utils.WaitSomething(30, 100*time.Millisecond, func() bool { + return atomic.LoadInt32(&t.createUnitCount) == t.expectedCreateUnitCount + }), IsTrue) +} + type testWorkerEtcdCompact struct{} var _ = Suite(&testWorkerEtcdCompact{}) @@ -278,7 +454,7 @@ func (t *testWorkerEtcdCompact) TestWatchSubtaskStageEtcdCompact(c *C) { w.Start() }() c.Assert(utils.WaitSomething(50, 100*time.Millisecond, func() bool { - return w.closed.Get() == closedFalse + return !w.closed.Get() }), IsTrue) // step 2: Put a subtask config and subtask stage to this source, then delete it subtaskCfg := config.SubTaskConfig{} @@ 
-293,7 +469,7 @@ func (t *testWorkerEtcdCompact) TestWatchSubtaskStageEtcdCompact(c *C) {
 		[]ha.Stage{ha.NewSubTaskStage(pb.Stage_Stopped, sourceCfg.SourceID, subtaskCfg.Name)})
 	c.Assert(err, IsNil)
 	// step 2.1: start a subtask manually
-	c.Assert(w.StartSubTask(&subtaskCfg, pb.Stage_Running), IsNil)
+	c.Assert(w.StartSubTask(&subtaskCfg, pb.Stage_Running, true), IsNil)
 	// step 3: trigger etcd compaction and check whether we can receive it through watcher
 	_, err = etcdCli.Compact(ctx, rev)
 	c.Assert(err, IsNil)
@@ -327,7 +503,7 @@ func (t *testWorkerEtcdCompact) TestWatchSubtaskStageEtcdCompact(c *C) {
 	c.Assert(utils.WaitSomething(30, 100*time.Millisecond, func() bool {
 		return w.subTaskHolder.findSubTask(subtaskCfg.Name) != nil
 	}), IsTrue)
-	status := w.QueryStatus(ctx1, subtaskCfg.Name)
+	status, _ := w.QueryStatus(ctx1, subtaskCfg.Name)
 	c.Assert(status, HasLen, 1)
 	c.Assert(status[0].Name, Equals, subtaskCfg.Name)
 	c.Assert(status[0].Stage, Equals, pb.Stage_Running)
@@ -345,7 +521,7 @@ func (t *testWorkerEtcdCompact) TestWatchSubtaskStageEtcdCompact(c *C) {
 	c.Assert(utils.WaitSomething(30, 100*time.Millisecond, func() bool {
 		return w.subTaskHolder.findSubTask(subtaskCfg.Name) != nil
 	}), IsTrue)
-	status = w.QueryStatus(ctx2, subtaskCfg.Name)
+	status, _ = w.QueryStatus(ctx2, subtaskCfg.Name)
 	c.Assert(status, HasLen, 1)
 	c.Assert(status[0].Name, Equals, subtaskCfg.Name)
 	c.Assert(status[0].Stage, Equals, pb.Stage_Running)
@@ -393,7 +569,7 @@ func (t *testWorkerEtcdCompact) TestWatchRelayStageEtcdCompact(c *C) {
 		w.Start()
 	}()
 	c.Assert(utils.WaitSomething(50, 100*time.Millisecond, func() bool {
-		return w.closed.Get() == closedFalse
+		return !w.closed.Get()
 	}), IsTrue)
 	// step 2: Put a relay stage to this source, then delete it
 	// put mysql config into relative etcd key adapter to trigger operation event
diff --git a/errors.toml b/errors.toml
index 9045af6af9..4a77f15f7e 100644
--- a/errors.toml
+++ b/errors.toml
@@ -2663,9 +2663,9 @@ workaround = ""
 tags = ["internal", "high"]
 
 [error.DM-dm-worker-40071]
-message = "mysql source handler worker already started"
+message = "mysql source worker %s has already started with source %s, but got a request with source %s"
 description = ""
-workaround = ""
+workaround = "Please try restarting this DM-worker"
 tags = ["internal", "high"]
 
 [error.DM-dm-worker-40072]
diff --git a/go.mod b/go.mod
index 72289ea333..e3e0a2d367 100644
--- a/go.mod
+++ b/go.mod
@@ -50,3 +50,5 @@ require (
 go 1.13
 
 replace github.com/siddontang/go-mysql v1.1.1-0.20200824131207-0c5789dd0bd3 => github.com/lance6716/go-mysql v1.1.1-0.20210303100354-b0e44c2c5623
+
+replace github.com/siddontang/go v0.0.0-20180604090527-bdc77568d726 => github.com/lance6716/go v0.0.0-20210312094856-8a1d496ae7d4
diff --git a/go.sum b/go.sum
index c824e67f4e..5c07f02eaa 100644
--- a/go.sum
+++ b/go.sum
@@ -554,6 +554,8 @@ github.com/kr/pty v1.1.5/go.mod h1:9r2w37qlBe7rQ6e1fg1S/9xpWHSnaqNdHD3WcMdbPDA=
 github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
 github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
 github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
+github.com/lance6716/go v0.0.0-20210312094856-8a1d496ae7d4 h1:19RYJOIPZqTJxJqwJCDdxFaav7Wp9S+gaKUc6hd+fXE=
+github.com/lance6716/go v0.0.0-20210312094856-8a1d496ae7d4/go.mod h1:FcZnptvZ/cMvXwOfn5vbaV7T2rv607STeXC3LTYGYp8=
 github.com/lance6716/go-mysql v1.1.1-0.20210303100354-b0e44c2c5623 h1:HesfLUZvjUaluKJzKetJ6odasdH/WQ/Tc8UO1oAs0WI=
 github.com/lance6716/go-mysql 
v1.1.1-0.20210303100354-b0e44c2c5623/go.mod h1:+W4RCzesQDI11HvIkaDjS8yM36SpAnGNQ7jmTLn5BnU= github.com/leodido/go-urn v1.1.0/go.mod h1:+cyI34gQWZcE1eQU7NVgKkkzdXDQHr1dBMtdAPozLkw= @@ -938,8 +940,7 @@ github.com/shurcooL/sanitized_anchor_name v1.0.0/go.mod h1:1NzhyTcUVG4SuEtjjoZeV github.com/shurcooL/vfsgen v0.0.0-20181020040650-a97a25d856ca/go.mod h1:TrYk7fJVaAttu97ZZKrO9UbRa8izdowaMIZcxYMbVaw= github.com/shurcooL/vfsgen v0.0.0-20181202132449-6a9ea43bcacd h1:ug7PpSOB5RBPK1Kg6qskGBoP3Vnj/aNYFTznWvlkGo0= github.com/shurcooL/vfsgen v0.0.0-20181202132449-6a9ea43bcacd/go.mod h1:TrYk7fJVaAttu97ZZKrO9UbRa8izdowaMIZcxYMbVaw= -github.com/siddontang/go v0.0.0-20180604090527-bdc77568d726 h1:xT+JlYxNGqyT+XcU8iUrN18JYed2TvG9yN5ULG2jATM= -github.com/siddontang/go v0.0.0-20180604090527-bdc77568d726/go.mod h1:3yhqj7WBBfRhbBlzyOC3gUxftwsU0u8gqevxwIHQpMw= +github.com/siddontang/go-log v0.0.0-20180807004314-8d05993dda07 h1:oI+RNwuC9jF2g2lP0u0cVEEZrc/AYBCuFdvwrLWM/6Q= github.com/siddontang/go-log v0.0.0-20180807004314-8d05993dda07/go.mod h1:yFdBgwXP24JziuRl2NMUahT7nGLNOKi1SIiFxMttVD4= github.com/siddontang/go-log v0.0.0-20190221022429-1e957dd83bed h1:KMgQoLJGCq1IoZpLZE3AIffh9veYWoVlsvA4ib55TMM= github.com/siddontang/go-log v0.0.0-20190221022429-1e957dd83bed/go.mod h1:yFdBgwXP24JziuRl2NMUahT7nGLNOKi1SIiFxMttVD4= diff --git a/pkg/ha/subtask.go b/pkg/ha/subtask.go index eff639a6d0..e4b12d6b8b 100644 --- a/pkg/ha/subtask.go +++ b/pkg/ha/subtask.go @@ -24,17 +24,6 @@ import ( "github.com/pingcap/dm/pkg/terror" ) -// PutSubTaskCfg puts the subtask configs of the specified source and task name into etcd. -// k/k/v: sourceID, taskName -> subtask config. -func PutSubTaskCfg(cli *clientv3.Client, cfgs ...config.SubTaskConfig) (int64, error) { - ops, err := putSubTaskCfgOp(cfgs...) - if err != nil { - return 0, err - } - _, rev, err := etcdutil.DoOpsInOneTxnWithRetry(cli, ops...) - return rev, err -} - // GetSubTaskCfg gets the subtask config of the specified source and task name. // if the config for the source not exist, return with `err == nil` and `revision=0`. // if task name is "", will return all the subtaskConfigs as a map{taskName: subtaskConfig} of the source diff --git a/pkg/ha/subtask_test.go b/pkg/ha/subtask_test.go index 40b2a0905d..2518cebeb5 100644 --- a/pkg/ha/subtask_test.go +++ b/pkg/ha/subtask_test.go @@ -47,7 +47,7 @@ func (t *testForEtcd) TestSubTaskEtcd(c *C) { c.Assert(tsm1, HasLen, 0) // put subtask configs. - rev2, err := PutSubTaskCfg(etcdTestCli, cfg1, cfg2) + rev2, err := PutSubTaskCfgStage(etcdTestCli, []config.SubTaskConfig{cfg1, cfg2}, []Stage{}) c.Assert(err, IsNil) c.Assert(rev2, Greater, rev1) @@ -92,14 +92,14 @@ func (t *testForEtcd) TestSubTaskEtcd(c *C) { c.Assert(tsm4, HasLen, 0) // put subtask config. - rev6, err := PutSubTaskCfg(etcdTestCli, cfg1) + rev6, err := PutSubTaskCfgStage(etcdTestCli, []config.SubTaskConfig{cfg1}, []Stage{}) c.Assert(err, IsNil) c.Assert(rev6, Greater, int64(0)) // update subtask config. 
diff --git a/pkg/terror/error_list.go b/pkg/terror/error_list.go
index cd67e2a958..7e1b0a5122 100644
--- a/pkg/terror/error_list.go
+++ b/pkg/terror/error_list.go
@@ -1131,7 +1131,7 @@ var (
 	ErrWorkerRelayIsPurging       = New(codeWorkerRelayIsPurging, ClassDMWorker, ScopeInternal, LevelHigh, "relay log purger is purging, cannot start sub task %s", "Please try again later.")
 	ErrWorkerHostPortNotValid     = New(codeWorkerHostPortNotValid, ClassDMWorker, ScopeInternal, LevelHigh, "host:port '%s' not valid", "Please check configs in worker configuration file.")
 	ErrWorkerNoStart              = New(codeWorkerNoStart, ClassDMWorker, ScopeInternal, LevelHigh, "no mysql source is being handled in the worker", "")
-	ErrWorkerAlreadyStart         = New(codeWorkerAlreadyStarted, ClassDMWorker, ScopeInternal, LevelHigh, "mysql source handler worker already started", "")
+	ErrWorkerAlreadyStart         = New(codeWorkerAlreadyStarted, ClassDMWorker, ScopeInternal, LevelHigh, "mysql source worker %s has already started with source %s, but get a request with source %s", "Please try restart this DM-worker")
 	ErrWorkerSourceNotMatch       = New(codeWorkerSourceNotMatch, ClassDMWorker, ScopeInternal, LevelHigh, "source of request does not match with source in worker", "")
 	ErrWorkerWaitRelayCatchupGTID = New(codeWorkerWaitRelayCatchupGTID, ClassDMWorker, ScopeInternal, LevelHigh, "cannot compare gtid between loader and relay, loader gtid: %s, relay gtid: %s", "")
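The reworked message carries three `%s` verbs, so call sites must now supply the worker name and both source IDs when generating the error. A sketch of such a call site; the argument order is inferred from the message text, and the parameter names are illustrative:

```go
package main

import "github.com/pingcap/dm/pkg/terror"

// alreadyStartErr fills the three %s verbs in order: the worker's name,
// the source it already handles, and the source named in the incoming request.
func alreadyStartErr(workerName, boundSource, requestedSource string) error {
	return terror.ErrWorkerAlreadyStart.Generate(workerName, boundSource, requestedSource)
}
```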
diff --git a/pkg/upgrade/upgrade.go b/pkg/upgrade/upgrade.go
index 161df2e289..96c5d4a5ce 100644
--- a/pkg/upgrade/upgrade.go
+++ b/pkg/upgrade/upgrade.go
@@ -95,14 +95,25 @@ func TryUpgrade(cli *clientv3.Client, uctx Context) error {
 	return err
 }
 
+// UntouchVersionUpgrade runs all upgrade functions but does not change the cluster version. It is called when
+// importing from v1.0.x; the caller is expected to call PutVersion itself after the upgrade succeeds.
+func UntouchVersionUpgrade(cli *clientv3.Client, uctx Context) error {
+	for _, upgrade := range upgrades {
+		err := upgrade(cli, uctx)
+		if err != nil {
+			return err
+		}
+	}
+	return nil
+}
+
 // upgradeToVer1 does upgrade operations from Ver0 to Ver1.
 // in fact, this do nothing now, and just for demonstration.
 func upgradeToVer1(cli *clientv3.Client, uctx Context) error {
 	return nil
 }
 
-// upgradeToVer2 does upgrade operations from Ver1 to Ver2 (v2.0.0-rc.3) to upgrade syncer checkpoint schema
-// TODO: determine v2.0.0-rc.3 or another version in above line
+// upgradeToVer2 does upgrade operations from Ver1 to Ver2 (v2.0.0-GA) to upgrade syncer checkpoint schema
 func upgradeToVer2(cli *clientv3.Client, uctx Context) error {
 	upgradeTaskName := "upgradeToVer2"
 	logger := log.L().WithFields(zap.String("task", upgradeTaskName))
diff --git a/relay/metrics.go b/relay/metrics.go
index f58be5d27a..5a3a122dd3 100644
--- a/relay/metrics.go
+++ b/relay/metrics.go
@@ -14,6 +14,7 @@
 package relay
 
 import (
+	"context"
 	"time"
 
 	"github.com/prometheus/client_golang/prometheus"
@@ -149,7 +150,7 @@ func RegisterMetrics(registry *prometheus.Registry) {
 	registry.MustRegister(relayExitWithErrorCounter)
 }
 
-func reportRelayLogSpaceInBackground(dirpath string) error {
+func reportRelayLogSpaceInBackground(ctx context.Context, dirpath string) error {
 	if len(dirpath) == 0 {
 		return terror.ErrRelayLogDirpathEmpty.Generate()
 	}
@@ -158,7 +159,11 @@ func reportRelayLogSpaceInBackground(dirpath string) error {
 		ticker := time.NewTicker(time.Second * 10)
 		defer ticker.Stop()
 
-		for range ticker.C {
+		for {
+			select {
+			case <-ctx.Done():
+				return
+			case <-ticker.C:
 			size, err := utils.GetStorageSize(dirpath)
 			if err != nil {
 				log.L().Error("fail to update relay log storage size", log.ShortError(err))
diff --git a/relay/relay.go b/relay/relay.go
index 9d37f9ac53..bc22f85a4a 100755
--- a/relay/relay.go
+++ b/relay/relay.go
@@ -165,7 +165,7 @@ func (r *Relay) Init(ctx context.Context) (err error) {
 	r.relayMetaHub = pkgstreamer.GetRelayMetaHub()
 	r.relayMetaHub.ClearMeta()
 
-	return reportRelayLogSpaceInBackground(ctx, r.cfg.RelayDir)
+	return reportRelayLogSpaceInBackground(ctx, r.cfg.RelayDir)
 }
 
 // Process implements the dm.Unit interface.
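The metrics change above follows a standard Go pattern: a background ticker loop should also select on `ctx.Done()`, so that cancelling the owning unit's context stops the reporter goroutine instead of leaking it. A self-contained sketch of the pattern (illustrative code, not part of the DM tree):

```go
package main

import (
	"context"
	"time"
)

// pollUntilCancelled runs fn every interval until ctx is cancelled,
// mirroring the reportRelayLogSpaceInBackground fix: without the select
// on ctx.Done(), the ticker goroutine would outlive the relay unit.
func pollUntilCancelled(ctx context.Context, interval time.Duration, fn func()) {
	go func() {
		ticker := time.NewTicker(interval)
		defer ticker.Stop()
		for {
			select {
			case <-ctx.Done():
				return
			case <-ticker.C:
				fn()
			}
		}
	}()
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
	defer cancel()
	pollUntilCancelled(ctx, time.Second, func() { /* e.g. report disk usage */ })
	<-ctx.Done() // the goroutine exits together with the context
}
```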
diff --git a/tests/ha_cases/lib.sh b/tests/ha_cases/lib.sh index b8300eb3c6..3a82858c5d 100755 --- a/tests/ha_cases/lib.sh +++ b/tests/ha_cases/lib.sh @@ -19,7 +19,7 @@ function load_data() { run_sql "CREATE DATABASE if not exists ${db};" $port $pswd run_sql "DROP TABLE if exists ${db}.t${i};" $port $pswd - run_sql "CREATE TABLE ${db}.t${i}(i TINYINT, j INT UNIQUE KEY);" $port $pswd + run_sql "CREATE TABLE ${db}.t${i}(i SMALLINT, j INT UNIQUE KEY);" $port $pswd for j in $(seq 800); do run_sql "INSERT INTO ${db}.t${i} VALUES ($j,${j}00$j),($j,${j}01$j);" $port $pswd sleep 0.1 diff --git a/tests/ha_cases/run.sh b/tests/ha_cases/run.sh index 9689ed2a82..bab90f49dc 100755 --- a/tests/ha_cases/run.sh +++ b/tests/ha_cases/run.sh @@ -554,13 +554,12 @@ function test_multi_task_reduce_and_restart_worker() { check_port_offline ${worker_ports[$[ $wk - 1] ]} 20 # just one worker was killed should be safe echo "${worker_inuse[$i]} was killed" + for name in ${task_name[@]}; do + run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \ + "query-status $name"\ + "\"stage\": \"Running\"" 4 + done if [ $i = 0 ]; then - for name in ${task_name[@]}; do - run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \ - "query-status $name"\ - "\"stage\": \"Running\"" 4 - done - # waiting for syncing wait sleep 2 @@ -568,18 +567,6 @@ function test_multi_task_reduce_and_restart_worker() { check_sync_diff $WORK_DIR $cur/conf/diff_config.toml check_sync_diff $WORK_DIR $cur/conf/diff_config_multi_task.toml echo "data checked after one worker was killed" - else - status_str="" - for name in ${task_name[@]}; do - status_str=$status_str$($PWD/bin/dmctl.test DEVEL --master-addr "127.0.0.1":$MASTER_PORT query-status $name) - done - search_str="\"stage\": \"Running\"" - running_count=$(echo $status_str | sed "s/$search_str/$search_str\n/g" | grep -c "$search_str") - if [ $running_count != 8 ]; then - echo "error running worker" - echo $status_str - exit 1 - fi fi done echo "[$(date)] <<<<<< finish test_multi_task_reduce_and_restart_worker >>>>>>"