diff --git a/clientv3/watch.go b/clientv3/watch.go index 4ae3a0b33fb..66e16ad63fe 100644 --- a/clientv3/watch.go +++ b/clientv3/watch.go @@ -141,6 +141,7 @@ type watcher struct { // streams holds all the active grpc streams keyed by ctx value. streams map[string]*watchGrpcStream + lg *zap.Logger } // watchGrpcStream tracks all watch resources attached to a single grpc stream. @@ -177,6 +178,8 @@ type watchGrpcStream struct { resumec chan struct{} // closeErr is the error that closed the watch stream closeErr error + + lg *zap.Logger } // watchStreamRequest is a union of the supported watch request operation types @@ -243,6 +246,7 @@ func NewWatchFromWatchClient(wc pb.WatchClient, c *Client) Watcher { } if c != nil { w.callOpts = c.callOpts + w.lg = c.lg } return w } @@ -274,6 +278,7 @@ func (w *watcher) newWatcherGrpcStream(inctx context.Context) *watchGrpcStream { errc: make(chan error, 1), closingc: make(chan *watcherStream), resumec: make(chan struct{}), + lg: w.lg, } go wgs.run() return wgs @@ -545,10 +550,18 @@ func (w *watchGrpcStream) run() { w.resuming = append(w.resuming, ws) if len(w.resuming) == 1 { // head of resume queue, can register a new watcher - wc.Send(ws.initReq.toPB()) + if err := wc.Send(ws.initReq.toPB()); err != nil { + if w.lg != nil { + w.lg.Debug("error when sending request", zap.Error(err)) + } + } } case *progressRequest: - wc.Send(wreq.toPB()) + if err := wc.Send(wreq.toPB()); err != nil { + if w.lg != nil { + w.lg.Debug("error when sending request", zap.Error(err)) + } + } } // new events from the watch client @@ -572,7 +585,11 @@ func (w *watchGrpcStream) run() { } if ws := w.nextResume(); ws != nil { - wc.Send(ws.initReq.toPB()) + if err := wc.Send(ws.initReq.toPB()); err != nil { + if w.lg != nil { + w.lg.Debug("error when sending request", zap.Error(err)) + } + } } // reset for next iteration @@ -617,9 +634,13 @@ func (w *watchGrpcStream) run() { }, } req := &pb.WatchRequest{RequestUnion: cr} - lg.Info("sending watch cancel request for failed dispatch", zap.Int64("watch-id", pbresp.WatchId)) + if w.lg != nil { + w.lg.Debug("sending watch cancel request for failed dispatch", zap.Int64("watch-id", pbresp.WatchId)) + } if err := wc.Send(req); err != nil { - lg.Warning("failed to send watch cancel request", zap.Int64("watch-id", pbresp.WatchId), zap.Error(err)) + if w.lg != nil { + w.lg.Debug("failed to send watch cancel request", zap.Int64("watch-id", pbresp.WatchId), zap.Error(err)) + } } } @@ -633,7 +654,11 @@ func (w *watchGrpcStream) run() { return } if ws := w.nextResume(); ws != nil { - wc.Send(ws.initReq.toPB()) + if err := wc.Send(ws.initReq.toPB()); err != nil { + if w.lg != nil { + w.lg.Debug("error when sending request", zap.Error(err)) + } + } } cancelSet = make(map[int64]struct{}) @@ -651,9 +676,13 @@ func (w *watchGrpcStream) run() { }, } req := &pb.WatchRequest{RequestUnion: cr} - lg.Info("sending watch cancel request for closed watcher", zap.Int64("watch-id", ws.id)) + if w.lg != nil { + w.lg.Debug("sending watch cancel request for closed watcher", zap.Int64("watch-id", ws.id)) + } if err := wc.Send(req); err != nil { - lg.Warning("failed to send watch cancel request", zap.Int64("watch-id", ws.id), zap.Error(err)) + if w.lg != nil { + w.lg.Debug("failed to send watch cancel request", zap.Int64("watch-id", ws.id), zap.Error(err)) + } } } w.closeSubstream(ws)