diff --git a/clientv3/balancer.go b/clientv3/balancer.go index cf7419b54d1..51414e10bf8 100644 --- a/clientv3/balancer.go +++ b/clientv3/balancer.go @@ -152,7 +152,23 @@ func (b *simpleBalancer) pinned() string { return b.pinAddr } -func (b *simpleBalancer) endpointError(addr string, err error) { return } +func (b *simpleBalancer) endpointError(addr string, err error) { + if addr == "" { + return + } + b.mu.Lock() + if b.pinAddr == "" || b.pinAddr != addr { + b.mu.Unlock() + return + } + b.upc = make(chan struct{}) + close(b.downc) + b.pinAddr = "" + b.mu.Unlock() + if logger.V(4) { + logger.Infof("clientv3/balancer: unpin %s (%v)", addr, err) + } +} func getHost2ep(eps []string) map[string]string { hm := make(map[string]string, len(eps)) @@ -329,16 +345,7 @@ func (b *simpleBalancer) up(addr grpc.Address) (func(error), bool) { } // notify client that a connection is up b.readyOnce.Do(func() { close(b.readyc) }) - return func(err error) { - b.mu.Lock() - b.upc = make(chan struct{}) - close(b.downc) - b.pinAddr = "" - b.mu.Unlock() - if logger.V(4) { - logger.Infof("clientv3/balancer: unpin %s (%v)", addr.Addr, err) - } - }, true + return func(err error) { b.endpointError(addr.Addr, err) }, true } func (b *simpleBalancer) Get(ctx context.Context, opts grpc.BalancerGetOptions) (grpc.Address, func(), error) { diff --git a/clientv3/health_balancer.go b/clientv3/health_balancer.go index dd4958c8084..93049cc15d2 100644 --- a/clientv3/health_balancer.go +++ b/clientv3/health_balancer.go @@ -185,6 +185,7 @@ func (hb *healthBalancer) endpointError(addr string, err error) { if logger.V(4) { logger.Infof("clientv3/health-balancer: marking %s as unhealthy (%v)", addr, err) } + hb.balancer.endpointError(addr, err) } func (hb *healthBalancer) mayPin(addr grpc.Address) bool { diff --git a/clientv3/integration/kv_test.go b/clientv3/integration/kv_test.go index 62d915a858f..161fb2525f0 100644 --- a/clientv3/integration/kv_test.go +++ b/clientv3/integration/kv_test.go @@ -473,8 +473,8 @@ func TestKVNewAfterClose(t *testing.T) { donec := make(chan struct{}) go func() { - if _, err := cli.Get(context.TODO(), "foo"); err != context.Canceled { - t.Fatalf("expected %v, got %v", context.Canceled, err) + if _, err := cli.Get(context.TODO(), "foo"); err != context.Canceled && err != grpc.ErrClientConnClosing { + t.Fatalf("expected %v or %v, got %v", context.Canceled, grpc.ErrClientConnClosing, err) } close(donec) }() diff --git a/clientv3/retry.go b/clientv3/retry.go index d33fff92d98..712239e61ae 100644 --- a/clientv3/retry.go +++ b/clientv3/retry.go @@ -66,15 +66,17 @@ func (c *Client) newRetryWrapper(isStop retryStopErrFunc) retryRpcFunc { if logger.V(4) { logger.Infof("clientv3/retry: error %v on pinned endpoint %s", err, pinned) } + // do not switch endpoint when server is stopped + // (should exit on non-transient error) + if isStop(err) { + return err + } // mark this before endpoint switch is triggered c.balancer.endpointError(pinned, err) notify := c.balancer.ConnectNotify() if s, ok := status.FromError(err); ok && s.Code() == codes.Unavailable { c.balancer.next() } - if isStop(err) { - return err - } select { case <-notify: case <-rpcCtx.Done():