From 0b80d66897ccc8986297370c1f457c45ed1d0025 Mon Sep 17 00:00:00 2001 From: Anthony Romano Date: Tue, 9 Aug 2016 19:28:16 -0700 Subject: [PATCH] clientv3: handle watchGrpcStream shutdown if prior to goroutine start Fixes #6141 --- clientv3/watch.go | 16 +++++++++++++++- 1 file changed, 15 insertions(+), 1 deletion(-) diff --git a/clientv3/watch.go b/clientv3/watch.go index ef4aa5304e7b..25e6c47719f1 100644 --- a/clientv3/watch.go +++ b/clientv3/watch.go @@ -268,6 +268,13 @@ func (w *watcher) Watch(ctx context.Context, key string, opts ...OpOption) Watch case reqc <- wr: ok = true case <-wr.ctx.Done(): + // tear down stream if this request is the only one using it + wgs.mu.Lock() + if len(wgs.streams) == 0 && wgs.stopc != nil { + close(wgs.stopc) + wgs.stopc = nil + } + wgs.mu.Unlock() case <-donec: if wgs.closeErr != nil { closeCh <- WatchResponse{closeErr: wgs.closeErr} @@ -408,6 +415,14 @@ func (w *watchGrpcStream) run() { w.cancel() }() + // already stopped? + w.mu.RLock() + stopc := w.stopc + w.mu.RUnlock() + if stopc == nil { + return + } + // start a stream with the etcd grpc server if wc, closeErr = w.newWatchClient(); closeErr != nil { return @@ -415,7 +430,6 @@ func (w *watchGrpcStream) run() { var pendingReq, failedReq *watchRequest curReqC := w.reqc - stopc := w.stopc cancelSet := make(map[int64]struct{}) for {