clientv3: fix balancer/retry #8710
Changes from all commits
4818f23
1a12a64
bc45227
a03f0d1
5351ae2
26291ab
8fac498
d13827f
f90a0b6
d79e655
ff1b2cd
4e0b52d
140cb0c
5bdbcba
1ee5f32
1e217ef
7c9cf81
@@ -30,6 +30,11 @@ const unknownService = "unknown service grpc.health.v1.Health"
 
 type healthCheckFunc func(ep string) (bool, error)
 
+type errorInfo struct {
+	failed time.Time
+	err    error
+}
+
 // healthBalancer wraps a balancer so that it uses health checking
 // to choose its endpoints.
 type healthBalancer struct {
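The new errorInfo record pairs the time of an endpoint's last failure with the error that caused it, so the balancer can later tell both how recently a host failed and why. Below is a minimal, self-contained sketch of that gray-listing idea; the grayList type, its methods, and the one-second window are illustrative only and are not part of this PR.

package main

import (
	"errors"
	"fmt"
	"sync"
	"time"
)

// errorInfo mirrors the record added in this PR: when an endpoint last
// failed and with what error.
type errorInfo struct {
	failed time.Time
	err    error
}

// grayList is an illustrative container, not the PR's healthBalancer.
type grayList struct {
	mu    sync.RWMutex
	hosts map[string]errorInfo
}

func (g *grayList) markFailed(host string, err error) {
	g.mu.Lock()
	g.hosts[host] = errorInfo{failed: time.Now(), err: err}
	g.mu.Unlock()
}

// retryable reports whether host either never failed or failed longer
// ago than window, i.e. whether it is worth trying to pin again.
func (g *grayList) retryable(host string, window time.Duration) bool {
	g.mu.RLock()
	ei, ok := g.hosts[host]
	g.mu.RUnlock()
	return !ok || time.Since(ei.failed) > window
}

func main() {
	g := &grayList{hosts: make(map[string]errorInfo)}
	g.markFailed("127.0.0.1:2379", errors.New("connection refused"))
	fmt.Println(g.retryable("127.0.0.1:2379", time.Second)) // false right after the failure
	fmt.Println(g.retryable("127.0.0.1:2380", time.Second)) // true: never failed
}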
@@ -48,8 +53,8 @@ type healthBalancer struct {
 	// eps stores all client endpoints
 	eps []string
 
-	// unhealthy tracks the last unhealthy time of endpoints.
-	unhealthy map[string]time.Time
+	// unhealthyHosts tracks the last unhealthy time of endpoints.
+	unhealthyHosts map[string]errorInfo
 
 	stopc chan struct{}
 	stopOnce sync.Once

Review comment (on the unhealthyHosts field): unhealthy hosts or endpoints?
@@ -61,13 +66,13 @@ type healthBalancer struct {
 
 func newHealthBalancer(b balancer, timeout time.Duration, hc healthCheckFunc) *healthBalancer {
 	hb := &healthBalancer{
-		balancer:    b,
-		healthCheck: hc,
-		eps:         b.endpoints(),
-		addrs:       eps2addrs(b.endpoints()),
-		host2ep:     getHost2ep(b.endpoints()),
-		unhealthy:   make(map[string]time.Time),
-		stopc:       make(chan struct{}),
+		balancer:       b,
+		healthCheck:    hc,
+		eps:            b.endpoints(),
+		addrs:          eps2addrs(b.endpoints()),
+		host2ep:        getHost2ep(b.endpoints()),
+		unhealthyHosts: make(map[string]errorInfo),
+		stopc:          make(chan struct{}),
 	}
 	if timeout < minHealthRetryDuration {
 		timeout = minHealthRetryDuration
@@ -94,11 +99,11 @@ func (hb *healthBalancer) Up(addr grpc.Address) func(error) {
 		// finding healthy endpoint on retry could take several timeouts and redials.
 		// To avoid wasting retries, gray-list unhealthy endpoints.
 		hb.mu.Lock()
-		hb.unhealthy[addr.Addr] = time.Now()
+		hb.unhealthyHosts[addr.Addr] = errorInfo{failed: time.Now(), err: err}
 		hb.mu.Unlock()
 		f(err)
 		if logger.V(4) {
-			logger.Infof("clientv3/health-balancer: %s becomes unhealthy (%v)", addr.Addr, err)
+			logger.Infof("clientv3/health-balancer: %q becomes unhealthy (%q)", addr.Addr, err.Error())
 		}
 	}
 }
@@ -120,7 +125,7 @@ func (hb *healthBalancer) updateAddrs(eps ...string) {
 	addrs, host2ep := eps2addrs(eps), getHost2ep(eps)
 	hb.mu.Lock()
 	hb.addrs, hb.eps, hb.host2ep = addrs, eps, host2ep
-	hb.unhealthy = make(map[string]time.Time)
+	hb.unhealthyHosts = make(map[string]errorInfo)
 	hb.mu.Unlock()
 	hb.balancer.updateAddrs(eps...)
 }
@@ -142,11 +147,18 @@ func (hb *healthBalancer) updateUnhealthy(timeout time.Duration) {
 		select {
 		case <-time.After(timeout):
 			hb.mu.Lock()
-			for k, v := range hb.unhealthy {
-				if time.Since(v) > timeout {
-					delete(hb.unhealthy, k)
+			for k, v := range hb.unhealthyHosts {
+				if _, ok := hb.host2ep[k]; !ok {
+					delete(hb.unhealthyHosts, k)
+					if logger.V(4) {
+						logger.Infof("clientv3/health-balancer: removes stale endpoint %q from unhealthy", k)
+					}
+					continue
+				}
+				if time.Since(v.failed) > timeout {
+					delete(hb.unhealthyHosts, k)
 					if logger.V(4) {
-						logger.Infof("clientv3/health-balancer: removes %s from unhealthy after %v", k, timeout)
+						logger.Infof("clientv3/health-balancer: removes %q from unhealthy after %v", k, timeout)
 					}
 				}
 			}

Review comment (on the "removes stale endpoint" log message): from unhealthyEndpoints?
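The hunk above shows only the body of the timer case; a hedged sketch of the kind of pruning loop such a body typically sits in follows. The for/select shape and the stop-channel case are assumptions based on the stopc field declared earlier, and pruneGrayList with its parameters is an illustrative stand-in (using the errorInfo type shown before, plus the standard sync and time packages), not the PR's updateUnhealthy.

// pruneGrayList drops gray-listed hosts that are either no longer configured
// endpoints (stale) or whose last failure is older than the timeout window.
// It exits when stopc is closed.
func pruneGrayList(timeout time.Duration, stopc <-chan struct{},
	mu *sync.Mutex, hosts map[string]errorInfo, configured func(host string) bool) {
	for {
		select {
		case <-time.After(timeout):
			mu.Lock()
			for host, ei := range hosts {
				if !configured(host) || time.Since(ei.failed) > timeout {
					delete(hosts, host)
				}
			}
			mu.Unlock()
		case <-stopc:
			return
		}
	}
}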
@@ -166,31 +178,42 @@ func (hb *healthBalancer) liveAddrs() []grpc.Address {
 	hb.mu.RLock()
 	defer hb.mu.RUnlock()
 	hbAddrs := hb.addrs
-	if len(hb.addrs) == 1 || len(hb.unhealthy) == 0 || len(hb.unhealthy) == len(hb.addrs) {
+	if len(hb.addrs) == 1 || len(hb.unhealthyHosts) == 0 || len(hb.unhealthyHosts) == len(hb.addrs) {
 		return hbAddrs
 	}
-	addrs := make([]grpc.Address, 0, len(hb.addrs)-len(hb.unhealthy))
+	addrs := make([]grpc.Address, 0, len(hb.addrs)-len(hb.unhealthyHosts))
 	for _, addr := range hb.addrs {
-		if _, unhealthy := hb.unhealthy[addr.Addr]; !unhealthy {
+		if _, unhealthy := hb.unhealthyHosts[addr.Addr]; !unhealthy {
 			addrs = append(addrs, addr)
 		}
 	}
 	return addrs
 }
 
-func (hb *healthBalancer) endpointError(addr string, err error) {
+func (hb *healthBalancer) endpointError(host string, err error) {
 	hb.mu.Lock()
-	hb.unhealthy[addr] = time.Now()
+	hb.unhealthyHosts[host] = errorInfo{failed: time.Now(), err: err}
 	hb.mu.Unlock()
 	if logger.V(4) {
-		logger.Infof("clientv3/health-balancer: marking %s as unhealthy (%v)", addr, err)
+		logger.Infof("clientv3/health-balancer: marking %q as unhealthy (%q)", host, err.Error())
 	}
 }
 
+func (hb *healthBalancer) isFailed(host string) (ev errorInfo, ok bool) {
+	hb.mu.RLock()
+	ev, ok = hb.unhealthyHosts[host]
+	hb.mu.RUnlock()
+	return ev, ok
+}
+
 func (hb *healthBalancer) mayPin(addr grpc.Address) bool {
 	hb.mu.RLock()
-	skip := len(hb.addrs) == 1 || len(hb.unhealthy) == 0 || len(hb.addrs) == len(hb.unhealthy)
-	failedTime, bad := hb.unhealthy[addr.Addr]
+	if _, ok := hb.host2ep[addr.Addr]; !ok {
+		hb.mu.RUnlock()
+		return false
+	}
+	skip := len(hb.addrs) == 1 || len(hb.unhealthyHosts) == 0 || len(hb.addrs) == len(hb.unhealthyHosts)
+	ef, bad := hb.unhealthyHosts[addr.Addr]
 	dur := hb.healthCheckTimeout
 	hb.mu.RUnlock()
 	if skip || !bad {
@@ -201,26 +224,27 @@ func (hb *healthBalancer) mayPin(addr grpc.Address) bool {
 	// 2. balancer 'Up' unpins with grpc: failed with network I/O error
 	// 3. grpc-healthcheck still SERVING, thus retry to pin
 	// instead, return before grpc-healthcheck if failed within healthcheck timeout
-	if elapsed := time.Since(failedTime); elapsed < dur {
+	if elapsed := time.Since(ef.failed); elapsed < dur {
 		if logger.V(4) {
-			logger.Infof("clientv3/health-balancer: %s is up but not pinned (failed %v ago, require minimum %v after failure)", addr.Addr, elapsed, dur)
+			logger.Infof("clientv3/health-balancer: %q is up but not pinned (failed %v ago, require minimum %v after failure)", addr.Addr, elapsed, dur)
 		}
 		return false
 	}
-	if ok, _ := hb.healthCheck(addr.Addr); ok {
+	ok, err := hb.healthCheck(addr.Addr)
+	if ok {
 		hb.mu.Lock()
-		delete(hb.unhealthy, addr.Addr)
+		delete(hb.unhealthyHosts, addr.Addr)
 		hb.mu.Unlock()
 		if logger.V(4) {
-			logger.Infof("clientv3/health-balancer: %s is healthy (health check success)", addr.Addr)
+			logger.Infof("clientv3/health-balancer: %q is healthy (health check success)", addr.Addr)
 		}
 		return true
 	}
 	hb.mu.Lock()
-	hb.unhealthy[addr.Addr] = time.Now()
+	hb.unhealthyHosts[addr.Addr] = errorInfo{failed: time.Now(), err: err}
 	hb.mu.Unlock()
 	if logger.V(4) {
-		logger.Infof("clientv3/health-balancer: %s becomes unhealthy (health check failed)", addr.Addr)
+		logger.Infof("clientv3/health-balancer: %q becomes unhealthy (health check failed)", addr.Addr)
 	}
 	return false
 }
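Restated outside the diff, mayPin now gates re-pinning in four steps. The sketch below is a hedged summary rather than the PR's method: its parameters collapse the balancer's internal state into plain arguments, it assumes the standard time package, and the assumption that skip || !bad falls through to "pin" comes from unchanged lines between the two hunks rather than from lines shown here.

// mayPinSketch restates the gating order:
//  1. an address that is no longer a configured endpoint is never pinned;
//  2. when gray-listing cannot help (single endpoint, nothing gray-listed,
//     or everything gray-listed) or the address is not gray-listed, pin;
//  3. a host that failed within the health-check timeout is not retried yet;
//  4. otherwise the gRPC health check decides.
func mayPinSketch(knownEndpoint, skip, bad bool, lastFailed time.Time,
	dur time.Duration, healthCheck func() (bool, error)) bool {
	if !knownEndpoint {
		return false
	}
	if skip || !bad {
		return true
	}
	if time.Since(lastFailed) < dur {
		return false
	}
	ok, _ := healthCheck()
	return ok
}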
@@ -473,8 +473,8 @@ func TestKVNewAfterClose(t *testing.T) {
 
 	donec := make(chan struct{})
 	go func() {
-		if _, err := cli.Get(context.TODO(), "foo"); err != context.Canceled {
-			t.Fatalf("expected %v, got %v", context.Canceled, err)
+		if _, err := cli.Get(context.TODO(), "foo"); err != context.Canceled && err != grpc.ErrClientConnClosing {
+			t.Fatalf("expected %v or %v, got %v", context.Canceled, grpc.ErrClientConnClosing, err)
 		}
 		close(donec)
 	}()

Review comment (on the added grpc.ErrClientConnClosing case): an RPC should only return status-type errors. Why would grpc.ErrClientConnClosing be returned?
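The relaxed assertion reflects that a Get racing with client Close may surface either the canceled context or gRPC's connection-closing error, depending on which closure path the call hits first; the review comment above questions whether the latter should escape at all. A tiny helper like the following captures the accepted set (the helper name is illustrative; context.Canceled and grpc.ErrClientConnClosing are real identifiers from the standard context package and google.golang.org/grpc):

// isClosedClientErr reports whether err is one of the two errors the test
// now tolerates from a Get issued against a client that is being closed.
func isClosedClientErr(err error) bool {
	return err == context.Canceled || err == grpc.ErrClientConnClosing
}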
@@ -881,7 +881,7 @@ func TestKVGetResetLoneEndpoint(t *testing.T) {
 	// have Get try to reconnect
 	donec := make(chan struct{})
 	go func() {
-		ctx, cancel := context.WithTimeout(context.TODO(), 5*time.Second)
+		ctx, cancel := context.WithTimeout(context.TODO(), 8*time.Second)
 		if _, err := cli.Get(ctx, "abc", clientv3.WithSerializable()); err != nil {
 			t.Fatal(err)
 		}

Review comment (on the 8-second timeout): why does it take 8 seconds to reconnect?
Review comment: why panic here?