diff --git a/.words b/.words index 66f5340b465..6dbf8f3ac1f 100644 --- a/.words +++ b/.words @@ -1,7 +1,14 @@ +ConnectionError ErrCodeEnhanceYourCalm +ErrConnClosing +ErrEmptyKey +ErrNoSpace +ErrTimeout +FailFast GoAway RPC RPCs +StreamError backoff blackholed cancelable @@ -9,6 +16,7 @@ cancelation cluster_proxy defragment defragmenting +downErr etcd gRPC goroutine @@ -25,9 +33,10 @@ mutex prefetching protobuf serializable +statusError teardown +toRPCErr too_many_pings uncontended unprefixed unlisting - diff --git a/clientv3/auth.go b/clientv3/auth.go index a6ab4684140..8df670f163a 100644 --- a/clientv3/auth.go +++ b/clientv3/auth.go @@ -105,16 +105,16 @@ type auth struct { } func NewAuth(c *Client) Auth { - return &auth{remote: pb.NewAuthClient(c.ActiveConnection())} + return &auth{remote: RetryAuthClient(c)} } func (auth *auth) AuthEnable(ctx context.Context) (*AuthEnableResponse, error) { - resp, err := auth.remote.AuthEnable(ctx, &pb.AuthEnableRequest{}, grpc.FailFast(false)) + resp, err := auth.remote.AuthEnable(ctx, &pb.AuthEnableRequest{}) return (*AuthEnableResponse)(resp), toErr(ctx, err) } func (auth *auth) AuthDisable(ctx context.Context) (*AuthDisableResponse, error) { - resp, err := auth.remote.AuthDisable(ctx, &pb.AuthDisableRequest{}, grpc.FailFast(false)) + resp, err := auth.remote.AuthDisable(ctx, &pb.AuthDisableRequest{}) return (*AuthDisableResponse)(resp), toErr(ctx, err) } @@ -139,12 +139,12 @@ func (auth *auth) UserGrantRole(ctx context.Context, user string, role string) ( } func (auth *auth) UserGet(ctx context.Context, name string) (*AuthUserGetResponse, error) { - resp, err := auth.remote.UserGet(ctx, &pb.AuthUserGetRequest{Name: name}, grpc.FailFast(false)) + resp, err := auth.remote.UserGet(ctx, &pb.AuthUserGetRequest{Name: name}) return (*AuthUserGetResponse)(resp), toErr(ctx, err) } func (auth *auth) UserList(ctx context.Context) (*AuthUserListResponse, error) { - resp, err := auth.remote.UserList(ctx, &pb.AuthUserListRequest{}, grpc.FailFast(false)) + resp, err := auth.remote.UserList(ctx, &pb.AuthUserListRequest{}) return (*AuthUserListResponse)(resp), toErr(ctx, err) } @@ -169,12 +169,12 @@ func (auth *auth) RoleGrantPermission(ctx context.Context, name string, key, ran } func (auth *auth) RoleGet(ctx context.Context, role string) (*AuthRoleGetResponse, error) { - resp, err := auth.remote.RoleGet(ctx, &pb.AuthRoleGetRequest{Role: role}, grpc.FailFast(false)) + resp, err := auth.remote.RoleGet(ctx, &pb.AuthRoleGetRequest{Role: role}) return (*AuthRoleGetResponse)(resp), toErr(ctx, err) } func (auth *auth) RoleList(ctx context.Context) (*AuthRoleListResponse, error) { - resp, err := auth.remote.RoleList(ctx, &pb.AuthRoleListRequest{}, grpc.FailFast(false)) + resp, err := auth.remote.RoleList(ctx, &pb.AuthRoleListRequest{}) return (*AuthRoleListResponse)(resp), toErr(ctx, err) } @@ -202,7 +202,7 @@ type authenticator struct { } func (auth *authenticator) authenticate(ctx context.Context, name string, password string) (*AuthenticateResponse, error) { - resp, err := auth.remote.Authenticate(ctx, &pb.AuthenticateRequest{Name: name, Password: password}, grpc.FailFast(false)) + resp, err := auth.remote.Authenticate(ctx, &pb.AuthenticateRequest{Name: name, Password: password}) return (*AuthenticateResponse)(resp), toErr(ctx, err) } diff --git a/clientv3/balancer.go b/clientv3/balancer.go index cf7419b54d1..83b6796aba6 100644 --- a/clientv3/balancer.go +++ b/clientv3/balancer.go @@ -45,7 +45,10 @@ type balancer interface { // pinned returns the current pinned 
endpoint.
 	pinned() string

 	// endpointError handles error from server-side.
-	endpointError(addr string, err error)
+	endpointError(host string, err error)
+	// isFailed returns the errorInfo recorded for host, if any.
+	// ok is false if host is healthy.
+	isFailed(host string) (ev errorInfo, ok bool)

 	// up is Up but includes whether the balancer will use the connection.
 	up(addr grpc.Address) (func(error), bool)
@@ -152,7 +155,13 @@ func (b *simpleBalancer) pinned() string {
 	return b.pinAddr
 }

-func (b *simpleBalancer) endpointError(addr string, err error) { return }
+func (b *simpleBalancer) endpointError(host string, err error) {
+	panic("'endpointError' not implemented")
+}
+
+func (b *simpleBalancer) isFailed(host string) (errorInfo, bool) {
+	panic("'isFailed' not implemented")
+}

 func getHost2ep(eps []string) map[string]string {
 	hm := make(map[string]string, len(eps))
@@ -316,7 +325,7 @@ func (b *simpleBalancer) up(addr grpc.Address) (func(error), bool) {
 	}
 	if b.pinAddr != "" {
 		if logger.V(4) {
-			logger.Infof("clientv3/balancer: %s is up but not pinned (already pinned %s)", addr.Addr, b.pinAddr)
+			logger.Infof("clientv3/balancer: %q is up but not pinned (already pinned %q)", addr.Addr, b.pinAddr)
 		}
 		return func(err error) {}, false
 	}
@@ -325,7 +334,7 @@ func (b *simpleBalancer) up(addr grpc.Address) (func(error), bool) {
 	b.downc = make(chan struct{})
 	b.pinAddr = addr.Addr
 	if logger.V(4) {
-		logger.Infof("clientv3/balancer: pin %s", addr.Addr)
+		logger.Infof("clientv3/balancer: pin %q", addr.Addr)
 	}
 	// notify client that a connection is up
 	b.readyOnce.Do(func() { close(b.readyc) })
@@ -336,7 +345,7 @@
 		b.pinAddr = ""
 		b.mu.Unlock()
 		if logger.V(4) {
-			logger.Infof("clientv3/balancer: unpin %s (%v)", addr.Addr, err)
+			logger.Infof("clientv3/balancer: unpin %q (%q)", addr.Addr, err.Error())
 		}
 	}, true
}
diff --git a/clientv3/cluster.go b/clientv3/cluster.go
index bbecaaca74e..8beba58a67b 100644
--- a/clientv3/cluster.go
+++ b/clientv3/cluster.go
@@ -18,7 +18,6 @@ import (
 	"context"

 	pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
-	"google.golang.org/grpc"
 )

 type (
@@ -75,27 +74,19 @@ func (c *cluster) MemberRemove(ctx context.Context, id uint64) (*MemberRemoveRes
 func (c *cluster) MemberUpdate(ctx context.Context, id uint64, peerAddrs []string) (*MemberUpdateResponse, error) {
 	// it is safe to retry on update.
-	for {
-		r := &pb.MemberUpdateRequest{ID: id, PeerURLs: peerAddrs}
-		resp, err := c.remote.MemberUpdate(ctx, r, grpc.FailFast(false))
-		if err == nil {
-			return (*MemberUpdateResponse)(resp), nil
-		}
-		if isHaltErr(ctx, err) {
-			return nil, toErr(ctx, err)
-		}
+	r := &pb.MemberUpdateRequest{ID: id, PeerURLs: peerAddrs}
+	resp, err := c.remote.MemberUpdate(ctx, r)
+	if err == nil {
+		return (*MemberUpdateResponse)(resp), nil
 	}
+	return nil, toErr(ctx, err)
 }

 func (c *cluster) MemberList(ctx context.Context) (*MemberListResponse, error) {
 	// it is safe to retry on list.
- for { - resp, err := c.remote.MemberList(ctx, &pb.MemberListRequest{}, grpc.FailFast(false)) - if err == nil { - return (*MemberListResponse)(resp), nil - } - if isHaltErr(ctx, err) { - return nil, toErr(ctx, err) - } + resp, err := c.remote.MemberList(ctx, &pb.MemberListRequest{}) + if err == nil { + return (*MemberListResponse)(resp), nil } + return nil, toErr(ctx, err) } diff --git a/clientv3/health_balancer.go b/clientv3/health_balancer.go index dd4958c8084..bc47f866100 100644 --- a/clientv3/health_balancer.go +++ b/clientv3/health_balancer.go @@ -30,6 +30,11 @@ const unknownService = "unknown service grpc.health.v1.Health" type healthCheckFunc func(ep string) (bool, error) +type errorInfo struct { + failed time.Time + err error +} + // healthBalancer wraps a balancer so that it uses health checking // to choose its endpoints. type healthBalancer struct { @@ -48,8 +53,8 @@ type healthBalancer struct { // eps stores all client endpoints eps []string - // unhealthy tracks the last unhealthy time of endpoints. - unhealthy map[string]time.Time + // unhealthyHosts tracks the last unhealthy time of endpoints. + unhealthyHosts map[string]errorInfo stopc chan struct{} stopOnce sync.Once @@ -61,13 +66,13 @@ type healthBalancer struct { func newHealthBalancer(b balancer, timeout time.Duration, hc healthCheckFunc) *healthBalancer { hb := &healthBalancer{ - balancer: b, - healthCheck: hc, - eps: b.endpoints(), - addrs: eps2addrs(b.endpoints()), - host2ep: getHost2ep(b.endpoints()), - unhealthy: make(map[string]time.Time), - stopc: make(chan struct{}), + balancer: b, + healthCheck: hc, + eps: b.endpoints(), + addrs: eps2addrs(b.endpoints()), + host2ep: getHost2ep(b.endpoints()), + unhealthyHosts: make(map[string]errorInfo), + stopc: make(chan struct{}), } if timeout < minHealthRetryDuration { timeout = minHealthRetryDuration @@ -94,11 +99,11 @@ func (hb *healthBalancer) Up(addr grpc.Address) func(error) { // finding healthy endpoint on retry could take several timeouts and redials. // To avoid wasting retries, gray-list unhealthy endpoints. hb.mu.Lock() - hb.unhealthy[addr.Addr] = time.Now() + hb.unhealthyHosts[addr.Addr] = errorInfo{failed: time.Now(), err: err} hb.mu.Unlock() f(err) if logger.V(4) { - logger.Infof("clientv3/health-balancer: %s becomes unhealthy (%v)", addr.Addr, err) + logger.Infof("clientv3/health-balancer: %q becomes unhealthy (%q)", addr.Addr, err.Error()) } } } @@ -120,7 +125,7 @@ func (hb *healthBalancer) updateAddrs(eps ...string) { addrs, host2ep := eps2addrs(eps), getHost2ep(eps) hb.mu.Lock() hb.addrs, hb.eps, hb.host2ep = addrs, eps, host2ep - hb.unhealthy = make(map[string]time.Time) + hb.unhealthyHosts = make(map[string]errorInfo) hb.mu.Unlock() hb.balancer.updateAddrs(eps...) 
} @@ -142,11 +147,18 @@ func (hb *healthBalancer) updateUnhealthy(timeout time.Duration) { select { case <-time.After(timeout): hb.mu.Lock() - for k, v := range hb.unhealthy { - if time.Since(v) > timeout { - delete(hb.unhealthy, k) + for k, v := range hb.unhealthyHosts { + if _, ok := hb.host2ep[k]; !ok { + delete(hb.unhealthyHosts, k) + if logger.V(4) { + logger.Infof("clientv3/health-balancer: removes stale endpoint %q from unhealthy", k) + } + continue + } + if time.Since(v.failed) > timeout { + delete(hb.unhealthyHosts, k) if logger.V(4) { - logger.Infof("clientv3/health-balancer: removes %s from unhealthy after %v", k, timeout) + logger.Infof("clientv3/health-balancer: removes %q from unhealthy after %v", k, timeout) } } } @@ -166,31 +178,42 @@ func (hb *healthBalancer) liveAddrs() []grpc.Address { hb.mu.RLock() defer hb.mu.RUnlock() hbAddrs := hb.addrs - if len(hb.addrs) == 1 || len(hb.unhealthy) == 0 || len(hb.unhealthy) == len(hb.addrs) { + if len(hb.addrs) == 1 || len(hb.unhealthyHosts) == 0 || len(hb.unhealthyHosts) == len(hb.addrs) { return hbAddrs } - addrs := make([]grpc.Address, 0, len(hb.addrs)-len(hb.unhealthy)) + addrs := make([]grpc.Address, 0, len(hb.addrs)-len(hb.unhealthyHosts)) for _, addr := range hb.addrs { - if _, unhealthy := hb.unhealthy[addr.Addr]; !unhealthy { + if _, unhealthy := hb.unhealthyHosts[addr.Addr]; !unhealthy { addrs = append(addrs, addr) } } return addrs } -func (hb *healthBalancer) endpointError(addr string, err error) { +func (hb *healthBalancer) endpointError(host string, err error) { hb.mu.Lock() - hb.unhealthy[addr] = time.Now() + hb.unhealthyHosts[host] = errorInfo{failed: time.Now(), err: err} hb.mu.Unlock() if logger.V(4) { - logger.Infof("clientv3/health-balancer: marking %s as unhealthy (%v)", addr, err) + logger.Infof("clientv3/health-balancer: marking %q as unhealthy (%q)", host, err.Error()) } } +func (hb *healthBalancer) isFailed(host string) (ev errorInfo, ok bool) { + hb.mu.RLock() + ev, ok = hb.unhealthyHosts[host] + hb.mu.RUnlock() + return ev, ok +} + func (hb *healthBalancer) mayPin(addr grpc.Address) bool { hb.mu.RLock() - skip := len(hb.addrs) == 1 || len(hb.unhealthy) == 0 || len(hb.addrs) == len(hb.unhealthy) - failedTime, bad := hb.unhealthy[addr.Addr] + if _, ok := hb.host2ep[addr.Addr]; !ok { + hb.mu.RUnlock() + return false + } + skip := len(hb.addrs) == 1 || len(hb.unhealthyHosts) == 0 || len(hb.addrs) == len(hb.unhealthyHosts) + ef, bad := hb.unhealthyHosts[addr.Addr] dur := hb.healthCheckTimeout hb.mu.RUnlock() if skip || !bad { @@ -201,26 +224,27 @@ func (hb *healthBalancer) mayPin(addr grpc.Address) bool { // 2. balancer 'Up' unpins with grpc: failed with network I/O error // 3. 
grpc-healthcheck still SERVING, thus retry to pin
 	// instead, return before grpc-healthcheck if failed within healthcheck timeout
-	if elapsed := time.Since(failedTime); elapsed < dur {
+	if elapsed := time.Since(ef.failed); elapsed < dur {
 		if logger.V(4) {
-			logger.Infof("clientv3/health-balancer: %s is up but not pinned (failed %v ago, require minimum %v after failure)", addr.Addr, elapsed, dur)
+			logger.Infof("clientv3/health-balancer: %q is up but not pinned (failed %v ago, require minimum %v after failure)", addr.Addr, elapsed, dur)
 		}
 		return false
 	}
-	if ok, _ := hb.healthCheck(addr.Addr); ok {
+	ok, err := hb.healthCheck(addr.Addr)
+	if ok {
 		hb.mu.Lock()
-		delete(hb.unhealthy, addr.Addr)
+		delete(hb.unhealthyHosts, addr.Addr)
 		hb.mu.Unlock()
 		if logger.V(4) {
-			logger.Infof("clientv3/health-balancer: %s is healthy (health check success)", addr.Addr)
+			logger.Infof("clientv3/health-balancer: %q is healthy (health check success)", addr.Addr)
 		}
 		return true
 	}
 	hb.mu.Lock()
-	hb.unhealthy[addr.Addr] = time.Now()
+	hb.unhealthyHosts[addr.Addr] = errorInfo{failed: time.Now(), err: err}
 	hb.mu.Unlock()
 	if logger.V(4) {
-		logger.Infof("clientv3/health-balancer: %s becomes unhealthy (health check failed)", addr.Addr)
+		logger.Infof("clientv3/health-balancer: %q becomes unhealthy (health check failed)", addr.Addr)
 	}
 	return false
 }
diff --git a/clientv3/integration/kv_test.go b/clientv3/integration/kv_test.go
index 62d915a858f..b549096f1e0 100644
--- a/clientv3/integration/kv_test.go
+++ b/clientv3/integration/kv_test.go
@@ -473,8 +473,8 @@ func TestKVNewAfterClose(t *testing.T) {

 	donec := make(chan struct{})
 	go func() {
-		if _, err := cli.Get(context.TODO(), "foo"); err != context.Canceled {
-			t.Fatalf("expected %v, got %v", context.Canceled, err)
+		if _, err := cli.Get(context.TODO(), "foo"); err != context.Canceled && err != grpc.ErrClientConnClosing {
+			t.Fatalf("expected %v or %v, got %v", context.Canceled, grpc.ErrClientConnClosing, err)
 		}
 		close(donec)
 	}()
@@ -881,7 +881,7 @@ func TestKVGetResetLoneEndpoint(t *testing.T) {
 	// have Get try to reconnect
 	donec := make(chan struct{})
 	go func() {
-		ctx, cancel := context.WithTimeout(context.TODO(), 5*time.Second)
+		ctx, cancel := context.WithTimeout(context.TODO(), 8*time.Second)
 		if _, err := cli.Get(ctx, "abc", clientv3.WithSerializable()); err != nil {
 			t.Fatal(err)
 		}
diff --git a/clientv3/integration/leasing_test.go b/clientv3/integration/leasing_test.go
index b56478361d9..98a9b58771c 100644
--- a/clientv3/integration/leasing_test.go
+++ b/clientv3/integration/leasing_test.go
@@ -19,6 +19,7 @@ import (
 	"fmt"
 	"math/rand"
 	"reflect"
+	"strings"
 	"sync"
 	"testing"
 	"time"
@@ -28,6 +29,8 @@ import (
 	"github.com/coreos/etcd/clientv3/leasing"
 	"github.com/coreos/etcd/integration"
 	"github.com/coreos/etcd/pkg/testutil"
+
+	"google.golang.org/grpc/transport"
 )

 func TestLeasingPutGet(t *testing.T) {
@@ -1083,8 +1086,8 @@ func TestLeasingOwnerPutError(t *testing.T) {
 	clus.Members[0].Stop(t)
 	ctx, cancel := context.WithTimeout(context.TODO(), 100*time.Millisecond)
 	defer cancel()
-	if resp, err := lkv.Put(ctx, "k", "v"); err == nil {
-		t.Fatalf("expected error, got response %+v", resp)
+	if resp, err := lkv.Put(ctx, "k", "v"); err == nil || (err != context.DeadlineExceeded && !strings.Contains(err.Error(), "transport is closing")) {
+		t.Fatalf("expected %v or %v, got %v, response %+v", context.DeadlineExceeded, transport.ErrConnClosing, err, resp)
 	}
 }

@@ -1104,8 +1107,8 @@ func TestLeasingOwnerDeleteError(t *testing.T) {
 	clus.Members[0].Stop(t)
 	ctx, cancel := context.WithTimeout(context.TODO(), 100*time.Millisecond)
 	defer cancel()
-	if resp, err := lkv.Delete(ctx, "k"); err == nil {
-		t.Fatalf("expected error, got response %+v", resp)
+	if resp, err := lkv.Delete(ctx, "k"); err == nil || (err != context.DeadlineExceeded && !strings.Contains(err.Error(), "transport is closing")) {
+		t.Fatalf("expected %v or %v, got %v, response %+v", context.DeadlineExceeded, transport.ErrConnClosing, err, resp)
 	}
 }

@@ -1121,8 +1124,8 @@ func TestLeasingNonOwnerPutError(t *testing.T) {
 	clus.Members[0].Stop(t)
 	ctx, cancel := context.WithTimeout(context.TODO(), 100*time.Millisecond)
 	defer cancel()
-	if resp, err := lkv.Put(ctx, "k", "v"); err == nil {
-		t.Fatalf("expected error, got response %+v", resp)
+	if resp, err := lkv.Put(ctx, "k", "v"); err == nil || (err != context.DeadlineExceeded && !strings.Contains(err.Error(), "transport is closing")) {
+		t.Fatalf("expected %v or %v, got %v, response %+v", context.DeadlineExceeded, transport.ErrConnClosing, err, resp)
 	}
 }
diff --git a/clientv3/integration/network_partition_test.go b/clientv3/integration/network_partition_test.go
index 573b1778c12..fe1831a9440 100644
--- a/clientv3/integration/network_partition_test.go
+++ b/clientv3/integration/network_partition_test.go
@@ -22,6 +22,7 @@ import (
 	"time"

 	"github.com/coreos/etcd/clientv3"
+	"github.com/coreos/etcd/etcdserver"
 	"github.com/coreos/etcd/integration"
 	"github.com/coreos/etcd/pkg/testutil"
 )
@@ -33,7 +34,7 @@ func TestNetworkPartitionBalancerPut(t *testing.T) {
 	testNetworkPartitionBalancer(t, func(cli *clientv3.Client, ctx context.Context) error {
 		_, err := cli.Put(ctx, "a", "b")
 		return err
-	})
+	}, []error{context.DeadlineExceeded, etcdserver.ErrTimeout})
 }

 // TestNetworkPartitionBalancerGet tests when one member becomes isolated,
@@ -43,10 +44,10 @@ func TestNetworkPartitionBalancerGet(t *testing.T) {
 	testNetworkPartitionBalancer(t, func(cli *clientv3.Client, ctx context.Context) error {
 		_, err := cli.Get(ctx, "a")
 		return err
-	})
+	}, []error{context.DeadlineExceeded})
 }

-func testNetworkPartitionBalancer(t *testing.T, op func(*clientv3.Client, context.Context) error) {
+func testNetworkPartitionBalancer(t *testing.T, op func(*clientv3.Client, context.Context) error, errs []error) {
 	defer testutil.AfterTest(t)

 	clus := integration.NewClusterV3(t, &integration.ClusterConfig{
@@ -82,8 +83,15 @@
 		if err == nil {
 			break
 		}
-		if err != context.DeadlineExceeded {
-			t.Fatalf("#%d: expected %v, got %v", i, context.DeadlineExceeded, err)
+		match := false
+		for _, e := range errs {
+			if err == e {
+				match = true
+				break
+			}
+		}
+		if !match {
+			t.Fatalf("#%d: expected %+v, got %v", i, errs, err)
 		}
 		// give enough time for endpoint switch
 		// TODO: remove random sleep by syncing directly with balancer
diff --git a/clientv3/integration/watch_test.go b/clientv3/integration/watch_test.go
index dfd2e508341..76109821316 100644
--- a/clientv3/integration/watch_test.go
+++ b/clientv3/integration/watch_test.go
@@ -678,7 +678,7 @@ func TestWatchErrConnClosed(t *testing.T) {
 	clus.TakeClient(0)

 	select {
-	case <-time.After(3 * time.Second):
+	case <-time.After(10 * time.Second):
 		t.Fatal("wc.Watch took too long")
 	case <-donec:
 	}
@@ -705,7 +705,7 @@ func TestWatchAfterClose(t *testing.T) {
 		close(donec)
 	}()
 	select {
-	case <-time.After(3 * time.Second):
+	case <-time.After(10 * time.Second):
 		t.Fatal("wc.Watch took too long")
 	case <-donec:
 	}
diff --git a/clientv3/kv.go b/clientv3/kv.go index ead08a0827f..160a48110d8 100644 ---
a/clientv3/kv.go +++ b/clientv3/kv.go @@ -18,8 +18,6 @@ import ( "context" pb "github.com/coreos/etcd/etcdserver/etcdserverpb" - - "google.golang.org/grpc" ) type ( @@ -90,15 +88,17 @@ func (resp *TxnResponse) OpResponse() OpResponse { } type kv struct { - remote pb.KVClient + readWrite pb.KVClient + readOnly pb.KVClient } func NewKV(c *Client) KV { - return &kv{remote: RetryKVClient(c)} + readWrite, readOnly := RetryKVClient(c) + return &kv{readWrite: readWrite, readOnly: readOnly} } func NewKVFromKVClient(remote pb.KVClient) KV { - return &kv{remote: remote} + return &kv{readWrite: remote, readOnly: remote} } func (kv *kv) Put(ctx context.Context, key, val string, opts ...OpOption) (*PutResponse, error) { @@ -117,7 +117,7 @@ func (kv *kv) Delete(ctx context.Context, key string, opts ...OpOption) (*Delete } func (kv *kv) Compact(ctx context.Context, rev int64, opts ...CompactOption) (*CompactResponse, error) { - resp, err := kv.remote.Compact(ctx, OpCompact(rev, opts...).toRequest()) + resp, err := kv.readWrite.Compact(ctx, OpCompact(rev, opts...).toRequest()) if err != nil { return nil, toErr(ctx, err) } @@ -132,53 +132,36 @@ func (kv *kv) Txn(ctx context.Context) Txn { } func (kv *kv) Do(ctx context.Context, op Op) (OpResponse, error) { - for { - resp, err := kv.do(ctx, op) - if err == nil { - return resp, nil - } - - if isHaltErr(ctx, err) { - return resp, toErr(ctx, err) - } - // do not retry on modifications - if op.isWrite() { - return resp, toErr(ctx, err) - } - } -} - -func (kv *kv) do(ctx context.Context, op Op) (OpResponse, error) { var err error switch op.t { case tRange: var resp *pb.RangeResponse - resp, err = kv.remote.Range(ctx, op.toRangeRequest(), grpc.FailFast(false)) + resp, err = kv.readWrite.Range(ctx, op.toRangeRequest()) if err == nil { return OpResponse{get: (*GetResponse)(resp)}, nil } case tPut: var resp *pb.PutResponse r := &pb.PutRequest{Key: op.key, Value: op.val, Lease: int64(op.leaseID), PrevKv: op.prevKV, IgnoreValue: op.ignoreValue, IgnoreLease: op.ignoreLease} - resp, err = kv.remote.Put(ctx, r) + resp, err = kv.readWrite.Put(ctx, r) if err == nil { return OpResponse{put: (*PutResponse)(resp)}, nil } case tDeleteRange: var resp *pb.DeleteRangeResponse r := &pb.DeleteRangeRequest{Key: op.key, RangeEnd: op.end, PrevKv: op.prevKV} - resp, err = kv.remote.DeleteRange(ctx, r) + resp, err = kv.readWrite.DeleteRange(ctx, r) if err == nil { return OpResponse{del: (*DeleteResponse)(resp)}, nil } case tTxn: var resp *pb.TxnResponse - resp, err = kv.remote.Txn(ctx, op.toTxnRequest()) + resp, err = kv.readWrite.Txn(ctx, op.toTxnRequest()) if err == nil { return OpResponse{txn: (*TxnResponse)(resp)}, nil } default: panic("Unknown op") } - return OpResponse{}, err + return OpResponse{}, toErr(ctx, err) } diff --git a/clientv3/lease.go b/clientv3/lease.go index e476db5be2e..6c2571ee04b 100644 --- a/clientv3/lease.go +++ b/clientv3/lease.go @@ -22,7 +22,6 @@ import ( "github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes" pb "github.com/coreos/etcd/etcdserver/etcdserverpb" - "google.golang.org/grpc" "google.golang.org/grpc/metadata" ) @@ -183,72 +182,55 @@ func NewLeaseFromLeaseClient(remote pb.LeaseClient, keepAliveTimeout time.Durati } func (l *lessor) Grant(ctx context.Context, ttl int64) (*LeaseGrantResponse, error) { - for { - r := &pb.LeaseGrantRequest{TTL: ttl} - resp, err := l.remote.LeaseGrant(ctx, r) - if err == nil { - gresp := &LeaseGrantResponse{ - ResponseHeader: resp.GetHeader(), - ID: LeaseID(resp.ID), - TTL: resp.TTL, - Error: resp.Error, - } - return 
gresp, nil - } - if isHaltErr(ctx, err) { - return nil, toErr(ctx, err) + r := &pb.LeaseGrantRequest{TTL: ttl} + resp, err := l.remote.LeaseGrant(ctx, r) + if err == nil { + gresp := &LeaseGrantResponse{ + ResponseHeader: resp.GetHeader(), + ID: LeaseID(resp.ID), + TTL: resp.TTL, + Error: resp.Error, } + return gresp, nil } + return nil, toErr(ctx, err) } func (l *lessor) Revoke(ctx context.Context, id LeaseID) (*LeaseRevokeResponse, error) { - for { - r := &pb.LeaseRevokeRequest{ID: int64(id)} - resp, err := l.remote.LeaseRevoke(ctx, r) - - if err == nil { - return (*LeaseRevokeResponse)(resp), nil - } - if isHaltErr(ctx, err) { - return nil, toErr(ctx, err) - } + r := &pb.LeaseRevokeRequest{ID: int64(id)} + resp, err := l.remote.LeaseRevoke(ctx, r) + if err == nil { + return (*LeaseRevokeResponse)(resp), nil } + return nil, toErr(ctx, err) } func (l *lessor) TimeToLive(ctx context.Context, id LeaseID, opts ...LeaseOption) (*LeaseTimeToLiveResponse, error) { - for { - r := toLeaseTimeToLiveRequest(id, opts...) - resp, err := l.remote.LeaseTimeToLive(ctx, r, grpc.FailFast(false)) - if err == nil { - gresp := &LeaseTimeToLiveResponse{ - ResponseHeader: resp.GetHeader(), - ID: LeaseID(resp.ID), - TTL: resp.TTL, - GrantedTTL: resp.GrantedTTL, - Keys: resp.Keys, - } - return gresp, nil - } - if isHaltErr(ctx, err) { - return nil, toErr(ctx, err) + r := toLeaseTimeToLiveRequest(id, opts...) + resp, err := l.remote.LeaseTimeToLive(ctx, r) + if err == nil { + gresp := &LeaseTimeToLiveResponse{ + ResponseHeader: resp.GetHeader(), + ID: LeaseID(resp.ID), + TTL: resp.TTL, + GrantedTTL: resp.GrantedTTL, + Keys: resp.Keys, } + return gresp, nil } + return nil, toErr(ctx, err) } func (l *lessor) Leases(ctx context.Context) (*LeaseLeasesResponse, error) { - for { - resp, err := l.remote.LeaseLeases(ctx, &pb.LeaseLeasesRequest{}, grpc.FailFast(false)) - if err == nil { - leases := make([]LeaseStatus, len(resp.Leases)) - for i := range resp.Leases { - leases[i] = LeaseStatus{ID: LeaseID(resp.Leases[i].ID)} - } - return &LeaseLeasesResponse{ResponseHeader: resp.GetHeader(), Leases: leases}, nil - } - if isHaltErr(ctx, err) { - return nil, toErr(ctx, err) + resp, err := l.remote.LeaseLeases(ctx, &pb.LeaseLeasesRequest{}) + if err == nil { + leases := make([]LeaseStatus, len(resp.Leases)) + for i := range resp.Leases { + leases[i] = LeaseStatus{ID: LeaseID(resp.Leases[i].ID)} } + return &LeaseLeasesResponse{ResponseHeader: resp.GetHeader(), Leases: leases}, nil } + return nil, toErr(ctx, err) } func (l *lessor) KeepAlive(ctx context.Context, id LeaseID) (<-chan *LeaseKeepAliveResponse, error) { @@ -292,9 +274,10 @@ func (l *lessor) KeepAlive(ctx context.Context, id LeaseID) (<-chan *LeaseKeepAl } func (l *lessor) KeepAliveOnce(ctx context.Context, id LeaseID) (*LeaseKeepAliveResponse, error) { + // manually retry in case "resp==nil && err==nil" for { resp, err := l.keepAliveOnce(ctx, id) - if err == nil { + if resp != nil && err == nil { if resp.TTL <= 0 { err = rpctypes.ErrLeaseNotFound } @@ -389,7 +372,7 @@ func (l *lessor) keepAliveOnce(ctx context.Context, id LeaseID) (*LeaseKeepAlive cctx, cancel := context.WithCancel(ctx) defer cancel() - stream, err := l.remote.LeaseKeepAlive(cctx, grpc.FailFast(false)) + stream, err := l.remote.LeaseKeepAlive(cctx) if err != nil { return nil, toErr(ctx, err) } @@ -430,10 +413,9 @@ func (l *lessor) recvKeepAliveLoop() (gerr error) { if canceledByCaller(l.stopCtx, err) { return err } - } else { + } else if stream != nil { for { resp, err := stream.Recv() - if err != 
nil { if canceledByCaller(l.stopCtx, err) { return err @@ -461,8 +443,8 @@ func (l *lessor) recvKeepAliveLoop() (gerr error) { // resetRecv opens a new lease stream and starts sending keep alive requests. func (l *lessor) resetRecv() (pb.Lease_LeaseKeepAliveClient, error) { sctx, cancel := context.WithCancel(l.stopCtx) - stream, err := l.remote.LeaseKeepAlive(sctx, grpc.FailFast(false)) - if err != nil { + stream, err := l.remote.LeaseKeepAlive(sctx) + if stream == nil || err != nil { cancel() return nil, err } diff --git a/clientv3/maintenance.go b/clientv3/maintenance.go index 988a5f7c288..25abc9c9100 100644 --- a/clientv3/maintenance.go +++ b/clientv3/maintenance.go @@ -19,8 +19,6 @@ import ( "io" pb "github.com/coreos/etcd/etcdserver/etcdserverpb" - - "google.golang.org/grpc" ) type ( @@ -77,9 +75,9 @@ func NewMaintenance(c *Client) Maintenance { return nil, nil, err } cancel := func() { conn.Close() } - return pb.NewMaintenanceClient(conn), cancel, nil + return RetryMaintenanceClient(c, conn), cancel, nil }, - remote: pb.NewMaintenanceClient(c.conn), + remote: RetryMaintenanceClient(c, c.conn), } } @@ -98,15 +96,11 @@ func (m *maintenance) AlarmList(ctx context.Context) (*AlarmResponse, error) { MemberID: 0, // all Alarm: pb.AlarmType_NONE, // all } - for { - resp, err := m.remote.Alarm(ctx, req, grpc.FailFast(false)) - if err == nil { - return (*AlarmResponse)(resp), nil - } - if isHaltErr(ctx, err) { - return nil, toErr(ctx, err) - } + resp, err := m.remote.Alarm(ctx, req) + if err == nil { + return (*AlarmResponse)(resp), nil } + return nil, toErr(ctx, err) } func (m *maintenance) AlarmDisarm(ctx context.Context, am *AlarmMember) (*AlarmResponse, error) { @@ -132,7 +126,7 @@ func (m *maintenance) AlarmDisarm(ctx context.Context, am *AlarmMember) (*AlarmR return &ret, nil } - resp, err := m.remote.Alarm(ctx, req, grpc.FailFast(false)) + resp, err := m.remote.Alarm(ctx, req) if err == nil { return (*AlarmResponse)(resp), nil } @@ -145,7 +139,7 @@ func (m *maintenance) Defragment(ctx context.Context, endpoint string) (*Defragm return nil, toErr(ctx, err) } defer cancel() - resp, err := remote.Defragment(ctx, &pb.DefragmentRequest{}, grpc.FailFast(false)) + resp, err := remote.Defragment(ctx, &pb.DefragmentRequest{}) if err != nil { return nil, toErr(ctx, err) } @@ -158,7 +152,7 @@ func (m *maintenance) Status(ctx context.Context, endpoint string) (*StatusRespo return nil, toErr(ctx, err) } defer cancel() - resp, err := remote.Status(ctx, &pb.StatusRequest{}, grpc.FailFast(false)) + resp, err := remote.Status(ctx, &pb.StatusRequest{}) if err != nil { return nil, toErr(ctx, err) } @@ -171,7 +165,7 @@ func (m *maintenance) HashKV(ctx context.Context, endpoint string, rev int64) (* return nil, toErr(ctx, err) } defer cancel() - resp, err := remote.HashKV(ctx, &pb.HashKVRequest{Revision: rev}, grpc.FailFast(false)) + resp, err := remote.HashKV(ctx, &pb.HashKVRequest{Revision: rev}) if err != nil { return nil, toErr(ctx, err) } @@ -179,7 +173,7 @@ func (m *maintenance) HashKV(ctx context.Context, endpoint string, rev int64) (* } func (m *maintenance) Snapshot(ctx context.Context) (io.ReadCloser, error) { - ss, err := m.remote.Snapshot(ctx, &pb.SnapshotRequest{}, grpc.FailFast(false)) + ss, err := m.remote.Snapshot(ctx, &pb.SnapshotRequest{}) if err != nil { return nil, toErr(ctx, err) } @@ -206,6 +200,6 @@ func (m *maintenance) Snapshot(ctx context.Context) (io.ReadCloser, error) { } func (m *maintenance) MoveLeader(ctx context.Context, transfereeID uint64) (*MoveLeaderResponse, error) { - 
resp, err := m.remote.MoveLeader(ctx, &pb.MoveLeaderRequest{TargetID: transfereeID}, grpc.FailFast(false)) + resp, err := m.remote.MoveLeader(ctx, &pb.MoveLeaderRequest{TargetID: transfereeID}) return (*MoveLeaderResponse)(resp), toErr(ctx, err) } diff --git a/clientv3/retry.go b/clientv3/retry.go index d33fff92d98..8d4a3fc7d6e 100644 --- a/clientv3/retry.go +++ b/clientv3/retry.go @@ -16,6 +16,7 @@ package clientv3 import ( "context" + "time" "github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes" pb "github.com/coreos/etcd/etcdserver/etcdserverpb" @@ -23,70 +24,253 @@ import ( "google.golang.org/grpc" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" + "google.golang.org/grpc/transport" ) -type rpcFunc func(ctx context.Context) error -type retryRpcFunc func(context.Context, rpcFunc) error -type retryStopErrFunc func(error) bool +type rpcFunc func(context.Context) error + +// TODO: clean up gRPC error handling when "FailFast=false" is fixed. +// See https://github.com/grpc/grpc-go/issues/1532. +func (c *Client) do( + rpcCtx context.Context, + pinned string, + write bool, + f rpcFunc) (unhealthy, connectWait, switchEp, retryEp bool, err error) { + err = f(rpcCtx) + if err == nil { + unhealthy, connectWait, switchEp, retryEp = false, false, false, false + return unhealthy, connectWait, switchEp, retryEp, nil + } -func isReadStopError(err error) bool { - eErr := rpctypes.Error(err) - // always stop retry on etcd errors - if _, ok := eErr.(rpctypes.EtcdError); ok { - return true + if logger.V(4) { + logger.Infof("clientv3/do: error %q on pinned endpoint %q (write %v)", err.Error(), pinned, write) + } + + rerr := rpctypes.Error(err) + if ev, ok := rerr.(rpctypes.EtcdError); ok { + if ev.Code() != codes.Unavailable { + // error from etcd server with non codes.Unavailable + // then no endpoint switch and no retry + // e.g. rpctypes.ErrEmptyKey, rpctypes.ErrNoSpace + unhealthy, connectWait, switchEp, retryEp = false, false, false, false + return unhealthy, connectWait, switchEp, retryEp, err + } + // error from etcd server with codes.Unavailable + // then endpoint switch and retry + // e.g. rpctypes.ErrTimeout + unhealthy, connectWait, switchEp, retryEp = true, false, true, true + if write { // only retry immutable RPCs ("put at-most-once semantics") + retryEp = false + } + return unhealthy, connectWait, switchEp, retryEp, err } - // only retry if unavailable - ev, _ := status.FromError(err) - return ev.Code() != codes.Unavailable -} -func isWriteStopError(err error) bool { - ev, _ := status.FromError(err) - if ev.Code() != codes.Unavailable { - return true + // if failed to establish new stream or server is not reachable, + // then endpoint switch and retry + // e.g. transport.ErrConnClosing + if tv, ok := err.(transport.ConnectionError); ok && tv.Temporary() { + unhealthy, connectWait, switchEp, retryEp = true, false, true, true + return unhealthy, connectWait, switchEp, retryEp, err } - return rpctypes.ErrorDesc(err) != "there is no address available" + + // if unknown status error from gRPC, then endpoint switch and no retry + s, ok := status.FromError(err) + if !ok { + unhealthy, connectWait, switchEp, retryEp = true, false, true, false + return unhealthy, connectWait, switchEp, retryEp, err + } + + // assume transport.ConnectionError, transport.StreamError, or others from gRPC + // converts to grpc/status.(*statusError) by grpc/toRPCErr + // (e.g. 
transport.ErrConnClosing when server closed transport, failing node) + // if known status error from gRPC with following codes, + // then endpoint switch and retry + if s.Code() == codes.Unavailable || + s.Code() == codes.Internal || + s.Code() == codes.DeadlineExceeded { + unhealthy, connectWait, switchEp, retryEp = true, false, true, true + switch s.Message() { + case "there is no address available": + // pinned == "" or endpoint unpinned right before/during RPC send + // no need to mark empty endpoint + unhealthy = false + + // RPC was sent with empty pinned address: + // 1. initial connection has not been established (never pinned) + // 2. an endpoint has just been unpinned from an error + // Both cases expect to pin a new endpoint when Up is called. + // Thus, should be retried. + retryEp = true + if logger.V(4) { + logger.Infof("clientv3/do: there was no pinned endpoint (will be retried)") + } + + // case A. + // gRPC is being too slow to start transport (e.g. errors, backoff), + // then endpoint switch. Otherwise, it can block forever to connect wait. + // This case is safe to reset/drain all current connections. + switchEp = true + + // case B. + // gRPC is just about to establish transport within a moment, + // where endpoint switch would enforce connection drain on the healthy + // endpoint, which otherwise could be pinned and connected. + // The healthy endpoint gets pinned but unpinned right after with + // an error "grpc: the connection is drained", from endpoint switch. + // Thus, not safe to trigger endpoint switch; new healthy endpoint + // will be marked as unhealthy and not be pinned for another >3 seconds. + // Not acceptable when this healthy endpoint was the only available one! + // Workaround is wait up to dial timeout, and then endpoint switch + // only when the connection still has not been "Up" (still no pinned endpoint). 
+			// TODO: remove this when gRPC has better way to track connection status
+			connectWait = true

+		case "grpc: the connection is closing",
+			"grpc: the connection is unavailable":
+			// gRPC v1.7 retries on these errors with FailFast=false
+			// etcd client sets FailFast=true, thus retry manually
+			unhealthy, connectWait, switchEp, retryEp = true, false, true, true

+		default:
+			// only retry immutable RPCs ("put at-most-once semantics")
+			// mutable RPCs should not be retried if unknown errors
+			if write {
+				unhealthy, connectWait, switchEp, retryEp = true, false, true, false
+			}
+		}
+	}
+
+	return unhealthy, connectWait, switchEp, retryEp, err
 }

-func (c *Client) newRetryWrapper(isStop retryStopErrFunc) retryRpcFunc {
+type retryRPCFunc func(context.Context, rpcFunc) error
+
+const minDialDuration = 3 * time.Second
+
+func (c *Client) newRetryWrapper(write bool) retryRPCFunc {
+	dialTimeout := c.cfg.DialTimeout
+	if dialTimeout < minDialDuration {
+		dialTimeout = minDialDuration
+	}
+	dialWait := func(rpcCtx context.Context) (up bool, err error) {
+		select {
+		case <-c.balancer.ConnectNotify():
+			if logger.V(4) {
+				logger.Infof("clientv3/retry: new healthy endpoint %q is up!", c.balancer.pinned())
+			}
+			return true, nil
+		case <-time.After(dialTimeout):
+			return false, nil
+		case <-rpcCtx.Done():
+			return false, rpcCtx.Err()
+		case <-c.ctx.Done():
+			return false, c.ctx.Err()
+		}
+	}
 	return func(rpcCtx context.Context, f rpcFunc) error {
 		for {
-			select {
-			case <-c.balancer.ConnectNotify():
-			case <-rpcCtx.Done():
-				return rpcCtx.Err()
-			case <-c.ctx.Done():
-				return c.ctx.Err()
-			}
 			pinned := c.balancer.pinned()
-			err := f(rpcCtx)
-			if err == nil {
+			staleEp := pinned != "" && c.balancer.endpoint(pinned) == ""
+			eps := c.balancer.endpoints()
+			singleEp, multiEp := len(eps) == 1, len(eps) > 1
+
+			var unhealthy, connectWait, switchEp, retryEp bool
+			var err error
+			if pinned == "" {
+				// no endpoint has been up yet, then wait for connection up and retry
+				unhealthy, connectWait, switchEp, retryEp = false, true, true, true
+			} else if staleEp {
+				// if stale, then endpoint switch and retry
+				unhealthy, switchEp, retryEp = false, true, true
+				// should wait in case this endpoint is stale from "SetEndpoints"
+				// which resets all connections, thus expecting new healthy endpoint "Up"
+				connectWait = true
+				if logger.V(4) {
+					logger.Infof("clientv3/retry: found stale endpoint %q (switching and retrying)", pinned)
+				}
+			} else { // endpoint is up-to-date
+				unhealthy, connectWait, switchEp, retryEp, err = c.do(rpcCtx, pinned, write, f)
+			}
+			if !unhealthy && !connectWait && !switchEp && !retryEp && err == nil {
 				return nil
 			}
-			if logger.V(4) {
-				logger.Infof("clientv3/retry: error %v on pinned endpoint %s", err, pinned)
+
+			// single endpoint with failed gRPC connection, before RPC is sent
+			// should neither do endpoint switch nor retry
+			// because client might have manually closed the connection
+			// and there's no other endpoint to switch
+			// e.g. grpc.ErrClientConnClosing
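+			//
+			// For concreteness, c.do above classifies (unhealthy, connectWait, switchEp, retryEp) as, for example:
+			//   rpctypes.ErrEmptyKey            -> (false, false, false, false), return the error to the caller
+			//   rpctypes.ErrTimeout             -> (true, false, true, true), switch endpoint; retryEp is false for writes
+			//   "there is no address available" -> (false, true, true, true), wait for "Up" before switching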
+			if singleEp && connectWait {
+				ep := eps[0]
+				_, host, _ := parseEndpoint(ep)
+				ev, ok := c.balancer.isFailed(host)
+				if ok {
+					// error returned to gRPC balancer "Up" error function
+					// before RPC is sent (error is typed "grpc.downErr")
+					if ev.err.Error() == grpc.ErrClientConnClosing.Error() {
+						if logger.V(4) {
+							logger.Infof("clientv3/retry: single endpoint %q with error %q (returning)", host, ev.err.Error())
+						}
+						return grpc.ErrClientConnClosing
+					}
+					if logger.V(4) {
+						logger.Infof("clientv3/retry: single endpoint %q with error %q", host, ev.err.Error())
+					}
+				}
 			}
-			// mark this before endpoint switch is triggered
-			c.balancer.endpointError(pinned, err)
-			notify := c.balancer.ConnectNotify()
-			if s, ok := status.FromError(err); ok && s.Code() == codes.Unavailable {
-				c.balancer.next()
+
+			// mark as unhealthy only when there are multiple endpoints
+			if multiEp && unhealthy {
+				c.balancer.endpointError(pinned, err)
 			}
-			if isStop(err) {
-				return err
+
+			// wait for next healthy endpoint up
+			// before draining all current connections
+			if connectWait {
+				if logger.V(4) {
+					logger.Infof("clientv3/retry: wait %v for healthy endpoint", dialTimeout)
+				}
+				up, derr := dialWait(rpcCtx)
+				if derr != nil {
+					return derr
+				}
+				if up { // connection is up, no need to switch endpoint
+					continue
+				}
+				if logger.V(4) {
+					logger.Infof("clientv3/retry: took too long to reset transport (switching endpoints)")
+				}
 			}
-			select {
-			case <-notify:
-			case <-rpcCtx.Done():
-				return rpcCtx.Err()
-			case <-c.ctx.Done():
-				return c.ctx.Err()
+
+			// trigger endpoint switch in balancer
+			if switchEp {
+				c.balancer.next()
 			}
+
+			// wait for another endpoint to come up
+			if retryEp {
+				if logger.V(4) {
+					logger.Infof("clientv3/retry: wait %v for connection before retry", dialTimeout)
+				}
+				up, derr := dialWait(rpcCtx)
+				if derr != nil {
+					return derr
+				}
+				if up { // retry with new endpoint
+					continue
+				}
+				if logger.V(4) {
+					logger.Infof("clientv3/retry: took too long for connection up (returning)")
+				}
+			}
+
+			// TODO: remove duplicate error handling inside toErr
+			return toErr(rpcCtx, err)
 		}
 	}
 }

-func (c *Client) newAuthRetryWrapper() retryRpcFunc {
+func (c *Client) newAuthRetryWrapper() retryRPCFunc {
 	return func(rpcCtx context.Context, f rpcFunc) error {
 		for {
 			pinned := c.balancer.pinned()
@@ -94,15 +278,18 @@ func (c *Client) newAuthRetryWrapper() retryRpcFunc {
 			if err == nil {
 				return nil
 			}
-			if logger.V(4) {
-				logger.Infof("clientv3/auth-retry: error %v on pinned endpoint %s", err, pinned)
-			}
 			// always stop retry on etcd errors other than invalid auth token
 			if rpctypes.Error(err) == rpctypes.ErrInvalidAuthToken {
 				gterr := c.getToken(rpcCtx)
 				if gterr != nil {
+					if logger.V(4) {
+						logger.Infof("clientv3/auth-retry: error %q(%q) on pinned endpoint %q (returning)", err.Error(), gterr.Error(), pinned)
+					}
 					return err // return the original error for simplicity
 				}
+				if logger.V(4) {
+					logger.Infof("clientv3/auth-retry: error %q on pinned endpoint %q (retrying)", err, pinned)
+				}
 				continue
 			}
 			return err
@@ -110,21 +297,28 @@
 	}
 }

-// RetryKVClient implements a KVClient that uses the client's FailFast retry policy.
-func RetryKVClient(c *Client) pb.KVClient {
-	readRetry := c.newRetryWrapper(isReadStopError)
-	writeRetry := c.newRetryWrapper(isWriteStopError)
-	conn := pb.NewKVClient(c.conn)
-	retryBasic := &retryKVClient{&retryWriteKVClient{conn, writeRetry}, readRetry}
+// RetryKVClient implements a KVClient.
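+//
+// The readWrite client applies the write retry policy to mutable RPCs
+// ("put at-most-once semantics") and the read retry policy to Range;
+// the readOnly client routes Txn through the read retry policy and is
+// used for serializable read-only transactions (see the txn.go change below).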
+func RetryKVClient(c *Client) (readWrite, readOnly pb.KVClient) { + readRetry := c.newRetryWrapper(false) + writeRetry := c.newRetryWrapper(true) retryAuthWrapper := c.newAuthRetryWrapper() - return &retryKVClient{ + kvc := pb.NewKVClient(c.conn) + + retryBasic := &retryKVClient{&retryWriteKVClient{kvc, writeRetry}, readRetry} + readWrite = &retryKVClient{ &retryWriteKVClient{retryBasic, retryAuthWrapper}, - retryAuthWrapper} + retryAuthWrapper, + } + + retryRead := &retryReadKVClient{kvc, readRetry} + readOnly = &retryReadKVClient{retryRead, retryAuthWrapper} + + return readWrite, readOnly } type retryKVClient struct { *retryWriteKVClient - readRetry retryRpcFunc + readRetry retryRPCFunc } func (rkv *retryKVClient) Range(ctx context.Context, in *pb.RangeRequest, opts ...grpc.CallOption) (resp *pb.RangeResponse, err error) { @@ -137,11 +331,11 @@ func (rkv *retryKVClient) Range(ctx context.Context, in *pb.RangeRequest, opts . type retryWriteKVClient struct { pb.KVClient - retryf retryRpcFunc + writeRetry retryRPCFunc } func (rkv *retryWriteKVClient) Put(ctx context.Context, in *pb.PutRequest, opts ...grpc.CallOption) (resp *pb.PutResponse, err error) { - err = rkv.retryf(ctx, func(rctx context.Context) error { + err = rkv.writeRetry(ctx, func(rctx context.Context) error { resp, err = rkv.KVClient.Put(rctx, in, opts...) return err }) @@ -149,7 +343,7 @@ func (rkv *retryWriteKVClient) Put(ctx context.Context, in *pb.PutRequest, opts } func (rkv *retryWriteKVClient) DeleteRange(ctx context.Context, in *pb.DeleteRangeRequest, opts ...grpc.CallOption) (resp *pb.DeleteRangeResponse, err error) { - err = rkv.retryf(ctx, func(rctx context.Context) error { + err = rkv.writeRetry(ctx, func(rctx context.Context) error { resp, err = rkv.KVClient.DeleteRange(rctx, in, opts...) return err }) @@ -157,7 +351,20 @@ func (rkv *retryWriteKVClient) DeleteRange(ctx context.Context, in *pb.DeleteRan } func (rkv *retryWriteKVClient) Txn(ctx context.Context, in *pb.TxnRequest, opts ...grpc.CallOption) (resp *pb.TxnResponse, err error) { - err = rkv.retryf(ctx, func(rctx context.Context) error { + err = rkv.writeRetry(ctx, func(rctx context.Context) error { + resp, err = rkv.KVClient.Txn(rctx, in, opts...) + return err + }) + return resp, err +} + +type retryReadKVClient struct { + pb.KVClient + readRetry retryRPCFunc +} + +func (rkv *retryReadKVClient) Txn(ctx context.Context, in *pb.TxnRequest, opts ...grpc.CallOption) (resp *pb.TxnResponse, err error) { + err = rkv.readRetry(ctx, func(rctx context.Context) error { resp, err = rkv.KVClient.Txn(rctx, in, opts...) return err }) @@ -165,29 +372,65 @@ func (rkv *retryWriteKVClient) Txn(ctx context.Context, in *pb.TxnRequest, opts } func (rkv *retryWriteKVClient) Compact(ctx context.Context, in *pb.CompactionRequest, opts ...grpc.CallOption) (resp *pb.CompactionResponse, err error) { - err = rkv.retryf(ctx, func(rctx context.Context) error { + err = rkv.writeRetry(ctx, func(rctx context.Context) error { resp, err = rkv.KVClient.Compact(rctx, in, opts...) return err }) return resp, err } +// RetryWatchClient implements a WatchClient. 
+func RetryWatchClient(c *Client) pb.WatchClient { + readRetry := c.newRetryWrapper(false) + wc := pb.NewWatchClient(c.conn) + return &retryWatchClient{wc, readRetry} +} + +type retryWatchClient struct { + pb.WatchClient + readRetry retryRPCFunc +} + +func (rwc *retryWatchClient) Watch(ctx context.Context, opts ...grpc.CallOption) (stream pb.Watch_WatchClient, err error) { + err = rwc.readRetry(ctx, func(rctx context.Context) error { + stream, err = rwc.WatchClient.Watch(rctx, opts...) + return err + }) + return stream, err +} + type retryLeaseClient struct { pb.LeaseClient - retryf retryRpcFunc + readRetry retryRPCFunc } -// RetryLeaseClient implements a LeaseClient that uses the client's FailFast retry policy. +// RetryLeaseClient implements a LeaseClient. func RetryLeaseClient(c *Client) pb.LeaseClient { - retry := &retryLeaseClient{ - pb.NewLeaseClient(c.conn), - c.newRetryWrapper(isReadStopError), - } - return &retryLeaseClient{retry, c.newAuthRetryWrapper()} + readRetry := c.newRetryWrapper(false) + lc := pb.NewLeaseClient(c.conn) + retryBasic := &retryLeaseClient{lc, readRetry} + retryAuthWrapper := c.newAuthRetryWrapper() + return &retryLeaseClient{retryBasic, retryAuthWrapper} +} + +func (rlc *retryLeaseClient) LeaseTimeToLive(ctx context.Context, in *pb.LeaseTimeToLiveRequest, opts ...grpc.CallOption) (resp *pb.LeaseTimeToLiveResponse, err error) { + err = rlc.readRetry(ctx, func(rctx context.Context) error { + resp, err = rlc.LeaseClient.LeaseTimeToLive(rctx, in, opts...) + return err + }) + return resp, err +} + +func (rlc *retryLeaseClient) LeaseLeases(ctx context.Context, in *pb.LeaseLeasesRequest, opts ...grpc.CallOption) (resp *pb.LeaseLeasesResponse, err error) { + err = rlc.readRetry(ctx, func(rctx context.Context) error { + resp, err = rlc.LeaseClient.LeaseLeases(rctx, in, opts...) + return err + }) + return resp, err } func (rlc *retryLeaseClient) LeaseGrant(ctx context.Context, in *pb.LeaseGrantRequest, opts ...grpc.CallOption) (resp *pb.LeaseGrantResponse, err error) { - err = rlc.retryf(ctx, func(rctx context.Context) error { + err = rlc.readRetry(ctx, func(rctx context.Context) error { resp, err = rlc.LeaseClient.LeaseGrant(rctx, in, opts...) return err }) @@ -196,141 +439,279 @@ func (rlc *retryLeaseClient) LeaseGrant(ctx context.Context, in *pb.LeaseGrantRe } func (rlc *retryLeaseClient) LeaseRevoke(ctx context.Context, in *pb.LeaseRevokeRequest, opts ...grpc.CallOption) (resp *pb.LeaseRevokeResponse, err error) { - err = rlc.retryf(ctx, func(rctx context.Context) error { + err = rlc.readRetry(ctx, func(rctx context.Context) error { resp, err = rlc.LeaseClient.LeaseRevoke(rctx, in, opts...) return err }) return resp, err } -type retryClusterClient struct { - pb.ClusterClient - retryf retryRpcFunc +func (rlc *retryLeaseClient) LeaseKeepAlive(ctx context.Context, opts ...grpc.CallOption) (stream pb.Lease_LeaseKeepAliveClient, err error) { + err = rlc.readRetry(ctx, func(rctx context.Context) error { + stream, err = rlc.LeaseClient.LeaseKeepAlive(rctx, opts...) + return err + }) + return stream, err } -// RetryClusterClient implements a ClusterClient that uses the client's FailFast retry policy. +// RetryClusterClient implements a ClusterClient. 
func RetryClusterClient(c *Client) pb.ClusterClient { - return &retryClusterClient{pb.NewClusterClient(c.conn), c.newRetryWrapper(isWriteStopError)} + readRetry := c.newRetryWrapper(false) + writeRetry := c.newRetryWrapper(true) + cc := pb.NewClusterClient(c.conn) + return &retryClusterClient{&retryWriteClusterClient{cc, writeRetry}, readRetry} } -func (rcc *retryClusterClient) MemberAdd(ctx context.Context, in *pb.MemberAddRequest, opts ...grpc.CallOption) (resp *pb.MemberAddResponse, err error) { - err = rcc.retryf(ctx, func(rctx context.Context) error { +type retryClusterClient struct { + *retryWriteClusterClient + readRetry retryRPCFunc +} + +func (rcc *retryClusterClient) MemberList(ctx context.Context, in *pb.MemberListRequest, opts ...grpc.CallOption) (resp *pb.MemberListResponse, err error) { + err = rcc.readRetry(ctx, func(rctx context.Context) error { + resp, err = rcc.ClusterClient.MemberList(rctx, in, opts...) + return err + }) + return resp, err +} + +type retryWriteClusterClient struct { + pb.ClusterClient + writeRetry retryRPCFunc +} + +func (rcc *retryWriteClusterClient) MemberAdd(ctx context.Context, in *pb.MemberAddRequest, opts ...grpc.CallOption) (resp *pb.MemberAddResponse, err error) { + err = rcc.writeRetry(ctx, func(rctx context.Context) error { resp, err = rcc.ClusterClient.MemberAdd(rctx, in, opts...) return err }) return resp, err } -func (rcc *retryClusterClient) MemberRemove(ctx context.Context, in *pb.MemberRemoveRequest, opts ...grpc.CallOption) (resp *pb.MemberRemoveResponse, err error) { - err = rcc.retryf(ctx, func(rctx context.Context) error { +func (rcc *retryWriteClusterClient) MemberRemove(ctx context.Context, in *pb.MemberRemoveRequest, opts ...grpc.CallOption) (resp *pb.MemberRemoveResponse, err error) { + err = rcc.writeRetry(ctx, func(rctx context.Context) error { resp, err = rcc.ClusterClient.MemberRemove(rctx, in, opts...) return err }) return resp, err } -func (rcc *retryClusterClient) MemberUpdate(ctx context.Context, in *pb.MemberUpdateRequest, opts ...grpc.CallOption) (resp *pb.MemberUpdateResponse, err error) { - err = rcc.retryf(ctx, func(rctx context.Context) error { +func (rcc *retryWriteClusterClient) MemberUpdate(ctx context.Context, in *pb.MemberUpdateRequest, opts ...grpc.CallOption) (resp *pb.MemberUpdateResponse, err error) { + err = rcc.writeRetry(ctx, func(rctx context.Context) error { resp, err = rcc.ClusterClient.MemberUpdate(rctx, in, opts...) return err }) return resp, err } -type retryAuthClient struct { - pb.AuthClient - retryf retryRpcFunc +// RetryMaintenanceClient implements a Maintenance. +func RetryMaintenanceClient(c *Client, conn *grpc.ClientConn) pb.MaintenanceClient { + readRetry := c.newRetryWrapper(false) + writeRetry := c.newRetryWrapper(true) + mc := pb.NewMaintenanceClient(conn) + return &retryMaintenanceClient{&retryWriteMaintenanceClient{mc, writeRetry}, readRetry} +} + +type retryMaintenanceClient struct { + pb.MaintenanceClient + readRetry retryRPCFunc +} + +func (rmc *retryMaintenanceClient) Alarm(ctx context.Context, in *pb.AlarmRequest, opts ...grpc.CallOption) (resp *pb.AlarmResponse, err error) { + err = rmc.readRetry(ctx, func(rctx context.Context) error { + resp, err = rmc.MaintenanceClient.Alarm(rctx, in, opts...) 
+ return err + }) + return resp, err +} + +func (rmc *retryMaintenanceClient) Status(ctx context.Context, in *pb.StatusRequest, opts ...grpc.CallOption) (resp *pb.StatusResponse, err error) { + err = rmc.readRetry(ctx, func(rctx context.Context) error { + resp, err = rmc.MaintenanceClient.Status(rctx, in, opts...) + return err + }) + return resp, err +} + +func (rmc *retryMaintenanceClient) Hash(ctx context.Context, in *pb.HashRequest, opts ...grpc.CallOption) (resp *pb.HashResponse, err error) { + err = rmc.readRetry(ctx, func(rctx context.Context) error { + resp, err = rmc.MaintenanceClient.Hash(rctx, in, opts...) + return err + }) + return resp, err +} + +func (rmc *retryMaintenanceClient) HashKV(ctx context.Context, in *pb.HashKVRequest, opts ...grpc.CallOption) (resp *pb.HashKVResponse, err error) { + err = rmc.readRetry(ctx, func(rctx context.Context) error { + resp, err = rmc.MaintenanceClient.HashKV(rctx, in, opts...) + return err + }) + return resp, err +} + +func (rmc *retryMaintenanceClient) Snapshot(ctx context.Context, in *pb.SnapshotRequest, opts ...grpc.CallOption) (stream pb.Maintenance_SnapshotClient, err error) { + err = rmc.readRetry(ctx, func(rctx context.Context) error { + stream, err = rmc.MaintenanceClient.Snapshot(rctx, in, opts...) + return err + }) + return stream, err } -// RetryAuthClient implements a AuthClient that uses the client's FailFast retry policy. +type retryWriteMaintenanceClient struct { + pb.MaintenanceClient + writeRetry retryRPCFunc +} + +func (rmc *retryWriteMaintenanceClient) Defragment(ctx context.Context, in *pb.DefragmentRequest, opts ...grpc.CallOption) (resp *pb.DefragmentResponse, err error) { + err = rmc.writeRetry(ctx, func(rctx context.Context) error { + resp, err = rmc.MaintenanceClient.Defragment(rctx, in, opts...) + return err + }) + return resp, err +} + +func (rmc *retryWriteMaintenanceClient) MoveLeader(ctx context.Context, in *pb.MoveLeaderRequest, opts ...grpc.CallOption) (resp *pb.MoveLeaderResponse, err error) { + err = rmc.writeRetry(ctx, func(rctx context.Context) error { + resp, err = rmc.MaintenanceClient.MoveLeader(rctx, in, opts...) + return err + }) + return resp, err +} + +// RetryAuthClient implements a AuthClient. func RetryAuthClient(c *Client) pb.AuthClient { - return &retryAuthClient{pb.NewAuthClient(c.conn), c.newRetryWrapper(isWriteStopError)} + readRetry := c.newRetryWrapper(false) + writeRetry := c.newRetryWrapper(true) + ac := pb.NewAuthClient(c.conn) + return &retryAuthClient{&retryWriteAuthClient{ac, writeRetry}, readRetry} +} + +type retryAuthClient struct { + *retryWriteAuthClient + readRetry retryRPCFunc } -func (rac *retryAuthClient) AuthEnable(ctx context.Context, in *pb.AuthEnableRequest, opts ...grpc.CallOption) (resp *pb.AuthEnableResponse, err error) { - err = rac.retryf(ctx, func(rctx context.Context) error { +func (rac *retryAuthClient) UserList(ctx context.Context, in *pb.AuthUserListRequest, opts ...grpc.CallOption) (resp *pb.AuthUserListResponse, err error) { + err = rac.readRetry(ctx, func(rctx context.Context) error { + resp, err = rac.AuthClient.UserList(rctx, in, opts...) + return err + }) + return resp, err +} + +func (rac *retryAuthClient) RoleGet(ctx context.Context, in *pb.AuthRoleGetRequest, opts ...grpc.CallOption) (resp *pb.AuthRoleGetResponse, err error) { + err = rac.readRetry(ctx, func(rctx context.Context) error { + resp, err = rac.AuthClient.RoleGet(rctx, in, opts...) 
+ return err + }) + return resp, err +} + +func (rac *retryAuthClient) RoleList(ctx context.Context, in *pb.AuthRoleListRequest, opts ...grpc.CallOption) (resp *pb.AuthRoleListResponse, err error) { + err = rac.readRetry(ctx, func(rctx context.Context) error { + resp, err = rac.AuthClient.RoleList(rctx, in, opts...) + return err + }) + return resp, err +} + +type retryWriteAuthClient struct { + pb.AuthClient + writeRetry retryRPCFunc +} + +func (rac *retryWriteAuthClient) AuthEnable(ctx context.Context, in *pb.AuthEnableRequest, opts ...grpc.CallOption) (resp *pb.AuthEnableResponse, err error) { + err = rac.writeRetry(ctx, func(rctx context.Context) error { resp, err = rac.AuthClient.AuthEnable(rctx, in, opts...) return err }) return resp, err } -func (rac *retryAuthClient) AuthDisable(ctx context.Context, in *pb.AuthDisableRequest, opts ...grpc.CallOption) (resp *pb.AuthDisableResponse, err error) { - err = rac.retryf(ctx, func(rctx context.Context) error { +func (rac *retryWriteAuthClient) AuthDisable(ctx context.Context, in *pb.AuthDisableRequest, opts ...grpc.CallOption) (resp *pb.AuthDisableResponse, err error) { + err = rac.writeRetry(ctx, func(rctx context.Context) error { resp, err = rac.AuthClient.AuthDisable(rctx, in, opts...) return err }) return resp, err } -func (rac *retryAuthClient) UserAdd(ctx context.Context, in *pb.AuthUserAddRequest, opts ...grpc.CallOption) (resp *pb.AuthUserAddResponse, err error) { - err = rac.retryf(ctx, func(rctx context.Context) error { +func (rac *retryWriteAuthClient) UserAdd(ctx context.Context, in *pb.AuthUserAddRequest, opts ...grpc.CallOption) (resp *pb.AuthUserAddResponse, err error) { + err = rac.writeRetry(ctx, func(rctx context.Context) error { resp, err = rac.AuthClient.UserAdd(rctx, in, opts...) return err }) return resp, err } -func (rac *retryAuthClient) UserDelete(ctx context.Context, in *pb.AuthUserDeleteRequest, opts ...grpc.CallOption) (resp *pb.AuthUserDeleteResponse, err error) { - err = rac.retryf(ctx, func(rctx context.Context) error { +func (rac *retryWriteAuthClient) UserDelete(ctx context.Context, in *pb.AuthUserDeleteRequest, opts ...grpc.CallOption) (resp *pb.AuthUserDeleteResponse, err error) { + err = rac.writeRetry(ctx, func(rctx context.Context) error { resp, err = rac.AuthClient.UserDelete(rctx, in, opts...) return err }) return resp, err } -func (rac *retryAuthClient) UserChangePassword(ctx context.Context, in *pb.AuthUserChangePasswordRequest, opts ...grpc.CallOption) (resp *pb.AuthUserChangePasswordResponse, err error) { - err = rac.retryf(ctx, func(rctx context.Context) error { +func (rac *retryWriteAuthClient) UserChangePassword(ctx context.Context, in *pb.AuthUserChangePasswordRequest, opts ...grpc.CallOption) (resp *pb.AuthUserChangePasswordResponse, err error) { + err = rac.writeRetry(ctx, func(rctx context.Context) error { resp, err = rac.AuthClient.UserChangePassword(rctx, in, opts...) return err }) return resp, err } -func (rac *retryAuthClient) UserGrantRole(ctx context.Context, in *pb.AuthUserGrantRoleRequest, opts ...grpc.CallOption) (resp *pb.AuthUserGrantRoleResponse, err error) { - err = rac.retryf(ctx, func(rctx context.Context) error { +func (rac *retryWriteAuthClient) UserGrantRole(ctx context.Context, in *pb.AuthUserGrantRoleRequest, opts ...grpc.CallOption) (resp *pb.AuthUserGrantRoleResponse, err error) { + err = rac.writeRetry(ctx, func(rctx context.Context) error { resp, err = rac.AuthClient.UserGrantRole(rctx, in, opts...) 
 
-func (rac *retryAuthClient) UserRevokeRole(ctx context.Context, in *pb.AuthUserRevokeRoleRequest, opts ...grpc.CallOption) (resp *pb.AuthUserRevokeRoleResponse, err error) {
-	err = rac.retryf(ctx, func(rctx context.Context) error {
+func (rac *retryWriteAuthClient) UserRevokeRole(ctx context.Context, in *pb.AuthUserRevokeRoleRequest, opts ...grpc.CallOption) (resp *pb.AuthUserRevokeRoleResponse, err error) {
+	err = rac.writeRetry(ctx, func(rctx context.Context) error {
 		resp, err = rac.AuthClient.UserRevokeRole(rctx, in, opts...)
 		return err
 	})
 	return resp, err
 }
 
-func (rac *retryAuthClient) RoleAdd(ctx context.Context, in *pb.AuthRoleAddRequest, opts ...grpc.CallOption) (resp *pb.AuthRoleAddResponse, err error) {
-	err = rac.retryf(ctx, func(rctx context.Context) error {
+func (rac *retryWriteAuthClient) RoleAdd(ctx context.Context, in *pb.AuthRoleAddRequest, opts ...grpc.CallOption) (resp *pb.AuthRoleAddResponse, err error) {
+	err = rac.writeRetry(ctx, func(rctx context.Context) error {
 		resp, err = rac.AuthClient.RoleAdd(rctx, in, opts...)
 		return err
 	})
 	return resp, err
 }
 
-func (rac *retryAuthClient) RoleDelete(ctx context.Context, in *pb.AuthRoleDeleteRequest, opts ...grpc.CallOption) (resp *pb.AuthRoleDeleteResponse, err error) {
-	err = rac.retryf(ctx, func(rctx context.Context) error {
+func (rac *retryWriteAuthClient) RoleDelete(ctx context.Context, in *pb.AuthRoleDeleteRequest, opts ...grpc.CallOption) (resp *pb.AuthRoleDeleteResponse, err error) {
+	err = rac.writeRetry(ctx, func(rctx context.Context) error {
 		resp, err = rac.AuthClient.RoleDelete(rctx, in, opts...)
 		return err
 	})
 	return resp, err
 }
 
-func (rac *retryAuthClient) RoleGrantPermission(ctx context.Context, in *pb.AuthRoleGrantPermissionRequest, opts ...grpc.CallOption) (resp *pb.AuthRoleGrantPermissionResponse, err error) {
-	err = rac.retryf(ctx, func(rctx context.Context) error {
+func (rac *retryWriteAuthClient) RoleGrantPermission(ctx context.Context, in *pb.AuthRoleGrantPermissionRequest, opts ...grpc.CallOption) (resp *pb.AuthRoleGrantPermissionResponse, err error) {
+	err = rac.writeRetry(ctx, func(rctx context.Context) error {
 		resp, err = rac.AuthClient.RoleGrantPermission(rctx, in, opts...)
 		return err
 	})
 	return resp, err
 }
 
-func (rac *retryAuthClient) RoleRevokePermission(ctx context.Context, in *pb.AuthRoleRevokePermissionRequest, opts ...grpc.CallOption) (resp *pb.AuthRoleRevokePermissionResponse, err error) {
-	err = rac.retryf(ctx, func(rctx context.Context) error {
+func (rac *retryWriteAuthClient) RoleRevokePermission(ctx context.Context, in *pb.AuthRoleRevokePermissionRequest, opts ...grpc.CallOption) (resp *pb.AuthRoleRevokePermissionResponse, err error) {
+	err = rac.writeRetry(ctx, func(rctx context.Context) error {
 		resp, err = rac.AuthClient.RoleRevokePermission(rctx, in, opts...)
 		return err
 	})
 	return resp, err
 }
+
+func (rac *retryWriteAuthClient) Authenticate(ctx context.Context, in *pb.AuthenticateRequest, opts ...grpc.CallOption) (resp *pb.AuthenticateResponse, err error) {
+	err = rac.writeRetry(ctx, func(rctx context.Context) error {
+		resp, err = rac.AuthClient.Authenticate(rctx, in, opts...)
+		return err
+	})
+	return resp, err
+}
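With retries folded into the stubs, the policy travels with the client instead of with each call, and call sites no longer pass grpc.FailFast(false). A hedged sketch of a consumer inside package clientv3 (listUsersSketch is hypothetical; RetryAuthClient and toErr come from this patch and the existing package, respectively):

// listUsersSketch shows how the layered retry stub is consumed.
func listUsersSketch(ctx context.Context, c *Client) ([]string, error) {
	// reads like UserList retry transparently; mutating RPCs such as
	// Authenticate go through the stricter write policy automatically
	resp, err := RetryAuthClient(c).UserList(ctx, &pb.AuthUserListRequest{})
	if err != nil {
		return nil, toErr(ctx, err) // same conversion the real callers apply
	}
	return resp.Users, nil
}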
diff --git a/clientv3/txn.go b/clientv3/txn.go
index ea4ec6160b5..fb5214262ca 100644
--- a/clientv3/txn.go
+++ b/clientv3/txn.go
@@ -19,8 +19,6 @@ import (
 	"sync"
 
 	pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
-
-	"google.golang.org/grpc"
 )
 
 // Txn is the interface that wraps mini-transactions.
@@ -136,30 +134,18 @@ func (txn *txn) Else(ops ...Op) Txn {
 func (txn *txn) Commit() (*TxnResponse, error) {
 	txn.mu.Lock()
 	defer txn.mu.Unlock()
-	for {
-		resp, err := txn.commit()
-		if err == nil {
-			return resp, err
-		}
-		if isHaltErr(txn.ctx, err) {
-			return nil, toErr(txn.ctx, err)
-		}
-		if txn.isWrite {
-			return nil, toErr(txn.ctx, err)
-		}
-	}
-}
 
-func (txn *txn) commit() (*TxnResponse, error) {
 	r := &pb.TxnRequest{Compare: txn.cmps, Success: txn.sus, Failure: txn.fas}
-	var opts []grpc.CallOption
-	if !txn.isWrite {
-		opts = []grpc.CallOption{grpc.FailFast(false)}
+	var resp *pb.TxnResponse
+	var err error
+	if txn.isWrite {
+		resp, err = txn.kv.readWrite.Txn(txn.ctx, r)
+	} else {
+		resp, err = txn.kv.readOnly.Txn(txn.ctx, r)
 	}
-	resp, err := txn.kv.remote.Txn(txn.ctx, r, opts...)
 	if err != nil {
-		return nil, err
+		return nil, toErr(txn.ctx, err)
 	}
 	return (*TxnResponse)(resp), nil
 }
diff --git a/clientv3/watch.go b/clientv3/watch.go
index cfa47812691..2b31c4f6af6 100644
--- a/clientv3/watch.go
+++ b/clientv3/watch.go
@@ -188,7 +188,7 @@ type watcherStream struct {
 }
 
 func NewWatcher(c *Client) Watcher {
-	return NewWatchFromWatchClient(pb.NewWatchClient(c.conn))
+	return NewWatchFromWatchClient(RetryWatchClient(c))
 }
 
 func NewWatchFromWatchClient(wc pb.WatchClient) Watcher {
@@ -761,8 +761,9 @@ func (w *watchGrpcStream) joinSubstreams() {
 	}
 }
 
-// openWatchClient retries opening a watch client until success or halt.
+// openWatchClient opens a watch client.
 func (w *watchGrpcStream) openWatchClient() (ws pb.Watch_WatchClient, err error) {
+	// manually retry in case "ws==nil && err==nil"
 	for {
 		select {
 		case <-w.ctx.Done():
@@ -772,7 +773,7 @@ func (w *watchGrpcStream) openWatchClient() (ws pb.Watch_WatchClient, err error)
 			return nil, err
 		default:
 		}
-		if ws, err = w.remote.Watch(w.ctx, grpc.FailFast(false)); ws != nil && err == nil {
+		if ws, err = w.remote.Watch(w.ctx); ws != nil && err == nil {
 			break
 		}
 		if isHaltErr(w.ctx, err) {
diff --git a/cmd/vendor/google.golang.org/grpc/transport/handler_server.go b/cmd/vendor/google.golang.org/grpc/transport/handler_server.go
index 0489fada52e..31a260cfc91 100644
--- a/cmd/vendor/google.golang.org/grpc/transport/handler_server.go
+++ b/cmd/vendor/google.golang.org/grpc/transport/handler_server.go
@@ -183,6 +183,7 @@ func (ht *serverHandlerTransport) WriteStatus(s *Stream, st *status.Status) erro
 		ht.mu.Unlock()
 		return nil
 	}
+	ht.streamDone = true
 	ht.mu.Unlock()
 	err := ht.do(func() {
 		ht.writeCommonHeaders(s)
@@ -223,9 +224,6 @@ func (ht *serverHandlerTransport) WriteStatus(s *Stream, st *status.Status) erro
 		}
 	})
 	close(ht.writes)
-	ht.mu.Lock()
-	ht.streamDone = true
-	ht.mu.Unlock()
 	return err
 }
diff --git a/glide.yaml b/glide.yaml
index a5cbd20821d..1488aaa6141 100644
--- a/glide.yaml
+++ b/glide.yaml
@@ -97,7 +97,7 @@ import:
     subpackages:
     - rate
 - package: google.golang.org/grpc
-  version: v1.6.0
+  version: v1.6.0 with 22c3f92f5faea8db492fb0f5ae4daf0d2752b19e (temporary)
   subpackages:
   - codes
   - credentials
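One note beyond the clientv3 changes: the handler_server.go hunks fix an ordering race. Previously streamDone was set only after ht.do(...) and close(ht.writes) ran, so a second concurrent WriteStatus could pass the guard and double-close the channel; the patch instead claims the flag inside the same critical section that checks it. A self-contained sketch of that claim-before-work pattern (the transport type here is a stand-in, not the gRPC struct):

package main

import (
	"fmt"
	"sync"
)

// transport models a once-only completion guard: the flag is claimed
// under the same lock that checks it, so exactly one caller wins.
type transport struct {
	mu     sync.Mutex
	done   bool
	writes chan struct{}
}

func (t *transport) writeStatus() error {
	t.mu.Lock()
	if t.done {
		t.mu.Unlock()
		return nil // another caller already finished the stream
	}
	t.done = true // claim completion before releasing the lock
	t.mu.Unlock()

	close(t.writes) // now provably executed at most once
	return nil
}

func main() {
	t := &transport{writes: make(chan struct{})}
	var wg sync.WaitGroup
	for i := 0; i < 4; i++ {
		wg.Add(1)
		go func() { defer wg.Done(); t.writeStatus() }()
	}
	wg.Wait()
	fmt.Println("no double-close panic")
}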