Skip to content
This repository has been archived by the owner on Nov 24, 2023. It is now read-only.

Commit

Permalink
*: check channel closed in select{} (#963)
Browse files Browse the repository at this point in the history
  • Loading branch information
ti-srebot authored Aug 31, 2020
1 parent 4e7e3d1 commit 7ebe94a
Show file tree
Hide file tree
Showing 10 changed files with 53 additions and 16 deletions.
13 changes: 9 additions & 4 deletions dm/worker/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -348,14 +348,14 @@ func (s *Server) retryWriteEctd(ops ...clientv3.Op) string {
}

func (s *Server) handleSourceBound(ctx context.Context, boundCh chan ha.SourceBound, errCh chan error) error {
OUTER:
for {
select {
case <-ctx.Done():
log.L().Info("worker server is closed, handleSourceBound will quit now")
return nil
break OUTER
case bound, ok := <-boundCh:
if !ok {
continue
break OUTER
}
err := s.operateSourceBound(bound)
s.setSourceStatus(bound.Source, err, true)
Expand All @@ -368,14 +368,19 @@ func (s *Server) handleSourceBound(ctx context.Context, boundCh chan ha.SourceBo
return err
}
}
case err := <-errCh:
case err, ok := <-errCh:
if !ok {
break OUTER
}
// TODO: Deal with err
log.L().Error("WatchSourceBound received an error", zap.Error(err))
if etcdutil.IsRetryableError(err) {
return err
}
}
}
log.L().Info("worker server is closed, handleSourceBound will quit now")
return nil
}

func (s *Server) operateSourceBound(bound ha.SourceBound) error {
Expand Down
16 changes: 12 additions & 4 deletions dm/worker/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -532,24 +532,32 @@ func (w *Worker) observeRelayStage(ctx context.Context, etcdCli *clientv3.Client
}

func (w *Worker) handleRelayStage(ctx context.Context, stageCh chan ha.Stage, errCh chan error) error {
OUTER:
for {
select {
case <-ctx.Done():
log.L().Info("worker is closed, handleRelayStage will quit now")
return nil
case stage := <-stageCh:
break OUTER
case stage, ok := <-stageCh:
if !ok {
break OUTER
}
opType, err := w.operateRelayStage(ctx, stage)
if err != nil {
opErrCounter.WithLabelValues(w.name, opType).Inc()
log.L().Error("fail to operate relay", zap.Stringer("stage", stage), zap.Error(err))
}
case err := <-errCh:
case err, ok := <-errCh:
if !ok {
break OUTER
}
log.L().Error("WatchRelayStage received an error", zap.Error(err))
if etcdutil.IsRetryableError(err) {
return err
}
}
}
log.L().Info("worker is closed, handleRelayStage will quit now")
return nil
}

// operateRelayStage returns RelayOp.String() additionally to record metrics
Expand Down
5 changes: 4 additions & 1 deletion pkg/ha/bound.go
Original file line number Diff line number Diff line change
Expand Up @@ -223,7 +223,10 @@ func WatchSourceBound(ctx context.Context, cli *clientv3.Client,
select {
case <-ctx.Done():
return
case resp := <-ch:
case resp, ok := <-ch:
if !ok {
return
}
if resp.Canceled {
// TODO(csuzhangxc): do retry here.
if resp.Err() != nil {
Expand Down
5 changes: 4 additions & 1 deletion pkg/ha/keepalive.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,10 @@ func WatchWorkerEvent(ctx context.Context, cli *clientv3.Client, rev int64, outC
case <-ctx.Done():
log.L().Info("watch keepalive worker quit due to context canceled")
return
case resp := <-ch:
case resp, ok := <-ch:
if !ok {
return
}
if resp.Canceled {
select {
case errCh <- resp.Err():
Expand Down
5 changes: 4 additions & 1 deletion pkg/ha/stage.go
Original file line number Diff line number Diff line change
Expand Up @@ -322,7 +322,10 @@ func watchStage(ctx context.Context, watchCh clientv3.WatchChan,
select {
case <-ctx.Done():
return
case resp := <-watchCh:
case resp, ok := <-watchCh:
if !ok {
return
}
if resp.Canceled {
// TODO(csuzhangxc): do retry here.
if resp.Err() != nil {
Expand Down
5 changes: 4 additions & 1 deletion pkg/shardddl/optimism/info.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,10 @@ func WatchInfo(ctx context.Context, cli *clientv3.Client, revision int64,
select {
case <-ctx.Done():
return
case resp := <-ch:
case resp, ok := <-ch:
if !ok {
return
}
if resp.Canceled {
select {
case errCh <- resp.Err():
Expand Down
5 changes: 4 additions & 1 deletion pkg/shardddl/optimism/operation.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,10 @@ func WatchOperationPut(ctx context.Context, cli *clientv3.Client,
select {
case <-ctx.Done():
return
case resp := <-ch:
case resp, ok := <-ch:
if !ok {
return
}
if resp.Canceled {
select {
case errCh <- resp.Err():
Expand Down
5 changes: 4 additions & 1 deletion pkg/shardddl/optimism/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -237,7 +237,10 @@ func WatchSourceTables(ctx context.Context, cli *clientv3.Client, revision int64
select {
case <-ctx.Done():
return
case resp := <-ch:
case resp, ok := <-ch:
if !ok {
return
}
if resp.Canceled {
select {
case errCh <- resp.Err():
Expand Down
5 changes: 4 additions & 1 deletion pkg/shardddl/pessimism/info.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,10 @@ func WatchInfoPut(ctx context.Context, cli *clientv3.Client, revision int64, out
select {
case <-ctx.Done():
return
case resp := <-ch:
case resp, ok := <-ch:
if !ok {
return
}
if resp.Canceled {
select {
case errCh <- resp.Err():
Expand Down
5 changes: 4 additions & 1 deletion pkg/shardddl/pessimism/operation.go
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,10 @@ func watchOperation(ctx context.Context, cli *clientv3.Client, watchType mvccpb.
select {
case <-ctx.Done():
return
case resp := <-ch:
case resp, ok := <-ch:
if !ok {
return
}
if resp.Canceled {
select {
case errCh <- resp.Err():
Expand Down

0 comments on commit 7ebe94a

Please sign in to comment.