diff --git a/clientv3/client.go b/clientv3/client.go index 95afd1f6623..00d621ffe8b 100644 --- a/clientv3/client.go +++ b/clientv3/client.go @@ -527,6 +527,20 @@ func isHaltErr(ctx context.Context, err error) bool { return ev.Code() != codes.Unavailable && ev.Code() != codes.Internal } +// isUnavailableErr returns true if the given error is an unavailable error +func isUnavailableErr(ctx context.Context, err error) bool { + if ctx != nil && ctx.Err() != nil { + return false + } + if err == nil { + return false + } + ev, _ := status.FromError(err) + // Unavailable codes mean the system will be right back. + // (e.g., can't connect, lost leader) + return ev.Code() == codes.Unavailable +} + func toErr(ctx context.Context, err error) error { if err == nil { return nil diff --git a/clientv3/watch.go b/clientv3/watch.go index cd84095edfb..39d149c530e 100644 --- a/clientv3/watch.go +++ b/clientv3/watch.go @@ -830,10 +830,13 @@ func (w *watchGrpcStream) joinSubstreams() { } } +var maxBackoff = 100 * time.Millisecond + // openWatchClient retries opening a watch client until success or halt. // manually retry in case "ws==nil && err==nil" // TODO: remove FailFast=false func (w *watchGrpcStream) openWatchClient() (ws pb.Watch_WatchClient, err error) { + backoff := time.Millisecond for { select { case <-w.ctx.Done(): @@ -849,6 +852,17 @@ func (w *watchGrpcStream) openWatchClient() (ws pb.Watch_WatchClient, err error) if isHaltErr(w.ctx, err) { return nil, v3rpc.Error(err) } + if isUnavailableErr(w.ctx, err) { + // retry, but backoff + if backoff < maxBackoff { + // 25% backoff factor + backoff = backoff + backoff/4 + if backoff > maxBackoff { + backoff = maxBackoff + } + } + time.Sleep(backoff) + } } return ws, nil }