diff --git a/dm/worker/server.go b/dm/worker/server.go index 0c5dcab182..ad0aef2002 100644 --- a/dm/worker/server.go +++ b/dm/worker/server.go @@ -348,14 +348,14 @@ func (s *Server) retryWriteEctd(ops ...clientv3.Op) string { } func (s *Server) handleSourceBound(ctx context.Context, boundCh chan ha.SourceBound, errCh chan error) error { +OUTER: for { select { case <-ctx.Done(): - log.L().Info("worker server is closed, handleSourceBound will quit now") - return nil + break OUTER case bound, ok := <-boundCh: if !ok { - continue + break OUTER } err := s.operateSourceBound(bound) s.setSourceStatus(bound.Source, err, true) @@ -368,7 +368,10 @@ func (s *Server) handleSourceBound(ctx context.Context, boundCh chan ha.SourceBo return err } } - case err := <-errCh: + case err, ok := <-errCh: + if !ok { + break OUTER + } // TODO: Deal with err log.L().Error("WatchSourceBound received an error", zap.Error(err)) if etcdutil.IsRetryableError(err) { @@ -376,6 +379,8 @@ func (s *Server) handleSourceBound(ctx context.Context, boundCh chan ha.SourceBo } } } + log.L().Info("worker server is closed, handleSourceBound will quit now") + return nil } func (s *Server) operateSourceBound(bound ha.SourceBound) error { diff --git a/dm/worker/worker.go b/dm/worker/worker.go index d33f915af8..284f47c91b 100644 --- a/dm/worker/worker.go +++ b/dm/worker/worker.go @@ -532,24 +532,32 @@ func (w *Worker) observeRelayStage(ctx context.Context, etcdCli *clientv3.Client } func (w *Worker) handleRelayStage(ctx context.Context, stageCh chan ha.Stage, errCh chan error) error { +OUTER: for { select { case <-ctx.Done(): - log.L().Info("worker is closed, handleRelayStage will quit now") - return nil - case stage := <-stageCh: + break OUTER + case stage, ok := <-stageCh: + if !ok { + break OUTER + } opType, err := w.operateRelayStage(ctx, stage) if err != nil { opErrCounter.WithLabelValues(w.name, opType).Inc() log.L().Error("fail to operate relay", zap.Stringer("stage", stage), zap.Error(err)) } - case err := <-errCh: + case err, ok := <-errCh: + if !ok { + break OUTER + } log.L().Error("WatchRelayStage received an error", zap.Error(err)) if etcdutil.IsRetryableError(err) { return err } } } + log.L().Info("worker is closed, handleRelayStage will quit now") + return nil } // operateRelayStage returns RelayOp.String() additionally to record metrics diff --git a/pkg/ha/bound.go b/pkg/ha/bound.go index 3805aa08b0..a36589fc4a 100644 --- a/pkg/ha/bound.go +++ b/pkg/ha/bound.go @@ -223,7 +223,10 @@ func WatchSourceBound(ctx context.Context, cli *clientv3.Client, select { case <-ctx.Done(): return - case resp := <-ch: + case resp, ok := <-ch: + if !ok { + return + } if resp.Canceled { // TODO(csuzhangxc): do retry here. if resp.Err() != nil { diff --git a/pkg/ha/keepalive.go b/pkg/ha/keepalive.go index 6b59f7da92..53920e0193 100644 --- a/pkg/ha/keepalive.go +++ b/pkg/ha/keepalive.go @@ -130,7 +130,10 @@ func WatchWorkerEvent(ctx context.Context, cli *clientv3.Client, rev int64, outC case <-ctx.Done(): log.L().Info("watch keepalive worker quit due to context canceled") return - case resp := <-ch: + case resp, ok := <-ch: + if !ok { + return + } if resp.Canceled { select { case errCh <- resp.Err(): diff --git a/pkg/ha/stage.go b/pkg/ha/stage.go index 7c96b2e043..584e61fb57 100644 --- a/pkg/ha/stage.go +++ b/pkg/ha/stage.go @@ -322,7 +322,10 @@ func watchStage(ctx context.Context, watchCh clientv3.WatchChan, select { case <-ctx.Done(): return - case resp := <-watchCh: + case resp, ok := <-watchCh: + if !ok { + return + } if resp.Canceled { // TODO(csuzhangxc): do retry here. if resp.Err() != nil { diff --git a/pkg/shardddl/optimism/info.go b/pkg/shardddl/optimism/info.go index bc2c648fe9..627371de02 100644 --- a/pkg/shardddl/optimism/info.go +++ b/pkg/shardddl/optimism/info.go @@ -153,7 +153,10 @@ func WatchInfo(ctx context.Context, cli *clientv3.Client, revision int64, select { case <-ctx.Done(): return - case resp := <-ch: + case resp, ok := <-ch: + if !ok { + return + } if resp.Canceled { select { case errCh <- resp.Err(): diff --git a/pkg/shardddl/optimism/operation.go b/pkg/shardddl/optimism/operation.go index c230a9c50a..645cd2783e 100644 --- a/pkg/shardddl/optimism/operation.go +++ b/pkg/shardddl/optimism/operation.go @@ -214,7 +214,10 @@ func WatchOperationPut(ctx context.Context, cli *clientv3.Client, select { case <-ctx.Done(): return - case resp := <-ch: + case resp, ok := <-ch: + if !ok { + return + } if resp.Canceled { select { case errCh <- resp.Err(): diff --git a/pkg/shardddl/optimism/table.go b/pkg/shardddl/optimism/table.go index ed17b1b885..b9bed67394 100644 --- a/pkg/shardddl/optimism/table.go +++ b/pkg/shardddl/optimism/table.go @@ -237,7 +237,10 @@ func WatchSourceTables(ctx context.Context, cli *clientv3.Client, revision int64 select { case <-ctx.Done(): return - case resp := <-ch: + case resp, ok := <-ch: + if !ok { + return + } if resp.Canceled { select { case errCh <- resp.Err(): diff --git a/pkg/shardddl/pessimism/info.go b/pkg/shardddl/pessimism/info.go index 4c794bb5f1..993ca78520 100644 --- a/pkg/shardddl/pessimism/info.go +++ b/pkg/shardddl/pessimism/info.go @@ -174,7 +174,10 @@ func WatchInfoPut(ctx context.Context, cli *clientv3.Client, revision int64, out select { case <-ctx.Done(): return - case resp := <-ch: + case resp, ok := <-ch: + if !ok { + return + } if resp.Canceled { select { case errCh <- resp.Err(): diff --git a/pkg/shardddl/pessimism/operation.go b/pkg/shardddl/pessimism/operation.go index 8ae96578e4..7f65293155 100644 --- a/pkg/shardddl/pessimism/operation.go +++ b/pkg/shardddl/pessimism/operation.go @@ -227,7 +227,10 @@ func watchOperation(ctx context.Context, cli *clientv3.Client, watchType mvccpb. select { case <-ctx.Done(): return - case resp := <-ch: + case resp, ok := <-ch: + if !ok { + return + } if resp.Canceled { select { case errCh <- resp.Err():