Skip to content

Commit

Permalink
Merge pull request #12189 from jingyih/automated-cherry-pick-of-#11452-
Browse files Browse the repository at this point in the history
…#12187-upstream-release-3.4

Automated cherry pick of #11452 #12187 on release 3.4
  • Loading branch information
gyuho authored Aug 14, 2020
2 parents 0080741 + 6fcab5a commit 8a4afdb
Showing 1 changed file with 37 additions and 8 deletions.
45 changes: 37 additions & 8 deletions clientv3/watch.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,7 @@ type watcher struct {

// streams holds all the active grpc streams keyed by ctx value.
streams map[string]*watchGrpcStream
lg *zap.Logger
}

// watchGrpcStream tracks all watch resources attached to a single grpc stream.
Expand Down Expand Up @@ -177,6 +178,8 @@ type watchGrpcStream struct {
resumec chan struct{}
// closeErr is the error that closed the watch stream
closeErr error

lg *zap.Logger
}

// watchStreamRequest is a union of the supported watch request operation types
Expand Down Expand Up @@ -243,6 +246,7 @@ func NewWatchFromWatchClient(wc pb.WatchClient, c *Client) Watcher {
}
if c != nil {
w.callOpts = c.callOpts
w.lg = c.lg
}
return w
}
Expand Down Expand Up @@ -274,6 +278,7 @@ func (w *watcher) newWatcherGrpcStream(inctx context.Context) *watchGrpcStream {
errc: make(chan error, 1),
closingc: make(chan *watcherStream),
resumec: make(chan struct{}),
lg: w.lg,
}
go wgs.run()
return wgs
Expand Down Expand Up @@ -545,10 +550,18 @@ func (w *watchGrpcStream) run() {
w.resuming = append(w.resuming, ws)
if len(w.resuming) == 1 {
// head of resume queue, can register a new watcher
wc.Send(ws.initReq.toPB())
if err := wc.Send(ws.initReq.toPB()); err != nil {
if w.lg != nil {
w.lg.Debug("error when sending request", zap.Error(err))
}
}
}
case *progressRequest:
wc.Send(wreq.toPB())
if err := wc.Send(wreq.toPB()); err != nil {
if w.lg != nil {
w.lg.Debug("error when sending request", zap.Error(err))
}
}
}

// new events from the watch client
Expand All @@ -572,7 +585,11 @@ func (w *watchGrpcStream) run() {
}

if ws := w.nextResume(); ws != nil {
wc.Send(ws.initReq.toPB())
if err := wc.Send(ws.initReq.toPB()); err != nil {
if w.lg != nil {
w.lg.Debug("error when sending request", zap.Error(err))
}
}
}

// reset for next iteration
Expand Down Expand Up @@ -617,9 +634,13 @@ func (w *watchGrpcStream) run() {
},
}
req := &pb.WatchRequest{RequestUnion: cr}
lg.Info("sending watch cancel request for failed dispatch", zap.Int64("watch-id", pbresp.WatchId))
if w.lg != nil {
w.lg.Debug("sending watch cancel request for failed dispatch", zap.Int64("watch-id", pbresp.WatchId))
}
if err := wc.Send(req); err != nil {
lg.Warning("failed to send watch cancel request", zap.Int64("watch-id", pbresp.WatchId), zap.Error(err))
if w.lg != nil {
w.lg.Debug("failed to send watch cancel request", zap.Int64("watch-id", pbresp.WatchId), zap.Error(err))
}
}
}

Expand All @@ -633,7 +654,11 @@ func (w *watchGrpcStream) run() {
return
}
if ws := w.nextResume(); ws != nil {
wc.Send(ws.initReq.toPB())
if err := wc.Send(ws.initReq.toPB()); err != nil {
if w.lg != nil {
w.lg.Debug("error when sending request", zap.Error(err))
}
}
}
cancelSet = make(map[int64]struct{})

Expand All @@ -651,9 +676,13 @@ func (w *watchGrpcStream) run() {
},
}
req := &pb.WatchRequest{RequestUnion: cr}
lg.Info("sending watch cancel request for closed watcher", zap.Int64("watch-id", ws.id))
if w.lg != nil {
w.lg.Debug("sending watch cancel request for closed watcher", zap.Int64("watch-id", ws.id))
}
if err := wc.Send(req); err != nil {
lg.Warning("failed to send watch cancel request", zap.Int64("watch-id", ws.id), zap.Error(err))
if w.lg != nil {
w.lg.Debug("failed to send watch cancel request", zap.Int64("watch-id", ws.id), zap.Error(err))
}
}
}
w.closeSubstream(ws)
Expand Down

0 comments on commit 8a4afdb

Please sign in to comment.