Skip to content
This repository has been archived by the owner on Nov 24, 2023. It is now read-only.

*: check channel closed in select{} (#962) #963

Merged
merged 1 commit into from
Aug 31, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 9 additions & 4 deletions dm/worker/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -348,14 +348,14 @@ func (s *Server) retryWriteEctd(ops ...clientv3.Op) string {
}

func (s *Server) handleSourceBound(ctx context.Context, boundCh chan ha.SourceBound, errCh chan error) error {
OUTER:
for {
select {
case <-ctx.Done():
log.L().Info("worker server is closed, handleSourceBound will quit now")
return nil
break OUTER
case bound, ok := <-boundCh:
if !ok {
continue
break OUTER
}
err := s.operateSourceBound(bound)
s.setSourceStatus(bound.Source, err, true)
Expand All @@ -368,14 +368,19 @@ func (s *Server) handleSourceBound(ctx context.Context, boundCh chan ha.SourceBo
return err
}
}
case err := <-errCh:
case err, ok := <-errCh:
if !ok {
break OUTER
}
// TODO: Deal with err
log.L().Error("WatchSourceBound received an error", zap.Error(err))
if etcdutil.IsRetryableError(err) {
return err
}
}
}
log.L().Info("worker server is closed, handleSourceBound will quit now")
return nil
}

func (s *Server) operateSourceBound(bound ha.SourceBound) error {
Expand Down
16 changes: 12 additions & 4 deletions dm/worker/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -532,24 +532,32 @@ func (w *Worker) observeRelayStage(ctx context.Context, etcdCli *clientv3.Client
}

func (w *Worker) handleRelayStage(ctx context.Context, stageCh chan ha.Stage, errCh chan error) error {
OUTER:
for {
select {
case <-ctx.Done():
log.L().Info("worker is closed, handleRelayStage will quit now")
return nil
case stage := <-stageCh:
break OUTER
case stage, ok := <-stageCh:
if !ok {
break OUTER
}
opType, err := w.operateRelayStage(ctx, stage)
if err != nil {
opErrCounter.WithLabelValues(w.name, opType).Inc()
log.L().Error("fail to operate relay", zap.Stringer("stage", stage), zap.Error(err))
}
case err := <-errCh:
case err, ok := <-errCh:
if !ok {
break OUTER
}
log.L().Error("WatchRelayStage received an error", zap.Error(err))
if etcdutil.IsRetryableError(err) {
return err
}
}
}
log.L().Info("worker is closed, handleRelayStage will quit now")
return nil
}

// operateRelayStage returns RelayOp.String() additionally to record metrics
Expand Down
5 changes: 4 additions & 1 deletion pkg/ha/bound.go
Original file line number Diff line number Diff line change
Expand Up @@ -223,7 +223,10 @@ func WatchSourceBound(ctx context.Context, cli *clientv3.Client,
select {
case <-ctx.Done():
return
case resp := <-ch:
case resp, ok := <-ch:
if !ok {
return
}
if resp.Canceled {
// TODO(csuzhangxc): do retry here.
if resp.Err() != nil {
Expand Down
5 changes: 4 additions & 1 deletion pkg/ha/keepalive.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,10 @@ func WatchWorkerEvent(ctx context.Context, cli *clientv3.Client, rev int64, outC
case <-ctx.Done():
log.L().Info("watch keepalive worker quit due to context canceled")
return
case resp := <-ch:
case resp, ok := <-ch:
if !ok {
return
}
if resp.Canceled {
select {
case errCh <- resp.Err():
Expand Down
5 changes: 4 additions & 1 deletion pkg/ha/stage.go
Original file line number Diff line number Diff line change
Expand Up @@ -322,7 +322,10 @@ func watchStage(ctx context.Context, watchCh clientv3.WatchChan,
select {
case <-ctx.Done():
return
case resp := <-watchCh:
case resp, ok := <-watchCh:
if !ok {
return
}
if resp.Canceled {
// TODO(csuzhangxc): do retry here.
if resp.Err() != nil {
Expand Down
5 changes: 4 additions & 1 deletion pkg/shardddl/optimism/info.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,10 @@ func WatchInfo(ctx context.Context, cli *clientv3.Client, revision int64,
select {
case <-ctx.Done():
return
case resp := <-ch:
case resp, ok := <-ch:
if !ok {
return
}
if resp.Canceled {
select {
case errCh <- resp.Err():
Expand Down
5 changes: 4 additions & 1 deletion pkg/shardddl/optimism/operation.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,10 @@ func WatchOperationPut(ctx context.Context, cli *clientv3.Client,
select {
case <-ctx.Done():
return
case resp := <-ch:
case resp, ok := <-ch:
if !ok {
return
}
if resp.Canceled {
select {
case errCh <- resp.Err():
Expand Down
5 changes: 4 additions & 1 deletion pkg/shardddl/optimism/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -237,7 +237,10 @@ func WatchSourceTables(ctx context.Context, cli *clientv3.Client, revision int64
select {
case <-ctx.Done():
return
case resp := <-ch:
case resp, ok := <-ch:
if !ok {
return
}
if resp.Canceled {
select {
case errCh <- resp.Err():
Expand Down
5 changes: 4 additions & 1 deletion pkg/shardddl/pessimism/info.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,10 @@ func WatchInfoPut(ctx context.Context, cli *clientv3.Client, revision int64, out
select {
case <-ctx.Done():
return
case resp := <-ch:
case resp, ok := <-ch:
if !ok {
return
}
if resp.Canceled {
select {
case errCh <- resp.Err():
Expand Down
5 changes: 4 additions & 1 deletion pkg/shardddl/pessimism/operation.go
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,10 @@ func watchOperation(ctx context.Context, cli *clientv3.Client, watchType mvccpb.
select {
case <-ctx.Done():
return
case resp := <-ch:
case resp, ok := <-ch:
if !ok {
return
}
if resp.Canceled {
select {
case errCh <- resp.Err():
Expand Down