Skip to content

Commit

Permalink
clientv3: handle watchGrpcStream shutdown if prior to goroutine start
Browse files Browse the repository at this point in the history
Fixes #6141
  • Loading branch information
Anthony Romano committed Aug 10, 2016
1 parent 88a77f3 commit 0b80d66
Showing 1 changed file with 15 additions and 1 deletion.
16 changes: 15 additions & 1 deletion clientv3/watch.go
Original file line number Diff line number Diff line change
Expand Up @@ -268,6 +268,13 @@ func (w *watcher) Watch(ctx context.Context, key string, opts ...OpOption) Watch
case reqc <- wr:
ok = true
case <-wr.ctx.Done():
// tear down stream if this request is the only one using it
wgs.mu.Lock()
if len(wgs.streams) == 0 && wgs.stopc != nil {
close(wgs.stopc)
wgs.stopc = nil
}
wgs.mu.Unlock()
case <-donec:
if wgs.closeErr != nil {
closeCh <- WatchResponse{closeErr: wgs.closeErr}
Expand Down Expand Up @@ -408,14 +415,21 @@ func (w *watchGrpcStream) run() {
w.cancel()
}()

// already stopped?
w.mu.RLock()
stopc := w.stopc
w.mu.RUnlock()
if stopc == nil {
return
}

// start a stream with the etcd grpc server
if wc, closeErr = w.newWatchClient(); closeErr != nil {
return
}

var pendingReq, failedReq *watchRequest
curReqC := w.reqc
stopc := w.stopc
cancelSet := make(map[int64]struct{})

for {
Expand Down

0 comments on commit 0b80d66

Please sign in to comment.