From 4818f23951c879b27bc9aeaf27ffbd4faba4d251 Mon Sep 17 00:00:00 2001 From: Gyu-Ho Lee Date: Fri, 13 Oct 2017 15:42:35 -0700 Subject: [PATCH 01/17] words: whitelist more Signed-off-by: Gyu-Ho Lee --- .words | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/.words b/.words index 66f5340b465..6dbf8f3ac1f 100644 --- a/.words +++ b/.words @@ -1,7 +1,14 @@ +ConnectionError ErrCodeEnhanceYourCalm +ErrConnClosing +ErrEmptyKey +ErrNoSpace +ErrTimeout +FailFast GoAway RPC RPCs +StreamError backoff blackholed cancelable @@ -9,6 +16,7 @@ cancelation cluster_proxy defragment defragmenting +downErr etcd gRPC goroutine @@ -25,9 +33,10 @@ mutex prefetching protobuf serializable +statusError teardown +toRPCErr too_many_pings uncontended unprefixed unlisting - From 1a12a6496210ab9aac9d925533107a48785793c4 Mon Sep 17 00:00:00 2001 From: Gyu-Ho Lee Date: Sun, 15 Oct 2017 08:09:16 -0700 Subject: [PATCH 02/17] vendor/grpc-go: cherry-pick WriteStatus fix from v1.7.0 Cherry-pick 22c3f92f5faea8db492fb0f5ae4daf0d2752b19e. Signed-off-by: Gyu-Ho Lee --- cmd/vendor/google.golang.org/grpc/transport/handler_server.go | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/cmd/vendor/google.golang.org/grpc/transport/handler_server.go b/cmd/vendor/google.golang.org/grpc/transport/handler_server.go index 0489fada52e..31a260cfc91 100644 --- a/cmd/vendor/google.golang.org/grpc/transport/handler_server.go +++ b/cmd/vendor/google.golang.org/grpc/transport/handler_server.go @@ -183,6 +183,7 @@ func (ht *serverHandlerTransport) WriteStatus(s *Stream, st *status.Status) erro ht.mu.Unlock() return nil } + ht.streamDone = true ht.mu.Unlock() err := ht.do(func() { ht.writeCommonHeaders(s) @@ -223,9 +224,6 @@ func (ht *serverHandlerTransport) WriteStatus(s *Stream, st *status.Status) erro } }) close(ht.writes) - ht.mu.Lock() - ht.streamDone = true - ht.mu.Unlock() return err } From bc45227284d2759d57cecbc5f0e67226985e5fbd Mon Sep 17 00:00:00 2001 From: Gyu-Ho Lee Date: Sun, 15 Oct 2017 08:18:35 -0700 Subject: [PATCH 03/17] glide: add note to grpc version Signed-off-by: Gyu-Ho Lee --- glide.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/glide.yaml b/glide.yaml index a5cbd20821d..1488aaa6141 100644 --- a/glide.yaml +++ b/glide.yaml @@ -97,7 +97,7 @@ import: subpackages: - rate - package: google.golang.org/grpc - version: v1.6.0 + version: v1.6.0 with 22c3f92f5faea8db492fb0f5ae4daf0d2752b19e (temporary) subpackages: - codes - credentials From a03f0d17fa24071ae371ff3ba474fbcd08c92ba1 Mon Sep 17 00:00:00 2001 From: Gyu-Ho Lee Date: Mon, 16 Oct 2017 12:56:17 -0700 Subject: [PATCH 04/17] clientv3/health: handle stale endpoints, rename to 'unhealthyHosts' 1. Handle stale endpoints in health balancer. 2. Rename 'unhealthy' to 'unhealthyHosts' to make it clear. 3. Use quote format string to log empty hosts. Signed-off-by: Gyu-Ho Lee --- clientv3/health_balancer.go | 67 +++++++++++++++++++++---------------- 1 file changed, 39 insertions(+), 28 deletions(-) diff --git a/clientv3/health_balancer.go b/clientv3/health_balancer.go index dd4958c8084..d770f8f5335 100644 --- a/clientv3/health_balancer.go +++ b/clientv3/health_balancer.go @@ -48,8 +48,8 @@ type healthBalancer struct { // eps stores all client endpoints eps []string - // unhealthy tracks the last unhealthy time of endpoints. - unhealthy map[string]time.Time + // unhealthyHosts tracks the last unhealthy time of endpoints. 
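(Standalone sketch, not part of the patch: the gray-listing above amounts to remembering each host's last failure time and forgetting it after a retry timeout. The names grayList, markFailed, and usable are illustrative only, not clientv3 API.)

package main

import (
	"fmt"
	"time"
)

// grayList remembers when each host last failed; entries expire after timeout.
type grayList struct {
	failed  map[string]time.Time
	timeout time.Duration
}

func (g *grayList) markFailed(host string) { g.failed[host] = time.Now() }

// usable reports whether a host may be tried again, expiring stale entries.
func (g *grayList) usable(host string) bool {
	t, bad := g.failed[host]
	if !bad {
		return true
	}
	if time.Since(t) > g.timeout {
		delete(g.failed, host) // failure is old enough; give the host another chance
		return true
	}
	return false
}

func main() {
	g := &grayList{failed: make(map[string]time.Time), timeout: 3 * time.Second}
	g.markFailed("10.0.0.1:2379")
	fmt.Println(g.usable("10.0.0.1:2379")) // false: failed just now
	fmt.Println(g.usable("10.0.0.2:2379")) // true: never failed
}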
+ unhealthyHosts map[string]time.Time stopc chan struct{} stopOnce sync.Once @@ -61,13 +61,13 @@ type healthBalancer struct { func newHealthBalancer(b balancer, timeout time.Duration, hc healthCheckFunc) *healthBalancer { hb := &healthBalancer{ - balancer: b, - healthCheck: hc, - eps: b.endpoints(), - addrs: eps2addrs(b.endpoints()), - host2ep: getHost2ep(b.endpoints()), - unhealthy: make(map[string]time.Time), - stopc: make(chan struct{}), + balancer: b, + healthCheck: hc, + eps: b.endpoints(), + addrs: eps2addrs(b.endpoints()), + host2ep: getHost2ep(b.endpoints()), + unhealthyHosts: make(map[string]time.Time), + stopc: make(chan struct{}), } if timeout < minHealthRetryDuration { timeout = minHealthRetryDuration @@ -94,11 +94,11 @@ func (hb *healthBalancer) Up(addr grpc.Address) func(error) { // finding healthy endpoint on retry could take several timeouts and redials. // To avoid wasting retries, gray-list unhealthy endpoints. hb.mu.Lock() - hb.unhealthy[addr.Addr] = time.Now() + hb.unhealthyHosts[addr.Addr] = time.Now() hb.mu.Unlock() f(err) if logger.V(4) { - logger.Infof("clientv3/health-balancer: %s becomes unhealthy (%v)", addr.Addr, err) + logger.Infof("clientv3/health-balancer: %q becomes unhealthy (%v)", addr.Addr, err) } } } @@ -120,7 +120,7 @@ func (hb *healthBalancer) updateAddrs(eps ...string) { addrs, host2ep := eps2addrs(eps), getHost2ep(eps) hb.mu.Lock() hb.addrs, hb.eps, hb.host2ep = addrs, eps, host2ep - hb.unhealthy = make(map[string]time.Time) + hb.unhealthyHosts = make(map[string]time.Time) hb.mu.Unlock() hb.balancer.updateAddrs(eps...) } @@ -142,11 +142,18 @@ func (hb *healthBalancer) updateUnhealthy(timeout time.Duration) { select { case <-time.After(timeout): hb.mu.Lock() - for k, v := range hb.unhealthy { + for k, v := range hb.unhealthyHosts { + if _, ok := hb.host2ep[k]; !ok { + delete(hb.unhealthyHosts, k) + if logger.V(4) { + logger.Infof("clientv3/health-balancer: removes stale endpoint %q from unhealthy", k) + } + continue + } if time.Since(v) > timeout { - delete(hb.unhealthy, k) + delete(hb.unhealthyHosts, k) if logger.V(4) { - logger.Infof("clientv3/health-balancer: removes %s from unhealthy after %v", k, timeout) + logger.Infof("clientv3/health-balancer: removes %q from unhealthy after %v", k, timeout) } } } @@ -166,31 +173,35 @@ func (hb *healthBalancer) liveAddrs() []grpc.Address { hb.mu.RLock() defer hb.mu.RUnlock() hbAddrs := hb.addrs - if len(hb.addrs) == 1 || len(hb.unhealthy) == 0 || len(hb.unhealthy) == len(hb.addrs) { + if len(hb.addrs) == 1 || len(hb.unhealthyHosts) == 0 || len(hb.unhealthyHosts) == len(hb.addrs) { return hbAddrs } - addrs := make([]grpc.Address, 0, len(hb.addrs)-len(hb.unhealthy)) + addrs := make([]grpc.Address, 0, len(hb.addrs)-len(hb.unhealthyHosts)) for _, addr := range hb.addrs { - if _, unhealthy := hb.unhealthy[addr.Addr]; !unhealthy { + if _, unhealthy := hb.unhealthyHosts[addr.Addr]; !unhealthy { addrs = append(addrs, addr) } } return addrs } -func (hb *healthBalancer) endpointError(addr string, err error) { +func (hb *healthBalancer) endpointError(host string, err error) { hb.mu.Lock() - hb.unhealthy[addr] = time.Now() + hb.unhealthyHosts[host] = time.Now() hb.mu.Unlock() if logger.V(4) { - logger.Infof("clientv3/health-balancer: marking %s as unhealthy (%v)", addr, err) + logger.Infof("clientv3/health-balancer: marking %q as unhealthy (%v)", host, err) } } func (hb *healthBalancer) mayPin(addr grpc.Address) bool { hb.mu.RLock() - skip := len(hb.addrs) == 1 || len(hb.unhealthy) == 0 || len(hb.addrs) == 
len(hb.unhealthy) - failedTime, bad := hb.unhealthy[addr.Addr] + if _, ok := hb.host2ep[addr.Addr]; !ok { + hb.mu.RUnlock() + return false + } + skip := len(hb.addrs) == 1 || len(hb.unhealthyHosts) == 0 || len(hb.addrs) == len(hb.unhealthyHosts) + failedTime, bad := hb.unhealthyHosts[addr.Addr] dur := hb.healthCheckTimeout hb.mu.RUnlock() if skip || !bad { @@ -203,24 +214,24 @@ func (hb *healthBalancer) mayPin(addr grpc.Address) bool { // instead, return before grpc-healthcheck if failed within healthcheck timeout if elapsed := time.Since(failedTime); elapsed < dur { if logger.V(4) { - logger.Infof("clientv3/health-balancer: %s is up but not pinned (failed %v ago, require minimum %v after failure)", addr.Addr, elapsed, dur) + logger.Infof("clientv3/health-balancer: %q is up but not pinned (failed %v ago, require minimum %v after failure)", addr.Addr, elapsed, dur) } return false } if ok, _ := hb.healthCheck(addr.Addr); ok { hb.mu.Lock() - delete(hb.unhealthy, addr.Addr) + delete(hb.unhealthyHosts, addr.Addr) hb.mu.Unlock() if logger.V(4) { - logger.Infof("clientv3/health-balancer: %s is healthy (health check success)", addr.Addr) + logger.Infof("clientv3/health-balancer: %q is healthy (health check success)", addr.Addr) } return true } hb.mu.Lock() - hb.unhealthy[addr.Addr] = time.Now() + hb.unhealthyHosts[addr.Addr] = time.Now() hb.mu.Unlock() if logger.V(4) { - logger.Infof("clientv3/health-balancer: %s becomes unhealthy (health check failed)", addr.Addr) + logger.Infof("clientv3/health-balancer: %q becomes unhealthy (health check failed)", addr.Addr) } return false } From 5351ae2b9a881fd4985dbc8a18ef21971ceba34b Mon Sep 17 00:00:00 2001 From: Gyu-Ho Lee Date: Sat, 14 Oct 2017 13:51:57 -0700 Subject: [PATCH 05/17] clientv3: clean up logging, variable names Signed-off-by: Gyu-Ho Lee --- clientv3/balancer.go | 12 ++++--- clientv3/health_balancer.go | 4 +-- clientv3/retry.go | 67 +++++++++++++++++++------------------ 3 files changed, 44 insertions(+), 39 deletions(-) diff --git a/clientv3/balancer.go b/clientv3/balancer.go index cf7419b54d1..8c581c75bb1 100644 --- a/clientv3/balancer.go +++ b/clientv3/balancer.go @@ -45,7 +45,7 @@ type balancer interface { // pinned returns the current pinned endpoint. pinned() string // endpointError handles error from server-side. - endpointError(addr string, err error) + endpointError(host string, err error) // up is Up but includes whether the balancer will use the connection. 
up(addr grpc.Address) (func(error), bool) @@ -152,7 +152,9 @@ func (b *simpleBalancer) pinned() string { return b.pinAddr } -func (b *simpleBalancer) endpointError(addr string, err error) { return } +func (b *simpleBalancer) endpointError(host string, err error) { + panic("'endpointError' not implemented") +} func getHost2ep(eps []string) map[string]string { hm := make(map[string]string, len(eps)) @@ -316,7 +318,7 @@ func (b *simpleBalancer) up(addr grpc.Address) (func(error), bool) { } if b.pinAddr != "" { if logger.V(4) { - logger.Infof("clientv3/balancer: %s is up but not pinned (already pinned %s)", addr.Addr, b.pinAddr) + logger.Infof("clientv3/balancer: %q is up but not pinned (already pinned %q)", addr.Addr, b.pinAddr) } return func(err error) {}, false } @@ -325,7 +327,7 @@ func (b *simpleBalancer) up(addr grpc.Address) (func(error), bool) { b.downc = make(chan struct{}) b.pinAddr = addr.Addr if logger.V(4) { - logger.Infof("clientv3/balancer: pin %s", addr.Addr) + logger.Infof("clientv3/balancer: pin %q", addr.Addr) } // notify client that a connection is up b.readyOnce.Do(func() { close(b.readyc) }) @@ -336,7 +338,7 @@ func (b *simpleBalancer) up(addr grpc.Address) (func(error), bool) { b.pinAddr = "" b.mu.Unlock() if logger.V(4) { - logger.Infof("clientv3/balancer: unpin %s (%v)", addr.Addr, err) + logger.Infof("clientv3/balancer: unpin %q (%q)", addr.Addr, err.Error()) } }, true } diff --git a/clientv3/health_balancer.go b/clientv3/health_balancer.go index d770f8f5335..8e80d2495c7 100644 --- a/clientv3/health_balancer.go +++ b/clientv3/health_balancer.go @@ -98,7 +98,7 @@ func (hb *healthBalancer) Up(addr grpc.Address) func(error) { hb.mu.Unlock() f(err) if logger.V(4) { - logger.Infof("clientv3/health-balancer: %q becomes unhealthy (%v)", addr.Addr, err) + logger.Infof("clientv3/health-balancer: %q becomes unhealthy (%q)", addr.Addr, err.Error()) } } } @@ -190,7 +190,7 @@ func (hb *healthBalancer) endpointError(host string, err error) { hb.unhealthyHosts[host] = time.Now() hb.mu.Unlock() if logger.V(4) { - logger.Infof("clientv3/health-balancer: marking %q as unhealthy (%v)", host, err) + logger.Infof("clientv3/health-balancer: marking %q as unhealthy (%q)", host, err.Error()) } } diff --git a/clientv3/retry.go b/clientv3/retry.go index d33fff92d98..d51fb0a8342 100644 --- a/clientv3/retry.go +++ b/clientv3/retry.go @@ -26,7 +26,7 @@ import ( ) type rpcFunc func(ctx context.Context) error -type retryRpcFunc func(context.Context, rpcFunc) error +type retryRPCFunc func(context.Context, rpcFunc) error type retryStopErrFunc func(error) bool func isReadStopError(err error) bool { @@ -48,7 +48,7 @@ func isWriteStopError(err error) bool { return rpctypes.ErrorDesc(err) != "there is no address available" } -func (c *Client) newRetryWrapper(isStop retryStopErrFunc) retryRpcFunc { +func (c *Client) newRetryWrapper(isStop retryStopErrFunc) retryRPCFunc { return func(rpcCtx context.Context, f rpcFunc) error { for { select { @@ -64,7 +64,7 @@ func (c *Client) newRetryWrapper(isStop retryStopErrFunc) retryRpcFunc { return nil } if logger.V(4) { - logger.Infof("clientv3/retry: error %v on pinned endpoint %s", err, pinned) + logger.Infof("clientv3/retry: error %q on pinned endpoint %q", err.Error(), pinned) } // mark this before endpoint switch is triggered c.balancer.endpointError(pinned, err) @@ -86,7 +86,7 @@ func (c *Client) newRetryWrapper(isStop retryStopErrFunc) retryRpcFunc { } } -func (c *Client) newAuthRetryWrapper() retryRpcFunc { +func (c *Client) newAuthRetryWrapper() retryRPCFunc 
{ return func(rpcCtx context.Context, f rpcFunc) error { for { pinned := c.balancer.pinned() @@ -94,15 +94,18 @@ func (c *Client) newAuthRetryWrapper() retryRpcFunc { if err == nil { return nil } - if logger.V(4) { - logger.Infof("clientv3/auth-retry: error %v on pinned endpoint %s", err, pinned) - } // always stop retry on etcd errors other than invalid auth token if rpctypes.Error(err) == rpctypes.ErrInvalidAuthToken { gterr := c.getToken(rpcCtx) if gterr != nil { + if logger.V(4) { + logger.Infof("clientv3/auth-retry: error %q(%q) on pinned endpoint %q (returning)", err.Error(), gterr.Error(), pinned) + } return err // return the original error for simplicity } + if logger.V(4) { + logger.Infof("clientv3/auth-retry: error %q on pinned endpoint %q (retrying)", err, pinned) + } continue } return err @@ -124,7 +127,7 @@ func RetryKVClient(c *Client) pb.KVClient { type retryKVClient struct { *retryWriteKVClient - readRetry retryRpcFunc + readRetry retryRPCFunc } func (rkv *retryKVClient) Range(ctx context.Context, in *pb.RangeRequest, opts ...grpc.CallOption) (resp *pb.RangeResponse, err error) { @@ -137,11 +140,11 @@ func (rkv *retryKVClient) Range(ctx context.Context, in *pb.RangeRequest, opts . type retryWriteKVClient struct { pb.KVClient - retryf retryRpcFunc + writeRetry retryRPCFunc } func (rkv *retryWriteKVClient) Put(ctx context.Context, in *pb.PutRequest, opts ...grpc.CallOption) (resp *pb.PutResponse, err error) { - err = rkv.retryf(ctx, func(rctx context.Context) error { + err = rkv.writeRetry(ctx, func(rctx context.Context) error { resp, err = rkv.KVClient.Put(rctx, in, opts...) return err }) @@ -149,7 +152,7 @@ func (rkv *retryWriteKVClient) Put(ctx context.Context, in *pb.PutRequest, opts } func (rkv *retryWriteKVClient) DeleteRange(ctx context.Context, in *pb.DeleteRangeRequest, opts ...grpc.CallOption) (resp *pb.DeleteRangeResponse, err error) { - err = rkv.retryf(ctx, func(rctx context.Context) error { + err = rkv.writeRetry(ctx, func(rctx context.Context) error { resp, err = rkv.KVClient.DeleteRange(rctx, in, opts...) return err }) @@ -157,7 +160,7 @@ func (rkv *retryWriteKVClient) DeleteRange(ctx context.Context, in *pb.DeleteRan } func (rkv *retryWriteKVClient) Txn(ctx context.Context, in *pb.TxnRequest, opts ...grpc.CallOption) (resp *pb.TxnResponse, err error) { - err = rkv.retryf(ctx, func(rctx context.Context) error { + err = rkv.writeRetry(ctx, func(rctx context.Context) error { resp, err = rkv.KVClient.Txn(rctx, in, opts...) return err }) @@ -165,7 +168,7 @@ func (rkv *retryWriteKVClient) Txn(ctx context.Context, in *pb.TxnRequest, opts } func (rkv *retryWriteKVClient) Compact(ctx context.Context, in *pb.CompactionRequest, opts ...grpc.CallOption) (resp *pb.CompactionResponse, err error) { - err = rkv.retryf(ctx, func(rctx context.Context) error { + err = rkv.writeRetry(ctx, func(rctx context.Context) error { resp, err = rkv.KVClient.Compact(rctx, in, opts...) return err }) @@ -174,7 +177,7 @@ func (rkv *retryWriteKVClient) Compact(ctx context.Context, in *pb.CompactionReq type retryLeaseClient struct { pb.LeaseClient - retryf retryRpcFunc + readRetry retryRPCFunc } // RetryLeaseClient implements a LeaseClient that uses the client's FailFast retry policy. 
@@ -187,7 +190,7 @@ func RetryLeaseClient(c *Client) pb.LeaseClient { } func (rlc *retryLeaseClient) LeaseGrant(ctx context.Context, in *pb.LeaseGrantRequest, opts ...grpc.CallOption) (resp *pb.LeaseGrantResponse, err error) { - err = rlc.retryf(ctx, func(rctx context.Context) error { + err = rlc.readRetry(ctx, func(rctx context.Context) error { resp, err = rlc.LeaseClient.LeaseGrant(rctx, in, opts...) return err }) @@ -196,7 +199,7 @@ func (rlc *retryLeaseClient) LeaseGrant(ctx context.Context, in *pb.LeaseGrantRe } func (rlc *retryLeaseClient) LeaseRevoke(ctx context.Context, in *pb.LeaseRevokeRequest, opts ...grpc.CallOption) (resp *pb.LeaseRevokeResponse, err error) { - err = rlc.retryf(ctx, func(rctx context.Context) error { + err = rlc.readRetry(ctx, func(rctx context.Context) error { resp, err = rlc.LeaseClient.LeaseRevoke(rctx, in, opts...) return err }) @@ -205,7 +208,7 @@ func (rlc *retryLeaseClient) LeaseRevoke(ctx context.Context, in *pb.LeaseRevoke type retryClusterClient struct { pb.ClusterClient - retryf retryRpcFunc + writeRetry retryRPCFunc } // RetryClusterClient implements a ClusterClient that uses the client's FailFast retry policy. @@ -214,7 +217,7 @@ func RetryClusterClient(c *Client) pb.ClusterClient { } func (rcc *retryClusterClient) MemberAdd(ctx context.Context, in *pb.MemberAddRequest, opts ...grpc.CallOption) (resp *pb.MemberAddResponse, err error) { - err = rcc.retryf(ctx, func(rctx context.Context) error { + err = rcc.writeRetry(ctx, func(rctx context.Context) error { resp, err = rcc.ClusterClient.MemberAdd(rctx, in, opts...) return err }) @@ -222,7 +225,7 @@ func (rcc *retryClusterClient) MemberAdd(ctx context.Context, in *pb.MemberAddRe } func (rcc *retryClusterClient) MemberRemove(ctx context.Context, in *pb.MemberRemoveRequest, opts ...grpc.CallOption) (resp *pb.MemberRemoveResponse, err error) { - err = rcc.retryf(ctx, func(rctx context.Context) error { + err = rcc.writeRetry(ctx, func(rctx context.Context) error { resp, err = rcc.ClusterClient.MemberRemove(rctx, in, opts...) return err }) @@ -230,7 +233,7 @@ func (rcc *retryClusterClient) MemberRemove(ctx context.Context, in *pb.MemberRe } func (rcc *retryClusterClient) MemberUpdate(ctx context.Context, in *pb.MemberUpdateRequest, opts ...grpc.CallOption) (resp *pb.MemberUpdateResponse, err error) { - err = rcc.retryf(ctx, func(rctx context.Context) error { + err = rcc.writeRetry(ctx, func(rctx context.Context) error { resp, err = rcc.ClusterClient.MemberUpdate(rctx, in, opts...) return err }) @@ -239,7 +242,7 @@ func (rcc *retryClusterClient) MemberUpdate(ctx context.Context, in *pb.MemberUp type retryAuthClient struct { pb.AuthClient - retryf retryRpcFunc + writeRetry retryRPCFunc } // RetryAuthClient implements a AuthClient that uses the client's FailFast retry policy. @@ -248,7 +251,7 @@ func RetryAuthClient(c *Client) pb.AuthClient { } func (rac *retryAuthClient) AuthEnable(ctx context.Context, in *pb.AuthEnableRequest, opts ...grpc.CallOption) (resp *pb.AuthEnableResponse, err error) { - err = rac.retryf(ctx, func(rctx context.Context) error { + err = rac.writeRetry(ctx, func(rctx context.Context) error { resp, err = rac.AuthClient.AuthEnable(rctx, in, opts...) 
return err }) @@ -256,7 +259,7 @@ func (rac *retryAuthClient) AuthEnable(ctx context.Context, in *pb.AuthEnableReq } func (rac *retryAuthClient) AuthDisable(ctx context.Context, in *pb.AuthDisableRequest, opts ...grpc.CallOption) (resp *pb.AuthDisableResponse, err error) { - err = rac.retryf(ctx, func(rctx context.Context) error { + err = rac.writeRetry(ctx, func(rctx context.Context) error { resp, err = rac.AuthClient.AuthDisable(rctx, in, opts...) return err }) @@ -264,7 +267,7 @@ func (rac *retryAuthClient) AuthDisable(ctx context.Context, in *pb.AuthDisableR } func (rac *retryAuthClient) UserAdd(ctx context.Context, in *pb.AuthUserAddRequest, opts ...grpc.CallOption) (resp *pb.AuthUserAddResponse, err error) { - err = rac.retryf(ctx, func(rctx context.Context) error { + err = rac.writeRetry(ctx, func(rctx context.Context) error { resp, err = rac.AuthClient.UserAdd(rctx, in, opts...) return err }) @@ -272,7 +275,7 @@ func (rac *retryAuthClient) UserAdd(ctx context.Context, in *pb.AuthUserAddReque } func (rac *retryAuthClient) UserDelete(ctx context.Context, in *pb.AuthUserDeleteRequest, opts ...grpc.CallOption) (resp *pb.AuthUserDeleteResponse, err error) { - err = rac.retryf(ctx, func(rctx context.Context) error { + err = rac.writeRetry(ctx, func(rctx context.Context) error { resp, err = rac.AuthClient.UserDelete(rctx, in, opts...) return err }) @@ -280,7 +283,7 @@ func (rac *retryAuthClient) UserDelete(ctx context.Context, in *pb.AuthUserDelet } func (rac *retryAuthClient) UserChangePassword(ctx context.Context, in *pb.AuthUserChangePasswordRequest, opts ...grpc.CallOption) (resp *pb.AuthUserChangePasswordResponse, err error) { - err = rac.retryf(ctx, func(rctx context.Context) error { + err = rac.writeRetry(ctx, func(rctx context.Context) error { resp, err = rac.AuthClient.UserChangePassword(rctx, in, opts...) return err }) @@ -288,7 +291,7 @@ func (rac *retryAuthClient) UserChangePassword(ctx context.Context, in *pb.AuthU } func (rac *retryAuthClient) UserGrantRole(ctx context.Context, in *pb.AuthUserGrantRoleRequest, opts ...grpc.CallOption) (resp *pb.AuthUserGrantRoleResponse, err error) { - err = rac.retryf(ctx, func(rctx context.Context) error { + err = rac.writeRetry(ctx, func(rctx context.Context) error { resp, err = rac.AuthClient.UserGrantRole(rctx, in, opts...) return err }) @@ -296,7 +299,7 @@ func (rac *retryAuthClient) UserGrantRole(ctx context.Context, in *pb.AuthUserGr } func (rac *retryAuthClient) UserRevokeRole(ctx context.Context, in *pb.AuthUserRevokeRoleRequest, opts ...grpc.CallOption) (resp *pb.AuthUserRevokeRoleResponse, err error) { - err = rac.retryf(ctx, func(rctx context.Context) error { + err = rac.writeRetry(ctx, func(rctx context.Context) error { resp, err = rac.AuthClient.UserRevokeRole(rctx, in, opts...) return err }) @@ -304,7 +307,7 @@ func (rac *retryAuthClient) UserRevokeRole(ctx context.Context, in *pb.AuthUserR } func (rac *retryAuthClient) RoleAdd(ctx context.Context, in *pb.AuthRoleAddRequest, opts ...grpc.CallOption) (resp *pb.AuthRoleAddResponse, err error) { - err = rac.retryf(ctx, func(rctx context.Context) error { + err = rac.writeRetry(ctx, func(rctx context.Context) error { resp, err = rac.AuthClient.RoleAdd(rctx, in, opts...) 
return err }) @@ -312,7 +315,7 @@ func (rac *retryAuthClient) RoleAdd(ctx context.Context, in *pb.AuthRoleAddReque } func (rac *retryAuthClient) RoleDelete(ctx context.Context, in *pb.AuthRoleDeleteRequest, opts ...grpc.CallOption) (resp *pb.AuthRoleDeleteResponse, err error) { - err = rac.retryf(ctx, func(rctx context.Context) error { + err = rac.writeRetry(ctx, func(rctx context.Context) error { resp, err = rac.AuthClient.RoleDelete(rctx, in, opts...) return err }) @@ -320,7 +323,7 @@ func (rac *retryAuthClient) RoleDelete(ctx context.Context, in *pb.AuthRoleDelet } func (rac *retryAuthClient) RoleGrantPermission(ctx context.Context, in *pb.AuthRoleGrantPermissionRequest, opts ...grpc.CallOption) (resp *pb.AuthRoleGrantPermissionResponse, err error) { - err = rac.retryf(ctx, func(rctx context.Context) error { + err = rac.writeRetry(ctx, func(rctx context.Context) error { resp, err = rac.AuthClient.RoleGrantPermission(rctx, in, opts...) return err }) @@ -328,7 +331,7 @@ func (rac *retryAuthClient) RoleGrantPermission(ctx context.Context, in *pb.Auth } func (rac *retryAuthClient) RoleRevokePermission(ctx context.Context, in *pb.AuthRoleRevokePermissionRequest, opts ...grpc.CallOption) (resp *pb.AuthRoleRevokePermissionResponse, err error) { - err = rac.retryf(ctx, func(rctx context.Context) error { + err = rac.writeRetry(ctx, func(rctx context.Context) error { resp, err = rac.AuthClient.RoleRevokePermission(rctx, in, opts...) return err }) From 26291abb3200005150dcceed95c6a9ea83994bef Mon Sep 17 00:00:00 2001 From: Gyu-Ho Lee Date: Wed, 18 Oct 2017 12:36:38 -0700 Subject: [PATCH 06/17] clientv3/balancer: fix retry logic Signed-off-by: Gyu-Ho Lee --- clientv3/balancer.go | 7 + clientv3/health_balancer.go | 33 +++-- clientv3/retry.go | 282 +++++++++++++++++++++++++++++------- 3 files changed, 262 insertions(+), 60 deletions(-) diff --git a/clientv3/balancer.go b/clientv3/balancer.go index 8c581c75bb1..83b6796aba6 100644 --- a/clientv3/balancer.go +++ b/clientv3/balancer.go @@ -46,6 +46,9 @@ type balancer interface { pinned() string // endpointError handles error from server-side. endpointError(host string, err error) + // returns errorInfo of host, if any. + // ok is false, if host is healthy. + isFailed(host string) (ev errorInfo, ok bool) // up is Up but includes whether the balancer will use the connection. up(addr grpc.Address) (func(error), bool) @@ -156,6 +159,10 @@ func (b *simpleBalancer) endpointError(host string, err error) { panic("'endpointError' not implemented") } +func (b *simpleBalancer) isFailed(host string) (errorInfo, bool) { + panic("'error' not implemented") +} + func getHost2ep(eps []string) map[string]string { hm := make(map[string]string, len(eps)) for i := range eps { diff --git a/clientv3/health_balancer.go b/clientv3/health_balancer.go index 8e80d2495c7..bc47f866100 100644 --- a/clientv3/health_balancer.go +++ b/clientv3/health_balancer.go @@ -30,6 +30,11 @@ const unknownService = "unknown service grpc.health.v1.Health" type healthCheckFunc func(ep string) (bool, error) +type errorInfo struct { + failed time.Time + err error +} + // healthBalancer wraps a balancer so that it uses health checking // to choose its endpoints. type healthBalancer struct { @@ -49,7 +54,7 @@ type healthBalancer struct { eps []string // unhealthyHosts tracks the last unhealthy time of endpoints. 
- unhealthyHosts map[string]time.Time + unhealthyHosts map[string]errorInfo stopc chan struct{} stopOnce sync.Once @@ -66,7 +71,7 @@ func newHealthBalancer(b balancer, timeout time.Duration, hc healthCheckFunc) *h eps: b.endpoints(), addrs: eps2addrs(b.endpoints()), host2ep: getHost2ep(b.endpoints()), - unhealthyHosts: make(map[string]time.Time), + unhealthyHosts: make(map[string]errorInfo), stopc: make(chan struct{}), } if timeout < minHealthRetryDuration { @@ -94,7 +99,7 @@ func (hb *healthBalancer) Up(addr grpc.Address) func(error) { // finding healthy endpoint on retry could take several timeouts and redials. // To avoid wasting retries, gray-list unhealthy endpoints. hb.mu.Lock() - hb.unhealthyHosts[addr.Addr] = time.Now() + hb.unhealthyHosts[addr.Addr] = errorInfo{failed: time.Now(), err: err} hb.mu.Unlock() f(err) if logger.V(4) { @@ -120,7 +125,7 @@ func (hb *healthBalancer) updateAddrs(eps ...string) { addrs, host2ep := eps2addrs(eps), getHost2ep(eps) hb.mu.Lock() hb.addrs, hb.eps, hb.host2ep = addrs, eps, host2ep - hb.unhealthyHosts = make(map[string]time.Time) + hb.unhealthyHosts = make(map[string]errorInfo) hb.mu.Unlock() hb.balancer.updateAddrs(eps...) } @@ -150,7 +155,7 @@ func (hb *healthBalancer) updateUnhealthy(timeout time.Duration) { } continue } - if time.Since(v) > timeout { + if time.Since(v.failed) > timeout { delete(hb.unhealthyHosts, k) if logger.V(4) { logger.Infof("clientv3/health-balancer: removes %q from unhealthy after %v", k, timeout) @@ -187,13 +192,20 @@ func (hb *healthBalancer) liveAddrs() []grpc.Address { func (hb *healthBalancer) endpointError(host string, err error) { hb.mu.Lock() - hb.unhealthyHosts[host] = time.Now() + hb.unhealthyHosts[host] = errorInfo{failed: time.Now(), err: err} hb.mu.Unlock() if logger.V(4) { logger.Infof("clientv3/health-balancer: marking %q as unhealthy (%q)", host, err.Error()) } } +func (hb *healthBalancer) isFailed(host string) (ev errorInfo, ok bool) { + hb.mu.RLock() + ev, ok = hb.unhealthyHosts[host] + hb.mu.RUnlock() + return ev, ok +} + func (hb *healthBalancer) mayPin(addr grpc.Address) bool { hb.mu.RLock() if _, ok := hb.host2ep[addr.Addr]; !ok { @@ -201,7 +213,7 @@ func (hb *healthBalancer) mayPin(addr grpc.Address) bool { return false } skip := len(hb.addrs) == 1 || len(hb.unhealthyHosts) == 0 || len(hb.addrs) == len(hb.unhealthyHosts) - failedTime, bad := hb.unhealthyHosts[addr.Addr] + ef, bad := hb.unhealthyHosts[addr.Addr] dur := hb.healthCheckTimeout hb.mu.RUnlock() if skip || !bad { @@ -212,13 +224,14 @@ func (hb *healthBalancer) mayPin(addr grpc.Address) bool { // 2. balancer 'Up' unpins with grpc: failed with network I/O error // 3. 
grpc-healthcheck still SERVING, thus retry to pin // instead, return before grpc-healthcheck if failed within healthcheck timeout - if elapsed := time.Since(failedTime); elapsed < dur { + if elapsed := time.Since(ef.failed); elapsed < dur { if logger.V(4) { logger.Infof("clientv3/health-balancer: %q is up but not pinned (failed %v ago, require minimum %v after failure)", addr.Addr, elapsed, dur) } return false } - if ok, _ := hb.healthCheck(addr.Addr); ok { + ok, err := hb.healthCheck(addr.Addr) + if ok { hb.mu.Lock() delete(hb.unhealthyHosts, addr.Addr) hb.mu.Unlock() @@ -228,7 +241,7 @@ func (hb *healthBalancer) mayPin(addr grpc.Address) bool { return true } hb.mu.Lock() - hb.unhealthyHosts[addr.Addr] = time.Now() + hb.unhealthyHosts[addr.Addr] = errorInfo{failed: time.Now(), err: err} hb.mu.Unlock() if logger.V(4) { logger.Infof("clientv3/health-balancer: %q becomes unhealthy (health check failed)", addr.Addr) diff --git a/clientv3/retry.go b/clientv3/retry.go index d51fb0a8342..4438c8b58ae 100644 --- a/clientv3/retry.go +++ b/clientv3/retry.go @@ -16,6 +16,7 @@ package clientv3 import ( "context" + "time" "github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes" pb "github.com/coreos/etcd/etcdserver/etcdserverpb" @@ -23,65 +24,248 @@ import ( "google.golang.org/grpc" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" + "google.golang.org/grpc/transport" ) -type rpcFunc func(ctx context.Context) error -type retryRPCFunc func(context.Context, rpcFunc) error -type retryStopErrFunc func(error) bool +type rpcFunc func(context.Context) error + +// TODO: clean up gRPC error handling when "FailFast=false" is fixed. +// See https://github.com/grpc/grpc-go/issues/1532. +func (c *Client) do( + rpcCtx context.Context, + pinned string, + write bool, + f rpcFunc) (unhealthy, connectWait, switchEp, retryEp bool, err error) { + err = f(rpcCtx) + if err == nil { + unhealthy, connectWait, switchEp, retryEp = false, false, false, false + return unhealthy, connectWait, switchEp, retryEp, nil + } -func isReadStopError(err error) bool { - eErr := rpctypes.Error(err) - // always stop retry on etcd errors - if _, ok := eErr.(rpctypes.EtcdError); ok { - return true + if logger.V(4) { + logger.Infof("clientv3/do: error %q on pinned endpoint %q (write %v)", err.Error(), pinned, write) + } + + rerr := rpctypes.Error(err) + if ev, ok := rerr.(rpctypes.EtcdError); ok { + if ev.Code() != codes.Unavailable { + // error from etcd server with non codes.Unavailable + // then no endpoint switch and no retry + // e.g. rpctypes.ErrEmptyKey, rpctypes.ErrNoSpace + unhealthy, connectWait, switchEp, retryEp = false, false, false, false + return unhealthy, connectWait, switchEp, retryEp, err + } + // error from etcd server with codes.Unavailable + // then endpoint switch and retry + // e.g. rpctypes.ErrTimeout + unhealthy, connectWait, switchEp, retryEp = true, false, true, true + if write { // only retry immutable RPCs ("put at-most-once semantics") + retryEp = false + } + return unhealthy, connectWait, switchEp, retryEp, err } - // only retry if unavailable - ev, _ := status.FromError(err) - return ev.Code() != codes.Unavailable -} -func isWriteStopError(err error) bool { - ev, _ := status.FromError(err) - if ev.Code() != codes.Unavailable { - return true + // if failed to establish new stream or server is not reachable, + // then endpoint switch and retry + // e.g. 
transport.ErrConnClosing + if tv, ok := err.(transport.ConnectionError); ok && tv.Temporary() { + unhealthy, connectWait, switchEp, retryEp = true, false, true, true + return unhealthy, connectWait, switchEp, retryEp, err } - return rpctypes.ErrorDesc(err) != "there is no address available" + + // if unknown status error from gRPC, then endpoint switch and no retry + s, ok := status.FromError(err) + if !ok { + unhealthy, connectWait, switchEp, retryEp = true, false, true, false + return unhealthy, connectWait, switchEp, retryEp, err + } + + // assume transport.ConnectionError, transport.StreamError, or others from gRPC + // converts to grpc/status.(*statusError) by grpc/toRPCErr + // (e.g. transport.ErrConnClosing when server closed transport, failing node) + // if known status error from gRPC with following codes, + // then endpoint switch and retry + if s.Code() == codes.Unavailable || + s.Code() == codes.Internal || + s.Code() == codes.DeadlineExceeded { + unhealthy, connectWait, switchEp, retryEp = true, false, true, true + switch s.Message() { + case "there is no address available": + // pinned == "" or endpoint unpinned right before/during RPC send + // no need to mark empty endpoint + unhealthy = false + + // RPC was sent with empty pinned address: + // 1. initial connection has not been established (never pinned) + // 2. an endpoint has just been unpinned from an error + // Both cases expect to pin a new endpoint when Up is called. + // Thus, should be retried. + retryEp = true + if logger.V(4) { + logger.Infof("clientv3/do: there was no pinned endpoint (will be retried)") + } + + // case A. + // gRPC is being too slow to start transport (e.g. errors, backoff), + // then endpoint switch. Otherwise, it can block forever to connect wait. + // This case is safe to reset/drain all current connections. + switchEp = true + + // case B. + // gRPC is just about to establish transport within a moment, + // where endpoint switch would enforce connection drain on the healthy + // endpoint, which otherwise could be pinned and connected. + // The healthy endpoint gets pinned but unpinned right after with + // an error "grpc: the connection is drained", from endpoint switch. + // Thus, not safe to trigger endpoint switch; new healthy endpoint + // will be marked as unhealthy and not be pinned for another >3 seconds. + // Not acceptable when this healthy endpoint was the only available one! + // Workaround is wait up to dial timeout, and then endpoint switch + // only when the connection still has not been "Up" (still no pinned endpoint). 
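(Standalone sketch of the wait-up-to-dial-timeout workaround described above: block until a readiness signal arrives, the wait times out, or the context ends. Here readyc stands in for the balancer's ConnectNotify channel; waitReady is an illustrative name, not clientv3 API.)

package retrysketch

import (
	"context"
	"time"
)

// waitReady blocks until a readiness signal arrives, the wait times out,
// or the caller's context is canceled.
func waitReady(ctx context.Context, readyc <-chan struct{}, d time.Duration) (up bool, err error) {
	select {
	case <-readyc:
		return true, nil // a healthy endpoint came up in time
	case <-time.After(d):
		return false, nil // timed out; caller may now switch endpoints
	case <-ctx.Done():
		return false, ctx.Err()
	}
}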
+ // TODO: remove this when gRPC has better way to track connection status + connectWait = true + + case "grpc: the connection is closing", + "grpc: the connection is unavailable": + // gRPC v1.7 retries on these errors with FailFast=false + // etcd client sets FailFast=true, thus retry manually + unhealthy, connectWait, switchEp, retryEp = true, false, true, true + + default: + // only retry immutable RPCs ("put at-most-once semantics") + // mutable RPCs should not be retried if unknown errors + if write { + unhealthy, connectWait, switchEp, retryEp = true, false, true, false + } + } + } + + return unhealthy, connectWait, switchEp, retryEp, err } -func (c *Client) newRetryWrapper(isStop retryStopErrFunc) retryRPCFunc { +type retryRPCFunc func(context.Context, rpcFunc) error + +const minDialDuration = 3 * time.Second + +func (c *Client) newRetryWrapper(write bool) retryRPCFunc { + dialTimeout := c.cfg.DialTimeout + if dialTimeout < minDialDuration { + dialTimeout = minDialDuration + } + dialWait := func(rpcCtx context.Context) (up bool, err error) { + select { + case <-c.balancer.ConnectNotify(): + if logger.V(4) { + logger.Infof("clientv3/retry: new healthy endpoint %q is up!", c.balancer.pinned()) + } + return true, nil + case <-time.After(dialTimeout): + return false, nil + case <-rpcCtx.Done(): + return false, rpcCtx.Err() + case <-c.ctx.Done(): + return false, c.ctx.Err() + } + } return func(rpcCtx context.Context, f rpcFunc) error { for { - select { - case <-c.balancer.ConnectNotify(): - case <-rpcCtx.Done(): - return rpcCtx.Err() - case <-c.ctx.Done(): - return c.ctx.Err() - } pinned := c.balancer.pinned() - err := f(rpcCtx) - if err == nil { + staleEp := pinned != "" && c.balancer.endpoint(pinned) == "" + eps := c.balancer.endpoints() + singleEp, multiEp := len(eps) == 1, len(eps) > 1 + + var unhealthy, connectWait, switchEp, retryEp bool + var err error + if pinned == "" { + // no endpoint has not been up, then wait for connection up and retry + unhealthy, connectWait, switchEp, retryEp = false, true, true, true + } else if staleEp { + // if stale, then endpoint switch and retry + unhealthy, switchEp, retryEp = false, true, true + // should wait in case this endpoint is stale from "SetEndpoints" + // which resets all connections, thus expecting new healthy endpoint "Up" + connectWait = true + if logger.V(4) { + logger.Infof("clientv3/retry: found stale endpoint %q (switching and retrying)", pinned) + } + } else { // endpoint is up-to-date + unhealthy, connectWait, switchEp, retryEp, err = c.do(rpcCtx, pinned, write, f) + } + if !unhealthy && !connectWait && !switchEp && !retryEp && err == nil { return nil } - if logger.V(4) { - logger.Infof("clientv3/retry: error %q on pinned endpoint %q", err.Error(), pinned) + + // single endpoint with failed gRPC connection, before RPC is sent + // should neither do endpoint switch nor retry + // because client might have manually closed the connection + // and there's no other endpoint to switch + // e.g. 
grpc.ErrClientConnClosing + if singleEp && connectWait { + ep := eps[0] + _, host, _ := parseEndpoint(ep) + ev, ok := c.balancer.isFailed(host) + if ok { + // error returned to gRPC balancer "Up" error function + // before RPC is sent (error is typed "grpc.downErr") + if ev.err.Error() == grpc.ErrClientConnClosing.Error() { + if logger.V(4) { + logger.Infof("clientv3/retry: single endpoint %q with error %q (returning)", host, ev.err.Error()) + } + return grpc.ErrClientConnClosing + } + if logger.V(4) { + logger.Infof("clientv3/retry: single endpoint %q with error %q", host, ev.err.Error()) + } + } } - // mark this before endpoint switch is triggered - c.balancer.endpointError(pinned, err) - notify := c.balancer.ConnectNotify() - if s, ok := status.FromError(err); ok && s.Code() == codes.Unavailable { - c.balancer.next() + + // mark as unhealthy, only when there are multiple endpoints + if multiEp && unhealthy { + c.balancer.endpointError(pinned, err) + } + + // wait for next healthy endpoint up + // before draining all current connections + if connectWait { + if logger.V(4) { + logger.Infof("clientv3/retry: wait %v for healthy endpoint", dialTimeout) + } + up, derr := dialWait(rpcCtx) + if derr != nil { + return derr + } + if up { // connection is up, no need to switch endpoint + continue + } + if logger.V(4) { + logger.Infof("clientv3/retry: took too long to reset transport (switching endpoints)") + } } - if isStop(err) { - return err + + // trigger endpoint switch in balancer + if switchEp { + c.balancer.next() } - select { - case <-notify: - case <-rpcCtx.Done(): - return rpcCtx.Err() - case <-c.ctx.Done(): - return c.ctx.Err() + + // wait for another endpoint to come up + if retryEp { + if logger.V(4) { + logger.Infof("clientv3/retry: wait %v for connection before retry", dialTimeout) + } + up, derr := dialWait(rpcCtx) + if derr != nil { + return derr + } + if up { // retry with new endpoint + continue + } + if logger.V(4) { + logger.Infof("clientv3/retry: took too long for connection up (retrying)") + } } + + // TODO: remove duplicate error handling inside toErr + return toErr(rpcCtx, err) } } } @@ -115,14 +299,12 @@ func (c *Client) newAuthRetryWrapper() retryRPCFunc { // RetryKVClient implements a KVClient that uses the client's FailFast retry policy. func RetryKVClient(c *Client) pb.KVClient { - readRetry := c.newRetryWrapper(isReadStopError) - writeRetry := c.newRetryWrapper(isWriteStopError) + readRetry := c.newRetryWrapper(false) + writeRetry := c.newRetryWrapper(true) conn := pb.NewKVClient(c.conn) retryBasic := &retryKVClient{&retryWriteKVClient{conn, writeRetry}, readRetry} retryAuthWrapper := c.newAuthRetryWrapper() - return &retryKVClient{ - &retryWriteKVClient{retryBasic, retryAuthWrapper}, - retryAuthWrapper} + return &retryKVClient{&retryWriteKVClient{retryBasic, retryAuthWrapper}, retryAuthWrapper} } type retryKVClient struct { @@ -184,7 +366,7 @@ type retryLeaseClient struct { func RetryLeaseClient(c *Client) pb.LeaseClient { retry := &retryLeaseClient{ pb.NewLeaseClient(c.conn), - c.newRetryWrapper(isReadStopError), + c.newRetryWrapper(false), } return &retryLeaseClient{retry, c.newAuthRetryWrapper()} } @@ -213,7 +395,7 @@ type retryClusterClient struct { // RetryClusterClient implements a ClusterClient that uses the client's FailFast retry policy. 
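(Standalone, simplified sketch of the classification the new retry path above performs: server errors other than Unavailable stop the retry, transport-level status codes mark the endpoint unhealthy, and only immutable RPCs are retried. classify is an illustrative name; the real logic in Client.do also special-cases rpctypes.EtcdError, transport.ConnectionError, and specific status messages.)

package retrysketch

import (
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"
)

// classify reports whether the pinned endpoint should be marked unhealthy
// and whether an idempotent RPC may be retried on another endpoint.
func classify(err error, write bool) (unhealthy, retry bool) {
	if err == nil {
		return false, false
	}
	s, ok := status.FromError(err)
	if !ok {
		// unknown (non-status) error: mark unhealthy, do not retry
		return true, false
	}
	switch s.Code() {
	case codes.Unavailable, codes.Internal, codes.DeadlineExceeded:
		// transport-level failure: retry reads only, never writes
		// ("put at-most-once" semantics)
		return true, !write
	default:
		// application-level error: not an endpoint problem, surface to caller
		return false, false
	}
}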
func RetryClusterClient(c *Client) pb.ClusterClient { - return &retryClusterClient{pb.NewClusterClient(c.conn), c.newRetryWrapper(isWriteStopError)} + return &retryClusterClient{pb.NewClusterClient(c.conn), c.newRetryWrapper(true)} } func (rcc *retryClusterClient) MemberAdd(ctx context.Context, in *pb.MemberAddRequest, opts ...grpc.CallOption) (resp *pb.MemberAddResponse, err error) { @@ -247,7 +429,7 @@ type retryAuthClient struct { // RetryAuthClient implements a AuthClient that uses the client's FailFast retry policy. func RetryAuthClient(c *Client) pb.AuthClient { - return &retryAuthClient{pb.NewAuthClient(c.conn), c.newRetryWrapper(isWriteStopError)} + return &retryAuthClient{pb.NewAuthClient(c.conn), c.newRetryWrapper(true)} } func (rac *retryAuthClient) AuthEnable(ctx context.Context, in *pb.AuthEnableRequest, opts ...grpc.CallOption) (resp *pb.AuthEnableResponse, err error) { From 8fac498d85a25dbad74c06dc2d878a485508a38f Mon Sep 17 00:00:00 2001 From: Gyu-Ho Lee Date: Wed, 18 Oct 2017 12:43:01 -0700 Subject: [PATCH 07/17] clientv3: remove redundant retries in KV, set FailFast=true Signed-off-by: Gyu-Ho Lee --- clientv3/kv.go | 39 +++++++++++---------------------------- clientv3/retry.go | 32 +++++++++++++++++++++++++++----- clientv3/txn.go | 28 +++++++--------------------- 3 files changed, 45 insertions(+), 54 deletions(-) diff --git a/clientv3/kv.go b/clientv3/kv.go index ead08a0827f..160a48110d8 100644 --- a/clientv3/kv.go +++ b/clientv3/kv.go @@ -18,8 +18,6 @@ import ( "context" pb "github.com/coreos/etcd/etcdserver/etcdserverpb" - - "google.golang.org/grpc" ) type ( @@ -90,15 +88,17 @@ func (resp *TxnResponse) OpResponse() OpResponse { } type kv struct { - remote pb.KVClient + readWrite pb.KVClient + readOnly pb.KVClient } func NewKV(c *Client) KV { - return &kv{remote: RetryKVClient(c)} + readWrite, readOnly := RetryKVClient(c) + return &kv{readWrite: readWrite, readOnly: readOnly} } func NewKVFromKVClient(remote pb.KVClient) KV { - return &kv{remote: remote} + return &kv{readWrite: remote, readOnly: remote} } func (kv *kv) Put(ctx context.Context, key, val string, opts ...OpOption) (*PutResponse, error) { @@ -117,7 +117,7 @@ func (kv *kv) Delete(ctx context.Context, key string, opts ...OpOption) (*Delete } func (kv *kv) Compact(ctx context.Context, rev int64, opts ...CompactOption) (*CompactResponse, error) { - resp, err := kv.remote.Compact(ctx, OpCompact(rev, opts...).toRequest()) + resp, err := kv.readWrite.Compact(ctx, OpCompact(rev, opts...).toRequest()) if err != nil { return nil, toErr(ctx, err) } @@ -132,53 +132,36 @@ func (kv *kv) Txn(ctx context.Context) Txn { } func (kv *kv) Do(ctx context.Context, op Op) (OpResponse, error) { - for { - resp, err := kv.do(ctx, op) - if err == nil { - return resp, nil - } - - if isHaltErr(ctx, err) { - return resp, toErr(ctx, err) - } - // do not retry on modifications - if op.isWrite() { - return resp, toErr(ctx, err) - } - } -} - -func (kv *kv) do(ctx context.Context, op Op) (OpResponse, error) { var err error switch op.t { case tRange: var resp *pb.RangeResponse - resp, err = kv.remote.Range(ctx, op.toRangeRequest(), grpc.FailFast(false)) + resp, err = kv.readWrite.Range(ctx, op.toRangeRequest()) if err == nil { return OpResponse{get: (*GetResponse)(resp)}, nil } case tPut: var resp *pb.PutResponse r := &pb.PutRequest{Key: op.key, Value: op.val, Lease: int64(op.leaseID), PrevKv: op.prevKV, IgnoreValue: op.ignoreValue, IgnoreLease: op.ignoreLease} - resp, err = kv.remote.Put(ctx, r) + resp, err = kv.readWrite.Put(ctx, r) if err == 
nil { return OpResponse{put: (*PutResponse)(resp)}, nil } case tDeleteRange: var resp *pb.DeleteRangeResponse r := &pb.DeleteRangeRequest{Key: op.key, RangeEnd: op.end, PrevKv: op.prevKV} - resp, err = kv.remote.DeleteRange(ctx, r) + resp, err = kv.readWrite.DeleteRange(ctx, r) if err == nil { return OpResponse{del: (*DeleteResponse)(resp)}, nil } case tTxn: var resp *pb.TxnResponse - resp, err = kv.remote.Txn(ctx, op.toTxnRequest()) + resp, err = kv.readWrite.Txn(ctx, op.toTxnRequest()) if err == nil { return OpResponse{txn: (*TxnResponse)(resp)}, nil } default: panic("Unknown op") } - return OpResponse{}, err + return OpResponse{}, toErr(ctx, err) } diff --git a/clientv3/retry.go b/clientv3/retry.go index 4438c8b58ae..4f8479acfe4 100644 --- a/clientv3/retry.go +++ b/clientv3/retry.go @@ -297,14 +297,23 @@ func (c *Client) newAuthRetryWrapper() retryRPCFunc { } } -// RetryKVClient implements a KVClient that uses the client's FailFast retry policy. -func RetryKVClient(c *Client) pb.KVClient { +// RetryKVClient implements a KVClient. +func RetryKVClient(c *Client) (readWrite, readOnly pb.KVClient) { readRetry := c.newRetryWrapper(false) writeRetry := c.newRetryWrapper(true) - conn := pb.NewKVClient(c.conn) - retryBasic := &retryKVClient{&retryWriteKVClient{conn, writeRetry}, readRetry} retryAuthWrapper := c.newAuthRetryWrapper() - return &retryKVClient{&retryWriteKVClient{retryBasic, retryAuthWrapper}, retryAuthWrapper} + kvc := pb.NewKVClient(c.conn) + + retryBasic := &retryKVClient{&retryWriteKVClient{kvc, writeRetry}, readRetry} + readWrite = &retryKVClient{ + &retryWriteKVClient{retryBasic, retryAuthWrapper}, + retryAuthWrapper, + } + + retryRead := &retryReadKVClient{kvc, readRetry} + readOnly = &retryReadKVClient{retryRead, retryAuthWrapper} + + return readWrite, readOnly } type retryKVClient struct { @@ -349,6 +358,19 @@ func (rkv *retryWriteKVClient) Txn(ctx context.Context, in *pb.TxnRequest, opts return resp, err } +type retryReadKVClient struct { + pb.KVClient + readRetry retryRPCFunc +} + +func (rkv *retryReadKVClient) Txn(ctx context.Context, in *pb.TxnRequest, opts ...grpc.CallOption) (resp *pb.TxnResponse, err error) { + err = rkv.readRetry(ctx, func(rctx context.Context) error { + resp, err = rkv.KVClient.Txn(rctx, in, opts...) + return err + }) + return resp, err +} + func (rkv *retryWriteKVClient) Compact(ctx context.Context, in *pb.CompactionRequest, opts ...grpc.CallOption) (resp *pb.CompactionResponse, err error) { err = rkv.writeRetry(ctx, func(rctx context.Context) error { resp, err = rkv.KVClient.Compact(rctx, in, opts...) diff --git a/clientv3/txn.go b/clientv3/txn.go index ea4ec6160b5..fb5214262ca 100644 --- a/clientv3/txn.go +++ b/clientv3/txn.go @@ -19,8 +19,6 @@ import ( "sync" pb "github.com/coreos/etcd/etcdserver/etcdserverpb" - - "google.golang.org/grpc" ) // Txn is the interface that wraps mini-transactions. 
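(Standalone sketch of the at-most-once rule that the readWrite/readOnly split enforces: reads may be re-sent through the retry wrapper, writes are attempted exactly once. withRetry and its parameters are illustrative, not clientv3 API.)

package retrysketch

import "context"

// withRetry re-issues a call only when it is marked idempotent (a read);
// mutable calls are attempted exactly once to keep at-most-once semantics.
func withRetry(ctx context.Context, idempotent bool, attempts int, call func(context.Context) error) error {
	var err error
	for i := 0; i < attempts; i++ {
		if err = call(ctx); err == nil {
			return nil
		}
		if !idempotent {
			return err // never resend writes
		}
		select {
		case <-ctx.Done():
			return ctx.Err()
		default:
		}
	}
	return err
}

A read-only Txn would pass idempotent=true, while a Txn containing Put or DeleteRange would pass false, matching how Commit below picks kv.readOnly versus kv.readWrite.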
@@ -136,30 +134,18 @@ func (txn *txn) Else(ops ...Op) Txn { func (txn *txn) Commit() (*TxnResponse, error) { txn.mu.Lock() defer txn.mu.Unlock() - for { - resp, err := txn.commit() - if err == nil { - return resp, err - } - if isHaltErr(txn.ctx, err) { - return nil, toErr(txn.ctx, err) - } - if txn.isWrite { - return nil, toErr(txn.ctx, err) - } - } -} -func (txn *txn) commit() (*TxnResponse, error) { r := &pb.TxnRequest{Compare: txn.cmps, Success: txn.sus, Failure: txn.fas} - var opts []grpc.CallOption - if !txn.isWrite { - opts = []grpc.CallOption{grpc.FailFast(false)} + var resp *pb.TxnResponse + var err error + if txn.isWrite { + resp, err = txn.kv.readWrite.Txn(txn.ctx, r) + } else { + resp, err = txn.kv.readOnly.Txn(txn.ctx, r) } - resp, err := txn.kv.remote.Txn(txn.ctx, r, opts...) if err != nil { - return nil, err + return nil, toErr(txn.ctx, err) } return (*TxnResponse)(resp), nil } From d13827f23536f00a7d499e3172ae177eac0a1874 Mon Sep 17 00:00:00 2001 From: Gyu-Ho Lee Date: Wed, 18 Oct 2017 13:20:50 -0700 Subject: [PATCH 08/17] clientv3: remove redundant retries in Watch, set FailFast=true Signed-off-by: Gyu-Ho Lee --- clientv3/retry.go | 20 ++++++++++++++++++++ clientv3/watch.go | 7 ++++--- 2 files changed, 24 insertions(+), 3 deletions(-) diff --git a/clientv3/retry.go b/clientv3/retry.go index 4f8479acfe4..05e6e7dd494 100644 --- a/clientv3/retry.go +++ b/clientv3/retry.go @@ -379,6 +379,26 @@ func (rkv *retryWriteKVClient) Compact(ctx context.Context, in *pb.CompactionReq return resp, err } +// RetryWatchClient implements a WatchClient. +func RetryWatchClient(c *Client) pb.WatchClient { + readRetry := c.newRetryWrapper(false) + wc := pb.NewWatchClient(c.conn) + return &retryWatchClient{wc, readRetry} +} + +type retryWatchClient struct { + pb.WatchClient + readRetry retryRPCFunc +} + +func (rwc *retryWatchClient) Watch(ctx context.Context, opts ...grpc.CallOption) (stream pb.Watch_WatchClient, err error) { + err = rwc.readRetry(ctx, func(rctx context.Context) error { + stream, err = rwc.WatchClient.Watch(rctx, opts...) + return err + }) + return stream, err +} + type retryLeaseClient struct { pb.LeaseClient readRetry retryRPCFunc diff --git a/clientv3/watch.go b/clientv3/watch.go index cfa47812691..2b31c4f6af6 100644 --- a/clientv3/watch.go +++ b/clientv3/watch.go @@ -188,7 +188,7 @@ type watcherStream struct { } func NewWatcher(c *Client) Watcher { - return NewWatchFromWatchClient(pb.NewWatchClient(c.conn)) + return NewWatchFromWatchClient(RetryWatchClient(c)) } func NewWatchFromWatchClient(wc pb.WatchClient) Watcher { @@ -761,8 +761,9 @@ func (w *watchGrpcStream) joinSubstreams() { } } -// openWatchClient retries opening a watch client until success or halt. +// openWatchClient opens a watch client. 
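(Standalone illustration of the wrapper pattern used by retryWatchClient above: embed the generated client interface so every method is still served, and override only the call that needs the retry policy. Store and loggingStore are made-up names, not part of the patch.)

package retrysketch

import "log"

type Store interface {
	Get(key string) (string, error)
	Put(key, val string) error
}

// loggingStore overrides Get only; Put is served by the embedded Store.
type loggingStore struct {
	Store
}

func (s *loggingStore) Get(key string) (string, error) {
	v, err := s.Store.Get(key)
	if err != nil {
		log.Printf("get %q failed: %v", key, err)
	}
	return v, err
}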
func (w *watchGrpcStream) openWatchClient() (ws pb.Watch_WatchClient, err error) { + // manually retry in case "ws==nil && err==nil" for { select { case <-w.ctx.Done(): @@ -772,7 +773,7 @@ func (w *watchGrpcStream) openWatchClient() (ws pb.Watch_WatchClient, err error) return nil, err default: } - if ws, err = w.remote.Watch(w.ctx, grpc.FailFast(false)); ws != nil && err == nil { + if ws, err = w.remote.Watch(w.ctx); ws != nil && err == nil { break } if isHaltErr(w.ctx, err) { From f90a0b614c65cb9f556d320d77389b11e486b619 Mon Sep 17 00:00:00 2001 From: Gyu-Ho Lee Date: Wed, 18 Oct 2017 13:11:16 -0700 Subject: [PATCH 09/17] clientv3: remove redundant retries in Lease, set FailFast=true Signed-off-by: Gyu-Ho Lee --- clientv3/lease.go | 96 +++++++++++++++++++---------------------------- clientv3/retry.go | 36 +++++++++++++++--- 2 files changed, 69 insertions(+), 63 deletions(-) diff --git a/clientv3/lease.go b/clientv3/lease.go index e476db5be2e..6c2571ee04b 100644 --- a/clientv3/lease.go +++ b/clientv3/lease.go @@ -22,7 +22,6 @@ import ( "github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes" pb "github.com/coreos/etcd/etcdserver/etcdserverpb" - "google.golang.org/grpc" "google.golang.org/grpc/metadata" ) @@ -183,72 +182,55 @@ func NewLeaseFromLeaseClient(remote pb.LeaseClient, keepAliveTimeout time.Durati } func (l *lessor) Grant(ctx context.Context, ttl int64) (*LeaseGrantResponse, error) { - for { - r := &pb.LeaseGrantRequest{TTL: ttl} - resp, err := l.remote.LeaseGrant(ctx, r) - if err == nil { - gresp := &LeaseGrantResponse{ - ResponseHeader: resp.GetHeader(), - ID: LeaseID(resp.ID), - TTL: resp.TTL, - Error: resp.Error, - } - return gresp, nil - } - if isHaltErr(ctx, err) { - return nil, toErr(ctx, err) + r := &pb.LeaseGrantRequest{TTL: ttl} + resp, err := l.remote.LeaseGrant(ctx, r) + if err == nil { + gresp := &LeaseGrantResponse{ + ResponseHeader: resp.GetHeader(), + ID: LeaseID(resp.ID), + TTL: resp.TTL, + Error: resp.Error, } + return gresp, nil } + return nil, toErr(ctx, err) } func (l *lessor) Revoke(ctx context.Context, id LeaseID) (*LeaseRevokeResponse, error) { - for { - r := &pb.LeaseRevokeRequest{ID: int64(id)} - resp, err := l.remote.LeaseRevoke(ctx, r) - - if err == nil { - return (*LeaseRevokeResponse)(resp), nil - } - if isHaltErr(ctx, err) { - return nil, toErr(ctx, err) - } + r := &pb.LeaseRevokeRequest{ID: int64(id)} + resp, err := l.remote.LeaseRevoke(ctx, r) + if err == nil { + return (*LeaseRevokeResponse)(resp), nil } + return nil, toErr(ctx, err) } func (l *lessor) TimeToLive(ctx context.Context, id LeaseID, opts ...LeaseOption) (*LeaseTimeToLiveResponse, error) { - for { - r := toLeaseTimeToLiveRequest(id, opts...) - resp, err := l.remote.LeaseTimeToLive(ctx, r, grpc.FailFast(false)) - if err == nil { - gresp := &LeaseTimeToLiveResponse{ - ResponseHeader: resp.GetHeader(), - ID: LeaseID(resp.ID), - TTL: resp.TTL, - GrantedTTL: resp.GrantedTTL, - Keys: resp.Keys, - } - return gresp, nil - } - if isHaltErr(ctx, err) { - return nil, toErr(ctx, err) + r := toLeaseTimeToLiveRequest(id, opts...) 
+ resp, err := l.remote.LeaseTimeToLive(ctx, r) + if err == nil { + gresp := &LeaseTimeToLiveResponse{ + ResponseHeader: resp.GetHeader(), + ID: LeaseID(resp.ID), + TTL: resp.TTL, + GrantedTTL: resp.GrantedTTL, + Keys: resp.Keys, } + return gresp, nil } + return nil, toErr(ctx, err) } func (l *lessor) Leases(ctx context.Context) (*LeaseLeasesResponse, error) { - for { - resp, err := l.remote.LeaseLeases(ctx, &pb.LeaseLeasesRequest{}, grpc.FailFast(false)) - if err == nil { - leases := make([]LeaseStatus, len(resp.Leases)) - for i := range resp.Leases { - leases[i] = LeaseStatus{ID: LeaseID(resp.Leases[i].ID)} - } - return &LeaseLeasesResponse{ResponseHeader: resp.GetHeader(), Leases: leases}, nil - } - if isHaltErr(ctx, err) { - return nil, toErr(ctx, err) + resp, err := l.remote.LeaseLeases(ctx, &pb.LeaseLeasesRequest{}) + if err == nil { + leases := make([]LeaseStatus, len(resp.Leases)) + for i := range resp.Leases { + leases[i] = LeaseStatus{ID: LeaseID(resp.Leases[i].ID)} } + return &LeaseLeasesResponse{ResponseHeader: resp.GetHeader(), Leases: leases}, nil } + return nil, toErr(ctx, err) } func (l *lessor) KeepAlive(ctx context.Context, id LeaseID) (<-chan *LeaseKeepAliveResponse, error) { @@ -292,9 +274,10 @@ func (l *lessor) KeepAlive(ctx context.Context, id LeaseID) (<-chan *LeaseKeepAl } func (l *lessor) KeepAliveOnce(ctx context.Context, id LeaseID) (*LeaseKeepAliveResponse, error) { + // manually retry in case "resp==nil && err==nil" for { resp, err := l.keepAliveOnce(ctx, id) - if err == nil { + if resp != nil && err == nil { if resp.TTL <= 0 { err = rpctypes.ErrLeaseNotFound } @@ -389,7 +372,7 @@ func (l *lessor) keepAliveOnce(ctx context.Context, id LeaseID) (*LeaseKeepAlive cctx, cancel := context.WithCancel(ctx) defer cancel() - stream, err := l.remote.LeaseKeepAlive(cctx, grpc.FailFast(false)) + stream, err := l.remote.LeaseKeepAlive(cctx) if err != nil { return nil, toErr(ctx, err) } @@ -430,10 +413,9 @@ func (l *lessor) recvKeepAliveLoop() (gerr error) { if canceledByCaller(l.stopCtx, err) { return err } - } else { + } else if stream != nil { for { resp, err := stream.Recv() - if err != nil { if canceledByCaller(l.stopCtx, err) { return err @@ -461,8 +443,8 @@ func (l *lessor) recvKeepAliveLoop() (gerr error) { // resetRecv opens a new lease stream and starts sending keep alive requests. func (l *lessor) resetRecv() (pb.Lease_LeaseKeepAliveClient, error) { sctx, cancel := context.WithCancel(l.stopCtx) - stream, err := l.remote.LeaseKeepAlive(sctx, grpc.FailFast(false)) - if err != nil { + stream, err := l.remote.LeaseKeepAlive(sctx) + if stream == nil || err != nil { cancel() return nil, err } diff --git a/clientv3/retry.go b/clientv3/retry.go index 05e6e7dd494..8b43f8e71a6 100644 --- a/clientv3/retry.go +++ b/clientv3/retry.go @@ -404,13 +404,29 @@ type retryLeaseClient struct { readRetry retryRPCFunc } -// RetryLeaseClient implements a LeaseClient that uses the client's FailFast retry policy. +// RetryLeaseClient implements a LeaseClient. 
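(Standalone sketch of the nil-stream guard added to the lease and watch code above: with the manual retry wrappers an open call can come back with neither a stream nor an error, so callers loop until they get one or the other. stream and openStream are illustrative names, not clientv3 API.)

package retrysketch

import "context"

// stream is a stand-in for a client stream type such as pb.Lease_LeaseKeepAliveClient.
type stream interface {
	CloseSend() error
}

// openStream keeps calling open until it returns a usable stream or a real
// error; a (nil, nil) result is treated as "try again".
func openStream(ctx context.Context, open func(context.Context) (stream, error)) (stream, error) {
	for {
		s, err := open(ctx)
		if err != nil {
			return nil, err
		}
		if s != nil {
			return s, nil
		}
		select {
		case <-ctx.Done():
			return nil, ctx.Err()
		default: // (nil, nil): retry
		}
	}
}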
func RetryLeaseClient(c *Client) pb.LeaseClient { - retry := &retryLeaseClient{ - pb.NewLeaseClient(c.conn), - c.newRetryWrapper(false), - } - return &retryLeaseClient{retry, c.newAuthRetryWrapper()} + readRetry := c.newRetryWrapper(false) + lc := pb.NewLeaseClient(c.conn) + retryBasic := &retryLeaseClient{lc, readRetry} + retryAuthWrapper := c.newAuthRetryWrapper() + return &retryLeaseClient{retryBasic, retryAuthWrapper} +} + +func (rlc *retryLeaseClient) LeaseTimeToLive(ctx context.Context, in *pb.LeaseTimeToLiveRequest, opts ...grpc.CallOption) (resp *pb.LeaseTimeToLiveResponse, err error) { + err = rlc.readRetry(ctx, func(rctx context.Context) error { + resp, err = rlc.LeaseClient.LeaseTimeToLive(rctx, in, opts...) + return err + }) + return resp, err +} + +func (rlc *retryLeaseClient) LeaseLeases(ctx context.Context, in *pb.LeaseLeasesRequest, opts ...grpc.CallOption) (resp *pb.LeaseLeasesResponse, err error) { + err = rlc.readRetry(ctx, func(rctx context.Context) error { + resp, err = rlc.LeaseClient.LeaseLeases(rctx, in, opts...) + return err + }) + return resp, err } func (rlc *retryLeaseClient) LeaseGrant(ctx context.Context, in *pb.LeaseGrantRequest, opts ...grpc.CallOption) (resp *pb.LeaseGrantResponse, err error) { @@ -430,6 +446,14 @@ func (rlc *retryLeaseClient) LeaseRevoke(ctx context.Context, in *pb.LeaseRevoke return resp, err } +func (rlc *retryLeaseClient) LeaseKeepAlive(ctx context.Context, opts ...grpc.CallOption) (stream pb.Lease_LeaseKeepAliveClient, err error) { + err = rlc.readRetry(ctx, func(rctx context.Context) error { + stream, err = rlc.LeaseClient.LeaseKeepAlive(rctx, opts...) + return err + }) + return stream, err +} + type retryClusterClient struct { pb.ClusterClient writeRetry retryRPCFunc From d79e6552a9c09aefca2bb111af1d481d1f16611f Mon Sep 17 00:00:00 2001 From: Gyu-Ho Lee Date: Wed, 18 Oct 2017 13:14:05 -0700 Subject: [PATCH 10/17] clientv3: remove redundant retries in Cluster, set FailFast=true Signed-off-by: Gyu-Ho Lee --- clientv3/cluster.go | 27 +++++++++------------------ clientv3/retry.go | 32 ++++++++++++++++++++++++-------- 2 files changed, 33 insertions(+), 26 deletions(-) diff --git a/clientv3/cluster.go b/clientv3/cluster.go index bbecaaca74e..8beba58a67b 100644 --- a/clientv3/cluster.go +++ b/clientv3/cluster.go @@ -18,7 +18,6 @@ import ( "context" pb "github.com/coreos/etcd/etcdserver/etcdserverpb" - "google.golang.org/grpc" ) type ( @@ -75,27 +74,19 @@ func (c *cluster) MemberRemove(ctx context.Context, id uint64) (*MemberRemoveRes func (c *cluster) MemberUpdate(ctx context.Context, id uint64, peerAddrs []string) (*MemberUpdateResponse, error) { // it is safe to retry on update. - for { - r := &pb.MemberUpdateRequest{ID: id, PeerURLs: peerAddrs} - resp, err := c.remote.MemberUpdate(ctx, r, grpc.FailFast(false)) - if err == nil { - return (*MemberUpdateResponse)(resp), nil - } - if isHaltErr(ctx, err) { - return nil, toErr(ctx, err) - } + r := &pb.MemberUpdateRequest{ID: id, PeerURLs: peerAddrs} + resp, err := c.remote.MemberUpdate(ctx, r) + if err == nil { + return (*MemberUpdateResponse)(resp), nil } + return nil, toErr(ctx, err) } func (c *cluster) MemberList(ctx context.Context) (*MemberListResponse, error) { // it is safe to retry on list. 
- for { - resp, err := c.remote.MemberList(ctx, &pb.MemberListRequest{}, grpc.FailFast(false)) - if err == nil { - return (*MemberListResponse)(resp), nil - } - if isHaltErr(ctx, err) { - return nil, toErr(ctx, err) - } + resp, err := c.remote.MemberList(ctx, &pb.MemberListRequest{}) + if err == nil { + return (*MemberListResponse)(resp), nil } + return nil, toErr(ctx, err) } diff --git a/clientv3/retry.go b/clientv3/retry.go index 8b43f8e71a6..115bf19a846 100644 --- a/clientv3/retry.go +++ b/clientv3/retry.go @@ -454,17 +454,33 @@ func (rlc *retryLeaseClient) LeaseKeepAlive(ctx context.Context, opts ...grpc.Ca return stream, err } +// RetryClusterClient implements a ClusterClient. +func RetryClusterClient(c *Client) pb.ClusterClient { + readRetry := c.newRetryWrapper(false) + writeRetry := c.newRetryWrapper(true) + cc := pb.NewClusterClient(c.conn) + return &retryClusterClient{&retryWriteClusterClient{cc, writeRetry}, readRetry} +} + type retryClusterClient struct { - pb.ClusterClient - writeRetry retryRPCFunc + *retryWriteClusterClient + readRetry retryRPCFunc } -// RetryClusterClient implements a ClusterClient that uses the client's FailFast retry policy. -func RetryClusterClient(c *Client) pb.ClusterClient { - return &retryClusterClient{pb.NewClusterClient(c.conn), c.newRetryWrapper(true)} +func (rcc *retryClusterClient) MemberList(ctx context.Context, in *pb.MemberListRequest, opts ...grpc.CallOption) (resp *pb.MemberListResponse, err error) { + err = rcc.readRetry(ctx, func(rctx context.Context) error { + resp, err = rcc.ClusterClient.MemberList(rctx, in, opts...) + return err + }) + return resp, err +} + +type retryWriteClusterClient struct { + pb.ClusterClient + writeRetry retryRPCFunc } -func (rcc *retryClusterClient) MemberAdd(ctx context.Context, in *pb.MemberAddRequest, opts ...grpc.CallOption) (resp *pb.MemberAddResponse, err error) { +func (rcc *retryWriteClusterClient) MemberAdd(ctx context.Context, in *pb.MemberAddRequest, opts ...grpc.CallOption) (resp *pb.MemberAddResponse, err error) { err = rcc.writeRetry(ctx, func(rctx context.Context) error { resp, err = rcc.ClusterClient.MemberAdd(rctx, in, opts...) return err @@ -472,7 +488,7 @@ func (rcc *retryClusterClient) MemberAdd(ctx context.Context, in *pb.MemberAddRe return resp, err } -func (rcc *retryClusterClient) MemberRemove(ctx context.Context, in *pb.MemberRemoveRequest, opts ...grpc.CallOption) (resp *pb.MemberRemoveResponse, err error) { +func (rcc *retryWriteClusterClient) MemberRemove(ctx context.Context, in *pb.MemberRemoveRequest, opts ...grpc.CallOption) (resp *pb.MemberRemoveResponse, err error) { err = rcc.writeRetry(ctx, func(rctx context.Context) error { resp, err = rcc.ClusterClient.MemberRemove(rctx, in, opts...) return err @@ -480,7 +496,7 @@ func (rcc *retryClusterClient) MemberRemove(ctx context.Context, in *pb.MemberRe return resp, err } -func (rcc *retryClusterClient) MemberUpdate(ctx context.Context, in *pb.MemberUpdateRequest, opts ...grpc.CallOption) (resp *pb.MemberUpdateResponse, err error) { +func (rcc *retryWriteClusterClient) MemberUpdate(ctx context.Context, in *pb.MemberUpdateRequest, opts ...grpc.CallOption) (resp *pb.MemberUpdateResponse, err error) { err = rcc.writeRetry(ctx, func(rctx context.Context) error { resp, err = rcc.ClusterClient.MemberUpdate(rctx, in, opts...) 
return err From ff1b2cdc33b3ed64ac55f1c2470715d27e3ba0e8 Mon Sep 17 00:00:00 2001 From: Gyu-Ho Lee Date: Wed, 18 Oct 2017 13:15:40 -0700 Subject: [PATCH 11/17] clientv3: remove redundant retries in Maintenance, set FailFast=true Signed-off-by: Gyu-Ho Lee --- clientv3/maintenance.go | 30 +++++++---------- clientv3/retry.go | 74 +++++++++++++++++++++++++++++++++++++++++ 2 files changed, 86 insertions(+), 18 deletions(-) diff --git a/clientv3/maintenance.go b/clientv3/maintenance.go index 988a5f7c288..25abc9c9100 100644 --- a/clientv3/maintenance.go +++ b/clientv3/maintenance.go @@ -19,8 +19,6 @@ import ( "io" pb "github.com/coreos/etcd/etcdserver/etcdserverpb" - - "google.golang.org/grpc" ) type ( @@ -77,9 +75,9 @@ func NewMaintenance(c *Client) Maintenance { return nil, nil, err } cancel := func() { conn.Close() } - return pb.NewMaintenanceClient(conn), cancel, nil + return RetryMaintenanceClient(c, conn), cancel, nil }, - remote: pb.NewMaintenanceClient(c.conn), + remote: RetryMaintenanceClient(c, c.conn), } } @@ -98,15 +96,11 @@ func (m *maintenance) AlarmList(ctx context.Context) (*AlarmResponse, error) { MemberID: 0, // all Alarm: pb.AlarmType_NONE, // all } - for { - resp, err := m.remote.Alarm(ctx, req, grpc.FailFast(false)) - if err == nil { - return (*AlarmResponse)(resp), nil - } - if isHaltErr(ctx, err) { - return nil, toErr(ctx, err) - } + resp, err := m.remote.Alarm(ctx, req) + if err == nil { + return (*AlarmResponse)(resp), nil } + return nil, toErr(ctx, err) } func (m *maintenance) AlarmDisarm(ctx context.Context, am *AlarmMember) (*AlarmResponse, error) { @@ -132,7 +126,7 @@ func (m *maintenance) AlarmDisarm(ctx context.Context, am *AlarmMember) (*AlarmR return &ret, nil } - resp, err := m.remote.Alarm(ctx, req, grpc.FailFast(false)) + resp, err := m.remote.Alarm(ctx, req) if err == nil { return (*AlarmResponse)(resp), nil } @@ -145,7 +139,7 @@ func (m *maintenance) Defragment(ctx context.Context, endpoint string) (*Defragm return nil, toErr(ctx, err) } defer cancel() - resp, err := remote.Defragment(ctx, &pb.DefragmentRequest{}, grpc.FailFast(false)) + resp, err := remote.Defragment(ctx, &pb.DefragmentRequest{}) if err != nil { return nil, toErr(ctx, err) } @@ -158,7 +152,7 @@ func (m *maintenance) Status(ctx context.Context, endpoint string) (*StatusRespo return nil, toErr(ctx, err) } defer cancel() - resp, err := remote.Status(ctx, &pb.StatusRequest{}, grpc.FailFast(false)) + resp, err := remote.Status(ctx, &pb.StatusRequest{}) if err != nil { return nil, toErr(ctx, err) } @@ -171,7 +165,7 @@ func (m *maintenance) HashKV(ctx context.Context, endpoint string, rev int64) (* return nil, toErr(ctx, err) } defer cancel() - resp, err := remote.HashKV(ctx, &pb.HashKVRequest{Revision: rev}, grpc.FailFast(false)) + resp, err := remote.HashKV(ctx, &pb.HashKVRequest{Revision: rev}) if err != nil { return nil, toErr(ctx, err) } @@ -179,7 +173,7 @@ func (m *maintenance) HashKV(ctx context.Context, endpoint string, rev int64) (* } func (m *maintenance) Snapshot(ctx context.Context) (io.ReadCloser, error) { - ss, err := m.remote.Snapshot(ctx, &pb.SnapshotRequest{}, grpc.FailFast(false)) + ss, err := m.remote.Snapshot(ctx, &pb.SnapshotRequest{}) if err != nil { return nil, toErr(ctx, err) } @@ -206,6 +200,6 @@ func (m *maintenance) Snapshot(ctx context.Context) (io.ReadCloser, error) { } func (m *maintenance) MoveLeader(ctx context.Context, transfereeID uint64) (*MoveLeaderResponse, error) { - resp, err := m.remote.MoveLeader(ctx, &pb.MoveLeaderRequest{TargetID: transfereeID}, 
grpc.FailFast(false)) + resp, err := m.remote.MoveLeader(ctx, &pb.MoveLeaderRequest{TargetID: transfereeID}) return (*MoveLeaderResponse)(resp), toErr(ctx, err) } diff --git a/clientv3/retry.go b/clientv3/retry.go index 115bf19a846..f2e8cab76d5 100644 --- a/clientv3/retry.go +++ b/clientv3/retry.go @@ -504,6 +504,80 @@ func (rcc *retryWriteClusterClient) MemberUpdate(ctx context.Context, in *pb.Mem return resp, err } +// RetryMaintenanceClient implements a Maintenance. +func RetryMaintenanceClient(c *Client, conn *grpc.ClientConn) pb.MaintenanceClient { + readRetry := c.newRetryWrapper(false) + writeRetry := c.newRetryWrapper(true) + mc := pb.NewMaintenanceClient(conn) + return &retryMaintenanceClient{&retryWriteMaintenanceClient{mc, writeRetry}, readRetry} +} + +type retryMaintenanceClient struct { + pb.MaintenanceClient + readRetry retryRPCFunc +} + +func (rmc *retryMaintenanceClient) Alarm(ctx context.Context, in *pb.AlarmRequest, opts ...grpc.CallOption) (resp *pb.AlarmResponse, err error) { + err = rmc.readRetry(ctx, func(rctx context.Context) error { + resp, err = rmc.MaintenanceClient.Alarm(rctx, in, opts...) + return err + }) + return resp, err +} + +func (rmc *retryMaintenanceClient) Status(ctx context.Context, in *pb.StatusRequest, opts ...grpc.CallOption) (resp *pb.StatusResponse, err error) { + err = rmc.readRetry(ctx, func(rctx context.Context) error { + resp, err = rmc.MaintenanceClient.Status(rctx, in, opts...) + return err + }) + return resp, err +} + +func (rmc *retryMaintenanceClient) Hash(ctx context.Context, in *pb.HashRequest, opts ...grpc.CallOption) (resp *pb.HashResponse, err error) { + err = rmc.readRetry(ctx, func(rctx context.Context) error { + resp, err = rmc.MaintenanceClient.Hash(rctx, in, opts...) + return err + }) + return resp, err +} + +func (rmc *retryMaintenanceClient) HashKV(ctx context.Context, in *pb.HashKVRequest, opts ...grpc.CallOption) (resp *pb.HashKVResponse, err error) { + err = rmc.readRetry(ctx, func(rctx context.Context) error { + resp, err = rmc.MaintenanceClient.HashKV(rctx, in, opts...) + return err + }) + return resp, err +} + +func (rmc *retryMaintenanceClient) Snapshot(ctx context.Context, in *pb.SnapshotRequest, opts ...grpc.CallOption) (stream pb.Maintenance_SnapshotClient, err error) { + err = rmc.readRetry(ctx, func(rctx context.Context) error { + stream, err = rmc.MaintenanceClient.Snapshot(rctx, in, opts...) + return err + }) + return stream, err +} + +type retryWriteMaintenanceClient struct { + pb.MaintenanceClient + writeRetry retryRPCFunc +} + +func (rmc *retryWriteMaintenanceClient) Defragment(ctx context.Context, in *pb.DefragmentRequest, opts ...grpc.CallOption) (resp *pb.DefragmentResponse, err error) { + err = rmc.writeRetry(ctx, func(rctx context.Context) error { + resp, err = rmc.MaintenanceClient.Defragment(rctx, in, opts...) + return err + }) + return resp, err +} + +func (rmc *retryWriteMaintenanceClient) MoveLeader(ctx context.Context, in *pb.MoveLeaderRequest, opts ...grpc.CallOption) (resp *pb.MoveLeaderResponse, err error) { + err = rmc.writeRetry(ctx, func(rctx context.Context) error { + resp, err = rmc.MaintenanceClient.MoveLeader(rctx, in, opts...) 
+ return err + }) + return resp, err +} + type retryAuthClient struct { pb.AuthClient writeRetry retryRPCFunc From 4e0b52d0a9da31db6d5f6433f5bf6950a1a6fb30 Mon Sep 17 00:00:00 2001 From: Gyu-Ho Lee Date: Wed, 18 Oct 2017 13:24:05 -0700 Subject: [PATCH 12/17] clientv3: remove redundant retries in Auth, set FailFast=true Signed-off-by: Gyu-Ho Lee --- clientv3/auth.go | 16 +++++------ clientv3/retry.go | 72 ++++++++++++++++++++++++++++++++++++----------- 2 files changed, 64 insertions(+), 24 deletions(-) diff --git a/clientv3/auth.go b/clientv3/auth.go index a6ab4684140..8df670f163a 100644 --- a/clientv3/auth.go +++ b/clientv3/auth.go @@ -105,16 +105,16 @@ type auth struct { } func NewAuth(c *Client) Auth { - return &auth{remote: pb.NewAuthClient(c.ActiveConnection())} + return &auth{remote: RetryAuthClient(c)} } func (auth *auth) AuthEnable(ctx context.Context) (*AuthEnableResponse, error) { - resp, err := auth.remote.AuthEnable(ctx, &pb.AuthEnableRequest{}, grpc.FailFast(false)) + resp, err := auth.remote.AuthEnable(ctx, &pb.AuthEnableRequest{}) return (*AuthEnableResponse)(resp), toErr(ctx, err) } func (auth *auth) AuthDisable(ctx context.Context) (*AuthDisableResponse, error) { - resp, err := auth.remote.AuthDisable(ctx, &pb.AuthDisableRequest{}, grpc.FailFast(false)) + resp, err := auth.remote.AuthDisable(ctx, &pb.AuthDisableRequest{}) return (*AuthDisableResponse)(resp), toErr(ctx, err) } @@ -139,12 +139,12 @@ func (auth *auth) UserGrantRole(ctx context.Context, user string, role string) ( } func (auth *auth) UserGet(ctx context.Context, name string) (*AuthUserGetResponse, error) { - resp, err := auth.remote.UserGet(ctx, &pb.AuthUserGetRequest{Name: name}, grpc.FailFast(false)) + resp, err := auth.remote.UserGet(ctx, &pb.AuthUserGetRequest{Name: name}) return (*AuthUserGetResponse)(resp), toErr(ctx, err) } func (auth *auth) UserList(ctx context.Context) (*AuthUserListResponse, error) { - resp, err := auth.remote.UserList(ctx, &pb.AuthUserListRequest{}, grpc.FailFast(false)) + resp, err := auth.remote.UserList(ctx, &pb.AuthUserListRequest{}) return (*AuthUserListResponse)(resp), toErr(ctx, err) } @@ -169,12 +169,12 @@ func (auth *auth) RoleGrantPermission(ctx context.Context, name string, key, ran } func (auth *auth) RoleGet(ctx context.Context, role string) (*AuthRoleGetResponse, error) { - resp, err := auth.remote.RoleGet(ctx, &pb.AuthRoleGetRequest{Role: role}, grpc.FailFast(false)) + resp, err := auth.remote.RoleGet(ctx, &pb.AuthRoleGetRequest{Role: role}) return (*AuthRoleGetResponse)(resp), toErr(ctx, err) } func (auth *auth) RoleList(ctx context.Context) (*AuthRoleListResponse, error) { - resp, err := auth.remote.RoleList(ctx, &pb.AuthRoleListRequest{}, grpc.FailFast(false)) + resp, err := auth.remote.RoleList(ctx, &pb.AuthRoleListRequest{}) return (*AuthRoleListResponse)(resp), toErr(ctx, err) } @@ -202,7 +202,7 @@ type authenticator struct { } func (auth *authenticator) authenticate(ctx context.Context, name string, password string) (*AuthenticateResponse, error) { - resp, err := auth.remote.Authenticate(ctx, &pb.AuthenticateRequest{Name: name, Password: password}, grpc.FailFast(false)) + resp, err := auth.remote.Authenticate(ctx, &pb.AuthenticateRequest{Name: name, Password: password}) return (*AuthenticateResponse)(resp), toErr(ctx, err) } diff --git a/clientv3/retry.go b/clientv3/retry.go index f2e8cab76d5..8d4a3fc7d6e 100644 --- a/clientv3/retry.go +++ b/clientv3/retry.go @@ -578,17 +578,49 @@ func (rmc *retryWriteMaintenanceClient) MoveLeader(ctx context.Context, in *pb.M 
return resp, err } +// RetryAuthClient implements a AuthClient. +func RetryAuthClient(c *Client) pb.AuthClient { + readRetry := c.newRetryWrapper(false) + writeRetry := c.newRetryWrapper(true) + ac := pb.NewAuthClient(c.conn) + return &retryAuthClient{&retryWriteAuthClient{ac, writeRetry}, readRetry} +} + type retryAuthClient struct { - pb.AuthClient - writeRetry retryRPCFunc + *retryWriteAuthClient + readRetry retryRPCFunc } -// RetryAuthClient implements a AuthClient that uses the client's FailFast retry policy. -func RetryAuthClient(c *Client) pb.AuthClient { - return &retryAuthClient{pb.NewAuthClient(c.conn), c.newRetryWrapper(true)} +func (rac *retryAuthClient) UserList(ctx context.Context, in *pb.AuthUserListRequest, opts ...grpc.CallOption) (resp *pb.AuthUserListResponse, err error) { + err = rac.readRetry(ctx, func(rctx context.Context) error { + resp, err = rac.AuthClient.UserList(rctx, in, opts...) + return err + }) + return resp, err +} + +func (rac *retryAuthClient) RoleGet(ctx context.Context, in *pb.AuthRoleGetRequest, opts ...grpc.CallOption) (resp *pb.AuthRoleGetResponse, err error) { + err = rac.readRetry(ctx, func(rctx context.Context) error { + resp, err = rac.AuthClient.RoleGet(rctx, in, opts...) + return err + }) + return resp, err +} + +func (rac *retryAuthClient) RoleList(ctx context.Context, in *pb.AuthRoleListRequest, opts ...grpc.CallOption) (resp *pb.AuthRoleListResponse, err error) { + err = rac.readRetry(ctx, func(rctx context.Context) error { + resp, err = rac.AuthClient.RoleList(rctx, in, opts...) + return err + }) + return resp, err +} + +type retryWriteAuthClient struct { + pb.AuthClient + writeRetry retryRPCFunc } -func (rac *retryAuthClient) AuthEnable(ctx context.Context, in *pb.AuthEnableRequest, opts ...grpc.CallOption) (resp *pb.AuthEnableResponse, err error) { +func (rac *retryWriteAuthClient) AuthEnable(ctx context.Context, in *pb.AuthEnableRequest, opts ...grpc.CallOption) (resp *pb.AuthEnableResponse, err error) { err = rac.writeRetry(ctx, func(rctx context.Context) error { resp, err = rac.AuthClient.AuthEnable(rctx, in, opts...) return err @@ -596,7 +628,7 @@ func (rac *retryAuthClient) AuthEnable(ctx context.Context, in *pb.AuthEnableReq return resp, err } -func (rac *retryAuthClient) AuthDisable(ctx context.Context, in *pb.AuthDisableRequest, opts ...grpc.CallOption) (resp *pb.AuthDisableResponse, err error) { +func (rac *retryWriteAuthClient) AuthDisable(ctx context.Context, in *pb.AuthDisableRequest, opts ...grpc.CallOption) (resp *pb.AuthDisableResponse, err error) { err = rac.writeRetry(ctx, func(rctx context.Context) error { resp, err = rac.AuthClient.AuthDisable(rctx, in, opts...) return err @@ -604,7 +636,7 @@ func (rac *retryAuthClient) AuthDisable(ctx context.Context, in *pb.AuthDisableR return resp, err } -func (rac *retryAuthClient) UserAdd(ctx context.Context, in *pb.AuthUserAddRequest, opts ...grpc.CallOption) (resp *pb.AuthUserAddResponse, err error) { +func (rac *retryWriteAuthClient) UserAdd(ctx context.Context, in *pb.AuthUserAddRequest, opts ...grpc.CallOption) (resp *pb.AuthUserAddResponse, err error) { err = rac.writeRetry(ctx, func(rctx context.Context) error { resp, err = rac.AuthClient.UserAdd(rctx, in, opts...) 
return err @@ -612,7 +644,7 @@ func (rac *retryAuthClient) UserAdd(ctx context.Context, in *pb.AuthUserAddReque return resp, err } -func (rac *retryAuthClient) UserDelete(ctx context.Context, in *pb.AuthUserDeleteRequest, opts ...grpc.CallOption) (resp *pb.AuthUserDeleteResponse, err error) { +func (rac *retryWriteAuthClient) UserDelete(ctx context.Context, in *pb.AuthUserDeleteRequest, opts ...grpc.CallOption) (resp *pb.AuthUserDeleteResponse, err error) { err = rac.writeRetry(ctx, func(rctx context.Context) error { resp, err = rac.AuthClient.UserDelete(rctx, in, opts...) return err @@ -620,7 +652,7 @@ func (rac *retryAuthClient) UserDelete(ctx context.Context, in *pb.AuthUserDelet return resp, err } -func (rac *retryAuthClient) UserChangePassword(ctx context.Context, in *pb.AuthUserChangePasswordRequest, opts ...grpc.CallOption) (resp *pb.AuthUserChangePasswordResponse, err error) { +func (rac *retryWriteAuthClient) UserChangePassword(ctx context.Context, in *pb.AuthUserChangePasswordRequest, opts ...grpc.CallOption) (resp *pb.AuthUserChangePasswordResponse, err error) { err = rac.writeRetry(ctx, func(rctx context.Context) error { resp, err = rac.AuthClient.UserChangePassword(rctx, in, opts...) return err @@ -628,7 +660,7 @@ func (rac *retryAuthClient) UserChangePassword(ctx context.Context, in *pb.AuthU return resp, err } -func (rac *retryAuthClient) UserGrantRole(ctx context.Context, in *pb.AuthUserGrantRoleRequest, opts ...grpc.CallOption) (resp *pb.AuthUserGrantRoleResponse, err error) { +func (rac *retryWriteAuthClient) UserGrantRole(ctx context.Context, in *pb.AuthUserGrantRoleRequest, opts ...grpc.CallOption) (resp *pb.AuthUserGrantRoleResponse, err error) { err = rac.writeRetry(ctx, func(rctx context.Context) error { resp, err = rac.AuthClient.UserGrantRole(rctx, in, opts...) return err @@ -636,7 +668,7 @@ func (rac *retryAuthClient) UserGrantRole(ctx context.Context, in *pb.AuthUserGr return resp, err } -func (rac *retryAuthClient) UserRevokeRole(ctx context.Context, in *pb.AuthUserRevokeRoleRequest, opts ...grpc.CallOption) (resp *pb.AuthUserRevokeRoleResponse, err error) { +func (rac *retryWriteAuthClient) UserRevokeRole(ctx context.Context, in *pb.AuthUserRevokeRoleRequest, opts ...grpc.CallOption) (resp *pb.AuthUserRevokeRoleResponse, err error) { err = rac.writeRetry(ctx, func(rctx context.Context) error { resp, err = rac.AuthClient.UserRevokeRole(rctx, in, opts...) return err @@ -644,7 +676,7 @@ func (rac *retryAuthClient) UserRevokeRole(ctx context.Context, in *pb.AuthUserR return resp, err } -func (rac *retryAuthClient) RoleAdd(ctx context.Context, in *pb.AuthRoleAddRequest, opts ...grpc.CallOption) (resp *pb.AuthRoleAddResponse, err error) { +func (rac *retryWriteAuthClient) RoleAdd(ctx context.Context, in *pb.AuthRoleAddRequest, opts ...grpc.CallOption) (resp *pb.AuthRoleAddResponse, err error) { err = rac.writeRetry(ctx, func(rctx context.Context) error { resp, err = rac.AuthClient.RoleAdd(rctx, in, opts...) 
return err @@ -652,7 +684,7 @@ func (rac *retryAuthClient) RoleAdd(ctx context.Context, in *pb.AuthRoleAddReque return resp, err } -func (rac *retryAuthClient) RoleDelete(ctx context.Context, in *pb.AuthRoleDeleteRequest, opts ...grpc.CallOption) (resp *pb.AuthRoleDeleteResponse, err error) { +func (rac *retryWriteAuthClient) RoleDelete(ctx context.Context, in *pb.AuthRoleDeleteRequest, opts ...grpc.CallOption) (resp *pb.AuthRoleDeleteResponse, err error) { err = rac.writeRetry(ctx, func(rctx context.Context) error { resp, err = rac.AuthClient.RoleDelete(rctx, in, opts...) return err @@ -660,7 +692,7 @@ func (rac *retryAuthClient) RoleDelete(ctx context.Context, in *pb.AuthRoleDelet return resp, err } -func (rac *retryAuthClient) RoleGrantPermission(ctx context.Context, in *pb.AuthRoleGrantPermissionRequest, opts ...grpc.CallOption) (resp *pb.AuthRoleGrantPermissionResponse, err error) { +func (rac *retryWriteAuthClient) RoleGrantPermission(ctx context.Context, in *pb.AuthRoleGrantPermissionRequest, opts ...grpc.CallOption) (resp *pb.AuthRoleGrantPermissionResponse, err error) { err = rac.writeRetry(ctx, func(rctx context.Context) error { resp, err = rac.AuthClient.RoleGrantPermission(rctx, in, opts...) return err @@ -668,10 +700,18 @@ func (rac *retryAuthClient) RoleGrantPermission(ctx context.Context, in *pb.Auth return resp, err } -func (rac *retryAuthClient) RoleRevokePermission(ctx context.Context, in *pb.AuthRoleRevokePermissionRequest, opts ...grpc.CallOption) (resp *pb.AuthRoleRevokePermissionResponse, err error) { +func (rac *retryWriteAuthClient) RoleRevokePermission(ctx context.Context, in *pb.AuthRoleRevokePermissionRequest, opts ...grpc.CallOption) (resp *pb.AuthRoleRevokePermissionResponse, err error) { err = rac.writeRetry(ctx, func(rctx context.Context) error { resp, err = rac.AuthClient.RoleRevokePermission(rctx, in, opts...) return err }) return resp, err } + +func (rac *retryWriteAuthClient) Authenticate(ctx context.Context, in *pb.AuthenticateRequest, opts ...grpc.CallOption) (resp *pb.AuthenticateResponse, err error) { + err = rac.writeRetry(ctx, func(rctx context.Context) error { + resp, err = rac.AuthClient.Authenticate(rctx, in, opts...) 
+ return err + }) + return resp, err +} From 140cb0c7b377dc130c22b93d2aa76a2133fcead5 Mon Sep 17 00:00:00 2001 From: Gyu-Ho Lee Date: Mon, 16 Oct 2017 13:02:04 -0700 Subject: [PATCH 13/17] clientv3/integration: match grpc.ErrClientConnClosing in TestKVNewAfterClose Signed-off-by: Gyu-Ho Lee --- clientv3/integration/kv_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/clientv3/integration/kv_test.go b/clientv3/integration/kv_test.go index 62d915a858f..161fb2525f0 100644 --- a/clientv3/integration/kv_test.go +++ b/clientv3/integration/kv_test.go @@ -473,8 +473,8 @@ func TestKVNewAfterClose(t *testing.T) { donec := make(chan struct{}) go func() { - if _, err := cli.Get(context.TODO(), "foo"); err != context.Canceled { - t.Fatalf("expected %v, got %v", context.Canceled, err) + if _, err := cli.Get(context.TODO(), "foo"); err != context.Canceled && err != grpc.ErrClientConnClosing { + t.Fatalf("expected %v or %v, got %v", context.Canceled, grpc.ErrClientConnClosing, err) } close(donec) }() From 5bdbcba5f2bc05af3fa45d1b7d19f1b855b13967 Mon Sep 17 00:00:00 2001 From: Gyu-Ho Lee Date: Tue, 17 Oct 2017 14:27:14 -0700 Subject: [PATCH 14/17] clientv3/integration: match ErrTimeout in testNetworkPartitionBalancer Signed-off-by: Gyu-Ho Lee --- clientv3/integration/network_partition_test.go | 18 +++++++++++++----- 1 file changed, 13 insertions(+), 5 deletions(-) diff --git a/clientv3/integration/network_partition_test.go b/clientv3/integration/network_partition_test.go index 573b1778c12..fe1831a9440 100644 --- a/clientv3/integration/network_partition_test.go +++ b/clientv3/integration/network_partition_test.go @@ -22,6 +22,7 @@ import ( "time" "github.com/coreos/etcd/clientv3" + "github.com/coreos/etcd/etcdserver" "github.com/coreos/etcd/integration" "github.com/coreos/etcd/pkg/testutil" ) @@ -33,7 +34,7 @@ func TestNetworkPartitionBalancerPut(t *testing.T) { testNetworkPartitionBalancer(t, func(cli *clientv3.Client, ctx context.Context) error { _, err := cli.Put(ctx, "a", "b") return err - }) + }, []error{context.DeadlineExceeded, etcdserver.ErrTimeout}) } // TestNetworkPartitionBalancerGet tests when one member becomes isolated, @@ -43,10 +44,10 @@ func TestNetworkPartitionBalancerGet(t *testing.T) { testNetworkPartitionBalancer(t, func(cli *clientv3.Client, ctx context.Context) error { _, err := cli.Get(ctx, "a") return err - }) + }, []error{context.DeadlineExceeded}) } -func testNetworkPartitionBalancer(t *testing.T, op func(*clientv3.Client, context.Context) error) { +func testNetworkPartitionBalancer(t *testing.T, op func(*clientv3.Client, context.Context) error, errs []error) { defer testutil.AfterTest(t) clus := integration.NewClusterV3(t, &integration.ClusterConfig{ @@ -82,8 +83,15 @@ func testNetworkPartitionBalancer(t *testing.T, op func(*clientv3.Client, contex if err == nil { break } - if err != context.DeadlineExceeded { - t.Fatalf("#%d: expected %v, got %v", i, context.DeadlineExceeded, err) + match := false + for _, e := range errs { + if err == e { + match = true + break + } + } + if !match { + t.Fatalf("#%d: expected %+v, got %v", i, errs, err) } // give enough time for endpoint switch // TODO: remove random sleep by syncing directly with balancer From 1ee5f32c04efd119937e90596b56a465d908b7de Mon Sep 17 00:00:00 2001 From: Gyu-Ho Lee Date: Tue, 17 Oct 2017 20:36:31 -0700 Subject: [PATCH 15/17] clientv3/integration: match errors in leasing tests Signed-off-by: Gyu-Ho Lee --- clientv3/integration/leasing_test.go | 15 +++++++++------ 1 file changed, 9 
insertions(+), 6 deletions(-) diff --git a/clientv3/integration/leasing_test.go b/clientv3/integration/leasing_test.go index b56478361d9..98a9b58771c 100644 --- a/clientv3/integration/leasing_test.go +++ b/clientv3/integration/leasing_test.go @@ -19,6 +19,7 @@ import ( "fmt" "math/rand" "reflect" + "strings" "sync" "testing" "time" @@ -28,6 +29,8 @@ import ( "github.com/coreos/etcd/clientv3/leasing" "github.com/coreos/etcd/integration" "github.com/coreos/etcd/pkg/testutil" + + "google.golang.org/grpc/transport" ) func TestLeasingPutGet(t *testing.T) { @@ -1083,8 +1086,8 @@ func TestLeasingOwnerPutError(t *testing.T) { clus.Members[0].Stop(t) ctx, cancel := context.WithTimeout(context.TODO(), 100*time.Millisecond) defer cancel() - if resp, err := lkv.Put(ctx, "k", "v"); err == nil { - t.Fatalf("expected error, got response %+v", resp) + if resp, err := lkv.Put(ctx, "k", "v"); err != context.DeadlineExceeded && !strings.Contains(err.Error(), "transport is closing") { + t.Fatalf("expected %v or %v, got %v, response %+v", context.DeadlineExceeded, transport.ErrConnClosing, err, resp) } } @@ -1104,8 +1107,8 @@ func TestLeasingOwnerDeleteError(t *testing.T) { clus.Members[0].Stop(t) ctx, cancel := context.WithTimeout(context.TODO(), 100*time.Millisecond) defer cancel() - if resp, err := lkv.Delete(ctx, "k"); err == nil { - t.Fatalf("expected error, got response %+v", resp) + if resp, err := lkv.Delete(ctx, "k"); err != context.DeadlineExceeded && !strings.Contains(err.Error(), "transport is closing") { + t.Fatalf("expected %v or %v, got %v, response %+v", context.DeadlineExceeded, transport.ErrConnClosing, err, resp) } } @@ -1121,8 +1124,8 @@ func TestLeasingNonOwnerPutError(t *testing.T) { clus.Members[0].Stop(t) ctx, cancel := context.WithTimeout(context.TODO(), 100*time.Millisecond) defer cancel() - if resp, err := lkv.Put(ctx, "k", "v"); err == nil { - t.Fatalf("expected error, got response %+v", resp) + if resp, err := lkv.Put(ctx, "k", "v"); err != context.DeadlineExceeded && !strings.Contains(err.Error(), "transport is closing") { + t.Fatalf("expected %v or %v, got %v, response %+v", context.DeadlineExceeded, transport.ErrConnClosing, err, resp) } } From 1e217efa3a7e7136785d59cdd11268ea2da2e477 Mon Sep 17 00:00:00 2001 From: Gyu-Ho Lee Date: Tue, 17 Oct 2017 22:02:23 -0700 Subject: [PATCH 16/17] clientv3/integration: increase time-out for endpoint switch in TestKVGetResetLoneEndpoint Since we have added additional wait/sync when error "there is no available address". 
Signed-off-by: Gyu-Ho Lee --- clientv3/integration/kv_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/clientv3/integration/kv_test.go b/clientv3/integration/kv_test.go index 161fb2525f0..b549096f1e0 100644 --- a/clientv3/integration/kv_test.go +++ b/clientv3/integration/kv_test.go @@ -881,7 +881,7 @@ func TestKVGetResetLoneEndpoint(t *testing.T) { // have Get try to reconnect donec := make(chan struct{}) go func() { - ctx, cancel := context.WithTimeout(context.TODO(), 5*time.Second) + ctx, cancel := context.WithTimeout(context.TODO(), 8*time.Second) if _, err := cli.Get(ctx, "abc", clientv3.WithSerializable()); err != nil { t.Fatal(err) } From 7c9cf817a1a19ed33a5a0df7695eea07023b6571 Mon Sep 17 00:00:00 2001 From: Gyu-Ho Lee Date: Wed, 18 Oct 2017 18:10:37 -0700 Subject: [PATCH 17/17] clientv3/integration: increase timeout in watch connection close tests Integration tests have dial timeout 5-sec, and it's possible that balancer retry logic waits 5-sec and test times out because gRPC calls grpc.downErr function after connection wait starts in retrial part. Just increasing time-out should be ok. In most cases, grpc.downErr gets called before starting the wait. e.g. === RUN TestWatchErrConnClosed INFO: 2017/10/18 23:55:39 clientv3/balancer: pin "localhost:91847156765553894590" INFO: 2017/10/18 23:55:39 clientv3/retry: wait 5s for healthy endpoint INFO: 2017/10/18 23:55:39 clientv3/balancer: unpin "localhost:91847156765553894590" ("grpc: the client connection is closing") INFO: 2017/10/18 23:55:39 clientv3/health-balancer: "localhost:91847156765553894590" becomes unhealthy ("grpc: the client connection is closing") --- F.A.I.L: TestWatchErrConnClosed (3.07s) watch_test.go:682: wc.Watch took too long Signed-off-by: Gyu-Ho Lee --- clientv3/integration/watch_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/clientv3/integration/watch_test.go b/clientv3/integration/watch_test.go index dfd2e508341..76109821316 100644 --- a/clientv3/integration/watch_test.go +++ b/clientv3/integration/watch_test.go @@ -678,7 +678,7 @@ func TestWatchErrConnClosed(t *testing.T) { clus.TakeClient(0) select { - case <-time.After(3 * time.Second): + case <-time.After(10 * time.Second): t.Fatal("wc.Watch took too long") case <-donec: } @@ -705,7 +705,7 @@ func TestWatchAfterClose(t *testing.T) { close(donec) }() select { - case <-time.After(3 * time.Second): + case <-time.After(10 * time.Second): t.Fatal("wc.Watch took too long") case <-donec: }
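
The retry patches above replace per-call retry loops (each passing grpc.FailFast(false)) with shared read/write retry wrappers around the generated Lease, Cluster, Maintenance, and Auth clients. The standalone sketch below shows only that wrapper shape; retryRPCFunc, newRetryWrapper, and errUnavailable are simplified stand-ins for the clientv3 internals, which additionally wait for a healthy endpoint and refresh auth state before deciding to retry.

package main

import (
	"context"
	"errors"
	"fmt"
)

// retryRPCFunc mirrors the shape of the clientv3 retry wrappers: run one RPC
// attempt, then decide whether to try again. (Simplified; the real wrappers
// also wait for a healthy endpoint and handle auth token refresh.)
type retryRPCFunc func(ctx context.Context, f func(ctx context.Context) error) error

// newRetryWrapper is a stand-in for Client.newRetryWrapper: it retries up to
// n attempts while the error is considered retryable.
func newRetryWrapper(n int, retryable func(error) bool) retryRPCFunc {
	return func(ctx context.Context, f func(ctx context.Context) error) error {
		var err error
		for i := 0; i < n; i++ {
			if err = f(ctx); err == nil || !retryable(err) {
				return err
			}
			if ctx.Err() != nil {
				return ctx.Err()
			}
		}
		return err
	}
}

var errUnavailable = errors.New("endpoint unavailable")

func main() {
	// A "read" wrapper, analogous to the readRetry used by MemberList,
	// UserList, Alarm, etc.; write RPCs get a separate, stricter wrapper.
	readRetry := newRetryWrapper(3, func(err error) bool { return err == errUnavailable })

	attempts := 0
	err := readRetry(context.Background(), func(ctx context.Context) error {
		attempts++
		if attempts < 3 {
			return errUnavailable // transient failure; wrapper retries
		}
		return nil
	})
	fmt.Println("attempts:", attempts, "err:", err) // attempts: 3 err: <nil>
}

Concentrating the policy in one wrapper per client is what lets the individual call sites above shrink to a single RPC call plus error conversion.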
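
The lease patch also makes KeepAliveOnce retry manually when a single keep-alive round trip yields neither a response nor an error. The sketch below isolates that guard under stated assumptions: doKeepAliveOnce, keepAliveResponse, and errLeaseNotFound are hypothetical stand-ins for the stream round trip and rpctypes.ErrLeaseNotFound.

package main

import (
	"context"
	"errors"
	"fmt"
)

type keepAliveResponse struct{ TTL int64 }

var errLeaseNotFound = errors.New("lease not found")

// doKeepAliveOnce is a hypothetical stand-in for a single send/recv on the
// keep-alive stream, which may legitimately return (nil, nil) when the
// stream is torn down before a response arrives.
func doKeepAliveOnce(attempt int) (*keepAliveResponse, error) {
	if attempt == 0 {
		return nil, nil // stream closed mid-flight; caller should retry
	}
	return &keepAliveResponse{TTL: 5}, nil
}

// keepAliveOnceWithRetry mirrors the patched shape: loop while
// "resp == nil && err == nil", return on any other outcome.
func keepAliveOnceWithRetry(ctx context.Context) (*keepAliveResponse, error) {
	for attempt := 0; ; attempt++ {
		resp, err := doKeepAliveOnce(attempt)
		if resp != nil && err == nil {
			if resp.TTL <= 0 {
				err = errLeaseNotFound
			}
			return resp, err
		}
		if err != nil {
			return nil, err
		}
		if ctx.Err() != nil {
			return nil, ctx.Err()
		}
		// resp == nil && err == nil: retry the round trip
	}
}

func main() {
	resp, err := keepAliveOnceWithRetry(context.Background())
	fmt.Println(resp.TTL, err) // 5 <nil>
}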
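
The integration-test patches relax single-error assertions to accept any of several expected errors, for example context.DeadlineExceeded or etcdserver.ErrTimeout after a network partition. A small self-contained sketch of that matching loop follows; errTimeout here is only a placeholder for etcdserver.ErrTimeout.

package main

import (
	"context"
	"errors"
	"fmt"
)

// errTimeout is a placeholder for etcdserver.ErrTimeout.
var errTimeout = errors.New("etcdserver: request timed out")

// errMatches reports whether err equals any acceptable error, the same loop
// the patched testNetworkPartitionBalancer runs before failing the test.
func errMatches(err error, acceptable []error) bool {
	for _, e := range acceptable {
		if err == e {
			return true
		}
	}
	return false
}

func main() {
	errs := []error{context.DeadlineExceeded, errTimeout}
	fmt.Println(errMatches(context.DeadlineExceeded, errs))         // true
	fmt.Println(errMatches(errors.New("connection refused"), errs)) // false
}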