diff --git a/clientv3/watch.go b/clientv3/watch.go index ef4aa5304e7b..25e6c47719f1 100644 --- a/clientv3/watch.go +++ b/clientv3/watch.go @@ -268,6 +268,13 @@ func (w *watcher) Watch(ctx context.Context, key string, opts ...OpOption) Watch case reqc <- wr: ok = true case <-wr.ctx.Done(): + // tear down stream if this request is the only one using it + wgs.mu.Lock() + if len(wgs.streams) == 0 && wgs.stopc != nil { + close(wgs.stopc) + wgs.stopc = nil + } + wgs.mu.Unlock() case <-donec: if wgs.closeErr != nil { closeCh <- WatchResponse{closeErr: wgs.closeErr} @@ -408,6 +415,14 @@ func (w *watchGrpcStream) run() { w.cancel() }() + // already stopped? + w.mu.RLock() + stopc := w.stopc + w.mu.RUnlock() + if stopc == nil { + return + } + // start a stream with the etcd grpc server if wc, closeErr = w.newWatchClient(); closeErr != nil { return @@ -415,7 +430,6 @@ func (w *watchGrpcStream) run() { var pendingReq, failedReq *watchRequest curReqC := w.reqc - stopc := w.stopc cancelSet := make(map[int64]struct{}) for {