clientv3: fix balancer/retry #8710

Closed · wants to merge 17 commits · changes shown from 15 commits
11 changes: 10 additions & 1 deletion .words
@@ -1,14 +1,22 @@
ConnectionError
ErrCodeEnhanceYourCalm
ErrConnClosing
ErrEmptyKey
ErrNoSpace
ErrTimeout
FailFast
GoAway
RPC
RPCs
StreamError
backoff
blackholed
cancelable
cancelation
cluster_proxy
defragment
defragmenting
downErr
etcd
gRPC
goroutine
@@ -25,9 +33,10 @@ mutex
prefetching
protobuf
serializable
statusError
teardown
toRPCErr
too_many_pings
uncontended
unprefixed
unlisting

16 changes: 8 additions & 8 deletions clientv3/auth.go
@@ -105,16 +105,16 @@ type auth struct {
}

func NewAuth(c *Client) Auth {
return &auth{remote: pb.NewAuthClient(c.ActiveConnection())}
return &auth{remote: RetryAuthClient(c)}
}

func (auth *auth) AuthEnable(ctx context.Context) (*AuthEnableResponse, error) {
resp, err := auth.remote.AuthEnable(ctx, &pb.AuthEnableRequest{}, grpc.FailFast(false))
resp, err := auth.remote.AuthEnable(ctx, &pb.AuthEnableRequest{})
return (*AuthEnableResponse)(resp), toErr(ctx, err)
}

func (auth *auth) AuthDisable(ctx context.Context) (*AuthDisableResponse, error) {
resp, err := auth.remote.AuthDisable(ctx, &pb.AuthDisableRequest{}, grpc.FailFast(false))
resp, err := auth.remote.AuthDisable(ctx, &pb.AuthDisableRequest{})
return (*AuthDisableResponse)(resp), toErr(ctx, err)
}

@@ -139,12 +139,12 @@ func (auth *auth) UserGrantRole(ctx context.Context, user string, role string) (
}

func (auth *auth) UserGet(ctx context.Context, name string) (*AuthUserGetResponse, error) {
resp, err := auth.remote.UserGet(ctx, &pb.AuthUserGetRequest{Name: name}, grpc.FailFast(false))
resp, err := auth.remote.UserGet(ctx, &pb.AuthUserGetRequest{Name: name})
return (*AuthUserGetResponse)(resp), toErr(ctx, err)
}

func (auth *auth) UserList(ctx context.Context) (*AuthUserListResponse, error) {
resp, err := auth.remote.UserList(ctx, &pb.AuthUserListRequest{}, grpc.FailFast(false))
resp, err := auth.remote.UserList(ctx, &pb.AuthUserListRequest{})
return (*AuthUserListResponse)(resp), toErr(ctx, err)
}

@@ -169,12 +169,12 @@ func (auth *auth) RoleGrantPermission(ctx context.Context, name string, key, ran
}

func (auth *auth) RoleGet(ctx context.Context, role string) (*AuthRoleGetResponse, error) {
resp, err := auth.remote.RoleGet(ctx, &pb.AuthRoleGetRequest{Role: role}, grpc.FailFast(false))
resp, err := auth.remote.RoleGet(ctx, &pb.AuthRoleGetRequest{Role: role})
return (*AuthRoleGetResponse)(resp), toErr(ctx, err)
}

func (auth *auth) RoleList(ctx context.Context) (*AuthRoleListResponse, error) {
resp, err := auth.remote.RoleList(ctx, &pb.AuthRoleListRequest{}, grpc.FailFast(false))
resp, err := auth.remote.RoleList(ctx, &pb.AuthRoleListRequest{})
return (*AuthRoleListResponse)(resp), toErr(ctx, err)
}

@@ -202,7 +202,7 @@ type authenticator struct {
}

func (auth *authenticator) authenticate(ctx context.Context, name string, password string) (*AuthenticateResponse, error) {
resp, err := auth.remote.Authenticate(ctx, &pb.AuthenticateRequest{Name: name, Password: password}, grpc.FailFast(false))
resp, err := auth.remote.Authenticate(ctx, &pb.AuthenticateRequest{Name: name, Password: password})
return (*AuthenticateResponse)(resp), toErr(ctx, err)
}

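Note: every auth RPC in this file now goes through RetryAuthClient(c) instead of a raw pb.NewAuthClient(c.ActiveConnection()), which is why the per-call grpc.FailFast(false) options are removed. The wrapper itself is not part of this diff; the sketch below only illustrates the shape such a wrapper might take. retryAuthClient and its loop are assumptions for illustration, not the PR's actual retry code; isHaltErr is the package's existing check for non-retryable errors (used by the old cluster.go loops below).

// Sketch only: an assumed shape for a retrying auth client inside package clientv3.
type retryAuthClient struct {
	pb.AuthClient // generated client that performs the actual RPCs
	c *Client     // client whose balancer picks and re-pins endpoints
}

func RetryAuthClient(c *Client) pb.AuthClient {
	return &retryAuthClient{AuthClient: pb.NewAuthClient(c.ActiveConnection()), c: c}
}

func (r *retryAuthClient) UserList(ctx context.Context, in *pb.AuthUserListRequest, opts ...grpc.CallOption) (*pb.AuthUserListResponse, error) {
	for {
		// FailFast(false) is applied once here instead of at every call site.
		resp, err := r.AuthClient.UserList(ctx, in, append(opts, grpc.FailFast(false))...)
		if err == nil {
			return resp, nil
		}
		if isHaltErr(ctx, err) { // canceled context or unrecoverable status: stop retrying
			return nil, err
		}
		// otherwise let the (health) balancer re-pin and try again
	}
}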
19 changes: 14 additions & 5 deletions clientv3/balancer.go
@@ -45,7 +45,10 @@ type balancer interface {
// pinned returns the current pinned endpoint.
pinned() string
// endpointError handles error from server-side.
endpointError(addr string, err error)
endpointError(host string, err error)
// isFailed returns the errorInfo recorded for host, if any.
// ok is false if host is healthy.
isFailed(host string) (ev errorInfo, ok bool)

// up is Up but includes whether the balancer will use the connection.
up(addr grpc.Address) (func(error), bool)
@@ -152,7 +155,13 @@ func (b *simpleBalancer) pinned() string {
return b.pinAddr
}

func (b *simpleBalancer) endpointError(addr string, err error) { return }
func (b *simpleBalancer) endpointError(host string, err error) {
panic("'endpointError' not implemented")
Review comment (Contributor): why panic here?
}

func (b *simpleBalancer) isFailed(host string) (errorInfo, bool) {
panic("'error' not implemented")
}

func getHost2ep(eps []string) map[string]string {
hm := make(map[string]string, len(eps))
@@ -316,7 +325,7 @@ func (b *simpleBalancer) up(addr grpc.Address) (func(error), bool) {
}
if b.pinAddr != "" {
if logger.V(4) {
logger.Infof("clientv3/balancer: %s is up but not pinned (already pinned %s)", addr.Addr, b.pinAddr)
logger.Infof("clientv3/balancer: %q is up but not pinned (already pinned %q)", addr.Addr, b.pinAddr)
}
return func(err error) {}, false
}
@@ -325,7 +334,7 @@ func (b *simpleBalancer) up(addr grpc.Address) (func(error), bool) {
b.downc = make(chan struct{})
b.pinAddr = addr.Addr
if logger.V(4) {
logger.Infof("clientv3/balancer: pin %s", addr.Addr)
logger.Infof("clientv3/balancer: pin %q", addr.Addr)
}
// notify client that a connection is up
b.readyOnce.Do(func() { close(b.readyc) })
@@ -336,7 +345,7 @@ func (b *simpleBalancer) up(addr grpc.Address) (func(error), bool) {
b.pinAddr = ""
b.mu.Unlock()
if logger.V(4) {
logger.Infof("clientv3/balancer: unpin %s (%v)", addr.Addr, err)
logger.Infof("clientv3/balancer: unpin %q (%q)", addr.Addr, err.Error())
}
}, true
}
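Note: the balancer interface now takes a host (rather than an endpoint address) in endpointError and gains isFailed, which reports the errorInfo recorded for a host; simpleBalancer leaves both unimplemented because only the health-aware wrapper (healthBalancer, further down) tracks failures. A rough usage sketch follows; retryOnce is a hypothetical caller for illustration, not code from this PR.

// Hypothetical caller inside package clientv3: record an RPC failure against
// the pinned host, then consult isFailed before retrying the same host.
func retryOnce(ctx context.Context, b balancer, do func(context.Context) error) error {
	err := do(ctx)
	if err == nil {
		return nil
	}
	host := b.pinned()
	b.endpointError(host, err) // gray-list the host with the failure time and cause
	if ev, failed := b.isFailed(host); failed {
		// errorInfo remembers when and why the host failed, so callers can
		// back off instead of immediately re-dialing the same endpoint.
		logger.Infof("clientv3/retry: %q failed at %v (%v)", host, ev.failed, ev.err)
	}
	return err
}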
27 changes: 9 additions & 18 deletions clientv3/cluster.go
@@ -18,7 +18,6 @@ import (
"context"

pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
"google.golang.org/grpc"
)

type (
@@ -75,27 +74,19 @@ func (c *cluster) MemberRemove(ctx context.Context, id uint64) (*MemberRemoveRes

func (c *cluster) MemberUpdate(ctx context.Context, id uint64, peerAddrs []string) (*MemberUpdateResponse, error) {
// it is safe to retry on update.
for {
r := &pb.MemberUpdateRequest{ID: id, PeerURLs: peerAddrs}
resp, err := c.remote.MemberUpdate(ctx, r, grpc.FailFast(false))
if err == nil {
return (*MemberUpdateResponse)(resp), nil
}
if isHaltErr(ctx, err) {
return nil, toErr(ctx, err)
}
r := &pb.MemberUpdateRequest{ID: id, PeerURLs: peerAddrs}
resp, err := c.remote.MemberUpdate(ctx, r)
if err == nil {
return (*MemberUpdateResponse)(resp), nil
}
return nil, toErr(ctx, err)
}

func (c *cluster) MemberList(ctx context.Context) (*MemberListResponse, error) {
// it is safe to retry on list.
for {
resp, err := c.remote.MemberList(ctx, &pb.MemberListRequest{}, grpc.FailFast(false))
if err == nil {
return (*MemberListResponse)(resp), nil
}
if isHaltErr(ctx, err) {
return nil, toErr(ctx, err)
}
resp, err := c.remote.MemberList(ctx, &pb.MemberListRequest{})
if err == nil {
return (*MemberListResponse)(resp), nil
}
return nil, toErr(ctx, err)
}
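Note: the in-method retry loops disappear because MemberUpdate and MemberList are idempotent and retrying is now handled by the retry-aware cluster client behind c.remote (presumably a wrapper analogous to RetryAuthClient above; it is not part of this diff). The retry condition the removed loops implemented can be summed up as the hypothetical helper below, shown for illustration only.

// Hypothetical helper, not from this PR: a failed call may be retried only
// when the error is non-nil and isHaltErr (the existing package check used
// by the removed loops) says it is not a halting error.
func isSafeRetry(ctx context.Context, err error) bool {
	return err != nil && !isHaltErr(ctx, err)
}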
86 changes: 55 additions & 31 deletions clientv3/health_balancer.go
@@ -30,6 +30,11 @@ const unknownService = "unknown service grpc.health.v1.Health"

type healthCheckFunc func(ep string) (bool, error)

type errorInfo struct {
failed time.Time
err error
}

// healthBalancer wraps a balancer so that it uses health checking
// to choose its endpoints.
type healthBalancer struct {
@@ -48,8 +53,8 @@ type healthBalancer struct {
// eps stores all client endpoints
eps []string

// unhealthy tracks the last unhealthy time of endpoints.
unhealthy map[string]time.Time
// unhealthyHosts tracks the last unhealthy time of endpoints.
Review comment (@xiang90, Contributor, Oct 19, 2017): unhealthy hosts or endpoints?
unhealthyHosts map[string]errorInfo

stopc chan struct{}
stopOnce sync.Once
@@ -61,13 +66,13 @@ type healthBalancer struct {

func newHealthBalancer(b balancer, timeout time.Duration, hc healthCheckFunc) *healthBalancer {
hb := &healthBalancer{
balancer: b,
healthCheck: hc,
eps: b.endpoints(),
addrs: eps2addrs(b.endpoints()),
host2ep: getHost2ep(b.endpoints()),
unhealthy: make(map[string]time.Time),
stopc: make(chan struct{}),
balancer: b,
healthCheck: hc,
eps: b.endpoints(),
addrs: eps2addrs(b.endpoints()),
host2ep: getHost2ep(b.endpoints()),
unhealthyHosts: make(map[string]errorInfo),
stopc: make(chan struct{}),
}
if timeout < minHealthRetryDuration {
timeout = minHealthRetryDuration
@@ -94,11 +99,11 @@ func (hb *healthBalancer) Up(addr grpc.Address) func(error) {
// finding healthy endpoint on retry could take several timeouts and redials.
// To avoid wasting retries, gray-list unhealthy endpoints.
hb.mu.Lock()
hb.unhealthy[addr.Addr] = time.Now()
hb.unhealthyHosts[addr.Addr] = errorInfo{failed: time.Now(), err: err}
hb.mu.Unlock()
f(err)
if logger.V(4) {
logger.Infof("clientv3/health-balancer: %s becomes unhealthy (%v)", addr.Addr, err)
logger.Infof("clientv3/health-balancer: %q becomes unhealthy (%q)", addr.Addr, err.Error())
}
}
}
@@ -120,7 +125,7 @@ func (hb *healthBalancer) updateAddrs(eps ...string) {
addrs, host2ep := eps2addrs(eps), getHost2ep(eps)
hb.mu.Lock()
hb.addrs, hb.eps, hb.host2ep = addrs, eps, host2ep
hb.unhealthy = make(map[string]time.Time)
hb.unhealthyHosts = make(map[string]errorInfo)
hb.mu.Unlock()
hb.balancer.updateAddrs(eps...)
}
@@ -142,11 +147,18 @@ func (hb *healthBalancer) updateUnhealthy(timeout time.Duration) {
select {
case <-time.After(timeout):
hb.mu.Lock()
for k, v := range hb.unhealthy {
if time.Since(v) > timeout {
delete(hb.unhealthy, k)
for k, v := range hb.unhealthyHosts {
if _, ok := hb.host2ep[k]; !ok {
delete(hb.unhealthyHosts, k)
if logger.V(4) {
logger.Infof("clientv3/health-balancer: removes stale endpoint %q from unhealthy", k)
Review comment (Contributor): from unhealthyEndpoints?
}
continue
}
if time.Since(v.failed) > timeout {
delete(hb.unhealthyHosts, k)
if logger.V(4) {
logger.Infof("clientv3/health-balancer: removes %s from unhealthy after %v", k, timeout)
logger.Infof("clientv3/health-balancer: removes %q from unhealthy after %v", k, timeout)
}
}
}
@@ -166,31 +178,42 @@ func (hb *healthBalancer) liveAddrs() []grpc.Address {
hb.mu.RLock()
defer hb.mu.RUnlock()
hbAddrs := hb.addrs
if len(hb.addrs) == 1 || len(hb.unhealthy) == 0 || len(hb.unhealthy) == len(hb.addrs) {
if len(hb.addrs) == 1 || len(hb.unhealthyHosts) == 0 || len(hb.unhealthyHosts) == len(hb.addrs) {
return hbAddrs
}
addrs := make([]grpc.Address, 0, len(hb.addrs)-len(hb.unhealthy))
addrs := make([]grpc.Address, 0, len(hb.addrs)-len(hb.unhealthyHosts))
for _, addr := range hb.addrs {
if _, unhealthy := hb.unhealthy[addr.Addr]; !unhealthy {
if _, unhealthy := hb.unhealthyHosts[addr.Addr]; !unhealthy {
addrs = append(addrs, addr)
}
}
return addrs
}

func (hb *healthBalancer) endpointError(addr string, err error) {
func (hb *healthBalancer) endpointError(host string, err error) {
hb.mu.Lock()
hb.unhealthy[addr] = time.Now()
hb.unhealthyHosts[host] = errorInfo{failed: time.Now(), err: err}
hb.mu.Unlock()
if logger.V(4) {
logger.Infof("clientv3/health-balancer: marking %s as unhealthy (%v)", addr, err)
logger.Infof("clientv3/health-balancer: marking %q as unhealthy (%q)", host, err.Error())
}
}

func (hb *healthBalancer) isFailed(host string) (ev errorInfo, ok bool) {
hb.mu.RLock()
ev, ok = hb.unhealthyHosts[host]
hb.mu.RUnlock()
return ev, ok
}

func (hb *healthBalancer) mayPin(addr grpc.Address) bool {
hb.mu.RLock()
skip := len(hb.addrs) == 1 || len(hb.unhealthy) == 0 || len(hb.addrs) == len(hb.unhealthy)
failedTime, bad := hb.unhealthy[addr.Addr]
if _, ok := hb.host2ep[addr.Addr]; !ok {
hb.mu.RUnlock()
return false
}
skip := len(hb.addrs) == 1 || len(hb.unhealthyHosts) == 0 || len(hb.addrs) == len(hb.unhealthyHosts)
ef, bad := hb.unhealthyHosts[addr.Addr]
dur := hb.healthCheckTimeout
hb.mu.RUnlock()
if skip || !bad {
@@ -201,26 +224,27 @@ func (hb *healthBalancer) mayPin(addr grpc.Address) bool {
// 2. balancer 'Up' unpins with grpc: failed with network I/O error
// 3. grpc-healthcheck still SERVING, thus retry to pin
// instead, return before grpc-healthcheck if failed within healthcheck timeout
if elapsed := time.Since(failedTime); elapsed < dur {
if elapsed := time.Since(ef.failed); elapsed < dur {
if logger.V(4) {
logger.Infof("clientv3/health-balancer: %s is up but not pinned (failed %v ago, require minimum %v after failure)", addr.Addr, elapsed, dur)
logger.Infof("clientv3/health-balancer: %q is up but not pinned (failed %v ago, require minimum %v after failure)", addr.Addr, elapsed, dur)
}
return false
}
if ok, _ := hb.healthCheck(addr.Addr); ok {
ok, err := hb.healthCheck(addr.Addr)
if ok {
hb.mu.Lock()
delete(hb.unhealthy, addr.Addr)
delete(hb.unhealthyHosts, addr.Addr)
hb.mu.Unlock()
if logger.V(4) {
logger.Infof("clientv3/health-balancer: %s is healthy (health check success)", addr.Addr)
logger.Infof("clientv3/health-balancer: %q is healthy (health check success)", addr.Addr)
}
return true
}
hb.mu.Lock()
hb.unhealthy[addr.Addr] = time.Now()
hb.unhealthyHosts[addr.Addr] = errorInfo{failed: time.Now(), err: err}
hb.mu.Unlock()
if logger.V(4) {
logger.Infof("clientv3/health-balancer: %s becomes unhealthy (health check failed)", addr.Addr)
logger.Infof("clientv3/health-balancer: %q becomes unhealthy (health check failed)", addr.Addr)
}
return false
}
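Note: healthBalancer now gray-lists hosts in unhealthyHosts with an errorInfo (failure time plus cause), drops entries for endpoints that are no longer configured, and refuses to pin an address until the health-check timeout has elapsed and the gRPC health check passes. The healthCheckFunc it calls is injected through newHealthBalancer; below is a standalone sketch of one possible implementation based on the grpc.health.v1 service. The dial options, timeouts, and wiring here are assumptions for illustration, not the client's actual check.

package clientv3 // sketch: placed in the package so it can be handed to newHealthBalancer

import (
	"context"
	"time"

	"google.golang.org/grpc"
	healthpb "google.golang.org/grpc/health/grpc_health_v1"
)

// grpcHealthCheck is a sketch of a healthCheckFunc: it dials ep, issues one
// grpc.health.v1 Check, and reports whether the endpoint answered SERVING.
func grpcHealthCheck(ep string) (bool, error) {
	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
	defer cancel()

	conn, err := grpc.DialContext(ctx, ep, grpc.WithInsecure(), grpc.WithBlock())
	if err != nil {
		return false, err
	}
	defer conn.Close()

	resp, err := healthpb.NewHealthClient(conn).Check(ctx, &healthpb.HealthCheckRequest{})
	if err != nil {
		// The real client treats "unknown service grpc.health.v1.Health" from
		// older servers as healthy; this sketch simply reports the error.
		return false, err
	}
	return resp.Status == healthpb.HealthCheckResponse_SERVING, nil
}

// wiring (sketch): hb := newHealthBalancer(sb, 5*time.Second, grpcHealthCheck)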
4 changes: 2 additions & 2 deletions clientv3/integration/kv_test.go
@@ -473,8 +473,8 @@ func TestKVNewAfterClose(t *testing.T) {

donec := make(chan struct{})
go func() {
if _, err := cli.Get(context.TODO(), "foo"); err != context.Canceled {
t.Fatalf("expected %v, got %v", context.Canceled, err)
if _, err := cli.Get(context.TODO(), "foo"); err != context.Canceled && err != grpc.ErrClientConnClosing {
Review comment (Contributor): RPC should only return status type errors. Why would grpc ErrClientConnClosing be returned?
t.Fatalf("expected %v or %v, got %v", context.Canceled, grpc.ErrClientConnClosing, err)
}
close(donec)
}()