Skip to content

Commit

Permalink
Merge pull request #9840 from liggitt/client-hotloop
Browse files Browse the repository at this point in the history
Backoff on reestablishing watches when Unavailable errors are encountered
  • Loading branch information
gyuho authored Jun 14, 2018
2 parents 03cec4a + d1579c9 commit 410d28c
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 0 deletions.
14 changes: 14 additions & 0 deletions clientv3/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -527,6 +527,20 @@ func isHaltErr(ctx context.Context, err error) bool {
return ev.Code() != codes.Unavailable && ev.Code() != codes.Internal
}

// isUnavailableErr returns true if the given error is an unavailable error
func isUnavailableErr(ctx context.Context, err error) bool {
if ctx != nil && ctx.Err() != nil {
return false
}
if err == nil {
return false
}
ev, _ := status.FromError(err)
// Unavailable codes mean the system will be right back.
// (e.g., can't connect, lost leader)
return ev.Code() == codes.Unavailable
}

func toErr(ctx context.Context, err error) error {
if err == nil {
return nil
Expand Down
14 changes: 14 additions & 0 deletions clientv3/watch.go
Original file line number Diff line number Diff line change
Expand Up @@ -830,10 +830,13 @@ func (w *watchGrpcStream) joinSubstreams() {
}
}

var maxBackoff = 100 * time.Millisecond

// openWatchClient retries opening a watch client until success or halt.
// manually retry in case "ws==nil && err==nil"
// TODO: remove FailFast=false
func (w *watchGrpcStream) openWatchClient() (ws pb.Watch_WatchClient, err error) {
backoff := time.Millisecond
for {
select {
case <-w.ctx.Done():
Expand All @@ -849,6 +852,17 @@ func (w *watchGrpcStream) openWatchClient() (ws pb.Watch_WatchClient, err error)
if isHaltErr(w.ctx, err) {
return nil, v3rpc.Error(err)
}
if isUnavailableErr(w.ctx, err) {
// retry, but backoff
if backoff < maxBackoff {
// 25% backoff factor
backoff = backoff + backoff/4
if backoff > maxBackoff {
backoff = maxBackoff
}
}
time.Sleep(backoff)
}
}
return ws, nil
}
Expand Down

0 comments on commit 410d28c

Please sign in to comment.