Skip to content
This repository has been archived by the owner on Nov 24, 2023. It is now read-only.

worker: create subtask for paused stage (#1165) #1183

Merged
merged 1 commit into from
Oct 19, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions .github/workflows/chaos-mesh.yml
Original file line number Diff line number Diff line change
Expand Up @@ -111,9 +111,9 @@ jobs:
kubectl describe -f $GITHUB_WORKSPACE/chaos/manifests/mysql.yaml
- name: Wait for MySQL ready # kubectl wait --all not working
run: |
kubectl wait --for=condition=Ready pod/mysql-0 --timeout=120s || true
kubectl wait --for=condition=Ready pod/mysql-0 --timeout=300s || true
sleep 10
kubectl wait --for=condition=Ready pod/mysql-1 --timeout=120s || true
kubectl wait --for=condition=Ready pod/mysql-1 --timeout=300s || true
echo show pvc
kubectl get pvc -l app=mysql -o wide
echo show pv
Expand All @@ -139,7 +139,7 @@ jobs:
kubectl describe -f $GITHUB_WORKSPACE/chaos/manifests/tidb.yaml
- name: Wait for TiDB ready
run: |
kubectl wait --for=condition=Ready pod/tidb-0 --timeout=120s || true
kubectl wait --for=condition=Ready pod/tidb-0 --timeout=300s || true
echo show pvc
kubectl get pvc -l app=tidb -o wide
echo show pv
Expand All @@ -165,7 +165,7 @@ jobs:
- name: Wait for DM-master ready
run: |
sleep 10
kubectl wait --for=condition=Ready pod -l app=dm-master --all --timeout=120s || true
kubectl wait --for=condition=Ready pod -l app=dm-master --all --timeout=300s || true
echo "<<<<< show pvc >>>>>"
kubectl get pvc -l app=dm-master -o wide
echo "<<<<< show pv >>>>>"
Expand Down Expand Up @@ -202,7 +202,7 @@ jobs:
- name: Wait for DM-worker ready
run: |
sleep 10
kubectl wait --for=condition=Ready pod -l app=dm-worker --all --timeout=120s || true
kubectl wait --for=condition=Ready pod -l app=dm-worker --all --timeout=300s || true
echo "<<<<< show pvc >>>>>"
kubectl get pvc -l app=dm-worker -o wide
echo "<<<<< show pv >>>>>"
Expand Down
6 changes: 3 additions & 3 deletions _utils/terror_gen/errors_release.txt
Original file line number Diff line number Diff line change
Expand Up @@ -410,9 +410,7 @@ ErrWorkerGetVersionFromKV,[code=40045:class=dm-worker:scope=internal:level=high]
ErrWorkerSaveVersionToKV,[code=40046:class=dm-worker:scope=internal:level=high], "Message: save version %v into levelDB with key %v"
ErrWorkerVerAutoDowngrade,[code=40047:class=dm-worker:scope=internal:level=high], "Message: the previous version %s is newer than current %s, automatic downgrade is not supported now, please handle it manually"
ErrWorkerStartService,[code=40048:class=dm-worker:scope=internal:level=high], "Message: start server"
ErrWorkerNoStart,[code=40070:class=dm-worker:scope=internal:level=high], "Message: worker has not started"
ErrWorkerAlreadyClosed,[code=40049:class=dm-worker:scope=internal:level=high], "Message: worker already closed"
ErrWorkerAlreadyStart,[code=40071:class=dm-worker:scope=internal:level=high], "Message: worker already started"
ErrWorkerAlreadyClosed,[code=40049:class=dm-worker:scope=internal:level=high], "Message: mysql source handler worker already closed"
ErrWorkerNotRunningStage,[code=40050:class=dm-worker:scope=internal:level=high], "Message: current stage is %s but not running, invalid"
ErrWorkerNotPausedStage,[code=40051:class=dm-worker:scope=internal:level=high], "Message: current stage is %s but not paused, invalid"
ErrWorkerUpdateTaskStage,[code=40052:class=dm-worker:scope=internal:level=high], "Message: can only update task on Paused stage, but current stage is %s, Workaround: Please use `pause-task` command to pause the task."
Expand All @@ -433,6 +431,8 @@ ErrWorkerExecDDLTimeout,[code=40066:class=dm-worker:scope=internal:level=high],
ErrWorkerWaitRelayCatchupTimeout,[code=40067:class=dm-worker:scope=internal:level=high], "Message: waiting for relay binlog pos to catch up with loader end binlog pos is timeout (exceeding %s), loader end binlog pos: %s, relay binlog pos: %s"
ErrWorkerRelayIsPurging,[code=40068:class=dm-worker:scope=internal:level=high], "Message: relay log purger is purging, cannot start sub task %s, Workaround: Please try again later."
ErrWorkerHostPortNotValid,[code=40069:class=dm-worker:scope=internal:level=high], "Message: host:port '%s' not valid, Workaround: Please check configs in worker configuration file."
ErrWorkerNoStart,[code=40070:class=dm-worker:scope=internal:level=high], "Message: no mysql source is being handled in the worker"
ErrWorkerAlreadyStart,[code=40071:class=dm-worker:scope=internal:level=high], "Message: mysql source handler worker already started"
ErrWorkerSourceNotMatch,[code=40072:class=dm-worker:scope=internal:level=high], "Message: source of request does not match with source in worker"
ErrWorkerFailToGetSubtaskConfigFromEtcd,[code=40073:class=dm-worker:scope=internal:level=medium], "Message: there is no relative subtask config for task %s in etcd"
ErrWorkerFailToGetSourceConfigFromEtcd,[code=40074:class=dm-worker:scope=internal:level=medium], "Message: there is no relative source config for source %s in etcd"
Expand Down
12 changes: 6 additions & 6 deletions chaos/cases/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,13 +76,13 @@ func newTask(ctx context.Context, cli pb.MasterClient, taskFile string, schema s
)
for i := range taskCfg.MySQLInstances { // only use necessary part of sources.
cfg := sourcesCfg[i]
db, err := conn.DefaultDBProvider.Apply(cfg)
if err != nil {
return nil, err
db, err2 := conn.DefaultDBProvider.Apply(cfg)
if err2 != nil {
return nil, err2
}
conn, err := createDBConn(ctx, db, schema)
if err != nil {
return nil, err
conn, err2 := createDBConn(ctx, db, schema)
if err2 != nil {
return nil, err2
}
sourceDBs = append(sourceDBs, db)
sourceConns = append(sourceConns, conn)
Expand Down
14 changes: 0 additions & 14 deletions dm/worker/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,17 +217,3 @@ func (c *Config) configFromFile(path string) error {
}
return nil
}

// Reload re-reads the configuration from the file named by c.ConfigFile.
//
// When c.ConfigFile is empty it is first set to "dm-worker-config.bak", so the
// receiver keeps that path after the call. The error returned by
// configFromFile is passed through unchanged; nil means the file was loaded.
func (c *Config) Reload() error {
	// Fall back to the backup config path when no file has been configured.
	if len(c.ConfigFile) == 0 {
		c.ConfigFile = "dm-worker-config.bak"
	}
	return c.configFromFile(c.ConfigFile)
}
6 changes: 3 additions & 3 deletions dm/worker/join.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ func (s *Server) JoinMaster(endpoints []string) error {
if conn != nil {
conn.Close()
}
log.L().Error("fail to dial dm-master", zap.Error(err))
log.L().Error("fail to dial dm-master", zap.String("endpoint", endpoint), zap.Error(err))
continue
}
client := pb.NewMasterClient(conn)
Expand All @@ -70,11 +70,11 @@ func (s *Server) JoinMaster(endpoints []string) error {
cancel1()
conn.Close()
if err != nil {
log.L().Error("fail to register worker", zap.Error(err))
log.L().Error("fail to register worker", zap.String("endpoint", endpoint), zap.Error(err))
continue
}
if !resp.GetResult() {
log.L().Error("fail to register worker", zap.String("error", resp.Msg))
log.L().Error("fail to register worker", zap.String("endpoint", endpoint), zap.String("error", resp.Msg))
continue
}
return nil
Expand Down
2 changes: 1 addition & 1 deletion dm/worker/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ func InitStatus(lis net.Listener) {
}
err := httpS.Serve(lis)
if err != nil && !common.IsErrNetClosing(err) && err != http.ErrServerClosed {
log.L().Error("fail to start status server return", log.ShortError(err))
log.L().Error("status server returned", log.ShortError(err))
}
}

Expand Down
81 changes: 41 additions & 40 deletions dm/worker/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,9 @@ func NewServer(cfg *Config) *Server {

// Start starts to serving
func (s *Server) Start() error {
log.L().Info("starting dm-worker server")
RegistryMetrics()
s.ctx, s.cancel = context.WithCancel(context.Background())
tls, err := toolutils.NewTLS(s.cfg.SSLCA, s.cfg.SSLCert, s.cfg.SSLKey, s.cfg.AdvertiseAddr, s.cfg.CertAllowedCN)
if err != nil {
return terror.ErrWorkerTLSConfigNotValid.Delegate(err)
Expand All @@ -95,9 +98,6 @@ func (s *Server) Start() error {
}
s.rootLis = tls.WrapListener(rootLis)

log.L().Info("Start Server")
s.setWorker(nil, true)
s.ctx, s.cancel = context.WithCancel(context.Background())
s.etcdClient, err = clientv3.New(clientv3.Config{
Endpoints: GetJoinURLs(s.cfg.Join),
DialTimeout: dialTimeout,
Expand All @@ -109,24 +109,19 @@ func (s *Server) Start() error {
return err
}

s.wg.Add(1)
go func() {
s.syncMasterEndpoints(s.ctx)
s.wg.Done()
}()

s.setWorker(nil, true)
bound, sourceCfg, revBound, err := ha.GetSourceBoundConfig(s.etcdClient, s.cfg.Name)
if err != nil {
// TODO: need retry
return err
}
if !bound.IsEmpty() {
log.L().Warn("worker has been assigned source before keepalive")
log.L().Warn("worker has been assigned source before keepalive", zap.Stringer("bound", bound), zap.Bool("is deleted", bound.IsDeleted))
err = s.startWorker(&sourceCfg)
s.setSourceStatus(bound.Source, err, true)
if err != nil {
log.L().Error("fail to operate sourceBound on worker", zap.String("worker", s.cfg.Name),
zap.Stringer("bound", bound), zap.Error(err))
// if DM-worker can't handle pre-assigned source before keepalive, it simply exits with the error,
// because no re-assigned mechanism exists for keepalived DM-worker yet.
return err
}
}

Expand All @@ -136,12 +131,18 @@ func (s *Server) Start() error {
s.wg.Done()
}()

s.wg.Add(1)
go func() {
s.syncMasterEndpoints(s.ctx)
s.wg.Done()
}()

s.wg.Add(1)
go func() {
defer s.wg.Done()
// TODO: handle fatal error from observeSourceBound
//nolint:errcheck
s.observeSourceBound(s.ctx, s.etcdClient, revBound)
s.observeSourceBound(s.ctx, revBound)
}()

s.wg.Add(1)
Expand All @@ -165,18 +166,23 @@ func (s *Server) Start() error {
// NOTE: don't need to set tls config, because rootLis already use tls
s.svr = grpc.NewServer()
pb.RegisterWorkerServer(s.svr, s)
s.wg.Add(1)
go func() {
defer s.wg.Done()
err2 := s.svr.Serve(grpcL)
if err2 != nil && !common.IsErrNetClosing(err2) && err2 != cmux.ErrListenerClosed {
log.L().Error("fail to start gRPC server", log.ShortError(err2))
log.L().Error("gRPC server returned", log.ShortError(err2))
}
}()

RegistryMetrics()
go InitStatus(httpL) // serve status
s.wg.Add(1)
go func() {
defer s.wg.Done()
InitStatus(httpL) // serve status
}()

s.closed.Set(false)
log.L().Info("start gRPC API", zap.String("listened address", s.cfg.WorkerAddr))
log.L().Info("listening gRPC API and status request", zap.String("address", s.cfg.WorkerAddr))
err = m.Serve()
if err != nil && common.IsErrNetClosing(err) {
err = nil
Expand Down Expand Up @@ -220,7 +226,7 @@ func (s *Server) syncMasterEndpoints(ctx context.Context) {
}
}

func (s *Server) observeSourceBound(ctx context.Context, etcdCli *clientv3.Client, rev int64) error {
func (s *Server) observeSourceBound(ctx context.Context, rev int64) error {
var wg sync.WaitGroup
for {
sourceBoundCh := make(chan ha.SourceBound, 10)
Expand All @@ -234,7 +240,7 @@ func (s *Server) observeSourceBound(ctx context.Context, etcdCli *clientv3.Clien
close(sourceBoundErrCh)
wg.Done()
}()
ha.WatchSourceBound(ctx1, etcdCli, s.cfg.Name, rev+1, sourceBoundCh, sourceBoundErrCh)
ha.WatchSourceBound(ctx1, s.etcdClient, s.cfg.Name, rev+1, sourceBoundCh, sourceBoundErrCh)
}()
err := s.handleSourceBound(ctx1, sourceBoundCh, sourceBoundErrCh)
cancel1()
Expand Down Expand Up @@ -272,8 +278,7 @@ func (s *Server) observeSourceBound(ctx context.Context, etcdCli *clientv3.Clien
err1 = s.startWorker(&cfg)
s.setSourceStatus(bound.Source, err1, true)
if err1 != nil {
log.L().Error("fail to operate sourceBound on worker", zap.String("worker", s.cfg.Name),
zap.Stringer("bound", bound), zap.Error(err1))
log.L().Error("fail to operate sourceBound on worker", zap.Stringer("bound", bound), zap.Bool("is deleted", bound.IsDeleted), zap.Error(err1))
}
}
}
Expand Down Expand Up @@ -396,13 +401,12 @@ OUTER:
if !ok {
break OUTER
}
log.L().Info("receive source bound", zap.Stringer("bound", bound), zap.Bool("is deleted", bound.IsDeleted))
err := s.operateSourceBound(bound)
s.setSourceStatus(bound.Source, err, true)
if err != nil {
// record the reason for operating source bound
opErrCounter.WithLabelValues(s.cfg.Name, opErrTypeSourceBound).Inc()
log.L().Error("fail to operate sourceBound on worker", zap.String("worker", s.cfg.Name),
zap.Stringer("bound", bound), zap.Error(err))
log.L().Error("fail to operate sourceBound on worker", zap.Stringer("bound", bound), zap.Bool("is deleted", bound.IsDeleted), zap.Error(err))
if etcdutil.IsRetryableError(err) {
return err
}
Expand Down Expand Up @@ -451,7 +455,7 @@ func (s *Server) QueryStatus(ctx context.Context, req *pb.QueryStatusRequest) (*

w := s.getWorker(true)
if w == nil {
log.L().Error("fail to call QueryStatus, because mysql worker has not been started")
log.L().Warn("fail to call QueryStatus, because no mysql source is being handled in the worker")
resp.Result = false
resp.Msg = terror.ErrWorkerNoStart.Error()
return resp, nil
Expand All @@ -475,15 +479,14 @@ func (s *Server) PurgeRelay(ctx context.Context, req *pb.PurgeRelayRequest) (*pb
log.L().Info("", zap.String("request", "PurgeRelay"), zap.Stringer("payload", req))
w := s.getWorker(true)
if w == nil {
log.L().Error("fail to call StartSubTask, because mysql worker has not been started")
log.L().Warn("fail to call StartSubTask, because no mysql source is being handled in the worker")
return makeCommonWorkerResponse(terror.ErrWorkerNoStart.Generate()), nil
}

err := w.PurgeRelay(ctx, req)
if err != nil {
log.L().Error("fail to purge relay", zap.String("request", "PurgeRelay"), zap.Stringer("payload", req), zap.Error(err))
}
// TODO: check whether this interface need to store message in ETCD
return makeCommonWorkerResponse(err), nil
}

Expand All @@ -493,10 +496,10 @@ func (s *Server) OperateSchema(ctx context.Context, req *pb.OperateWorkerSchemaR

w := s.getWorker(true)
if w == nil {
log.L().Error("fail to call OperateSchema, because mysql worker has not been started")
log.L().Warn("fail to call OperateSchema, because no mysql source is being handled in the worker")
return makeCommonWorkerResponse(terror.ErrWorkerNoStart.Generate()), nil
} else if req.Source != w.cfg.SourceID {
log.L().Error("fail to call OperateSchema, because source mismatch")
log.L().Error("fail to call OperateSchema, because source mismatch", zap.String("request", req.Source), zap.String("current", w.cfg.SourceID))
return makeCommonWorkerResponse(terror.ErrWorkerSourceNotMatch.Generate()), nil
}

Expand All @@ -517,8 +520,7 @@ func (s *Server) startWorker(cfg *config.SourceConfig) error {
defer s.Unlock()
if w := s.getWorker(false); w != nil {
if w.cfg.SourceID == cfg.SourceID {
// This mysql task has started. It may be a repeated request. Just return true
log.L().Info("This mysql task has started. It may be a repeated request. Just return true", zap.String("sourceID", s.worker.cfg.SourceID))
log.L().Info("mysql source is being handled", zap.String("sourceID", s.worker.cfg.SourceID))
return nil
}
return terror.ErrWorkerAlreadyStart.Generate()
Expand All @@ -528,7 +530,6 @@ func (s *Server) startWorker(cfg *config.SourceConfig) error {
// because triggering these events is useless now
subTaskStages, subTaskCfgm, revSubTask, err := ha.GetSubTaskStageConfig(s.etcdClient, cfg.SourceID)
if err != nil {
// TODO: need retry
return err
}

Expand Down Expand Up @@ -558,8 +559,7 @@ func (s *Server) startWorker(cfg *config.SourceConfig) error {
}
}

log.L().Info("start worker", zap.String("sourceCfg", cfg.String()), zap.Reflect("subTasks", subTaskCfgs))

log.L().Info("starting to handle mysql source", zap.String("sourceCfg", cfg.String()), zap.Reflect("subTasks", subTaskCfgs))
w, err := NewWorker(cfg, s.etcdClient, s.cfg.Name)
if err != nil {
return err
Expand Down Expand Up @@ -587,17 +587,17 @@ func (s *Server) startWorker(cfg *config.SourceConfig) error {
return w.closed.Get() == closedFalse
})
if !isStarted {
// TODO: add more mechanism to wait
// TODO: add more mechanism to wait or un-bound the source
return terror.ErrWorkerNoStart
}

for _, subTaskCfg := range subTaskCfgs {
expectStage := subTaskStages[subTaskCfg.Name]
if expectStage.IsDeleted || expectStage.Expect != pb.Stage_Running {
if expectStage.IsDeleted {
continue
}
w.StartSubTask(subTaskCfg)
log.L().Info("load subtask", zap.String("sourceID", subTaskCfg.SourceID), zap.String("task", subTaskCfg.Name))
log.L().Info("start to create subtask", zap.String("sourceID", subTaskCfg.SourceID), zap.String("task", subTaskCfg.Name))
w.StartSubTask(subTaskCfg, expectStage.Expect)
}

w.wg.Add(1)
Expand All @@ -618,6 +618,7 @@ func (s *Server) startWorker(cfg *config.SourceConfig) error {
}()
}

log.L().Info("started to handle mysql source", zap.String("sourceCfg", cfg.String()))
return nil
}

Expand Down Expand Up @@ -762,7 +763,7 @@ func (s *Server) HandleError(ctx context.Context, req *pb.HandleWorkerErrorReque

w := s.getWorker(true)
if w == nil {
log.L().Error("fail to call HandleError, because mysql worker has not been started")
log.L().Warn("fail to call HandleError, because no mysql source is being handled in the worker")
return makeCommonWorkerResponse(terror.ErrWorkerNoStart.Generate()), nil
}

Expand Down
4 changes: 2 additions & 2 deletions dm/worker/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -281,7 +281,7 @@ func (t *testServer) TestWatchSourceBoundEtcdCompact(c *C) {
wg.Add(1)
go func() {
defer wg.Done()
c.Assert(s.observeSourceBound(ctx1, etcdCli, startRev), IsNil)
c.Assert(s.observeSourceBound(ctx1, startRev), IsNil)
}()
// step 4.1: should stop the running worker, source bound has been deleted, should stop this worker
time.Sleep(time.Second)
Expand All @@ -302,7 +302,7 @@ func (t *testServer) TestWatchSourceBoundEtcdCompact(c *C) {
wg.Add(1)
go func() {
defer wg.Done()
c.Assert(s.observeSourceBound(ctx2, etcdCli, startRev), IsNil)
c.Assert(s.observeSourceBound(ctx2, startRev), IsNil)
}()
c.Assert(utils.WaitSomething(30, 100*time.Millisecond, func() bool {
return s.getWorker(true) != nil
Expand Down
Loading