Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

clientv3: fix balancer/retry #8710

Closed
wants to merge 17 commits into from
Closed
Changes from 1 commit
Commits
Show all changes
17 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
67 changes: 39 additions & 28 deletions clientv3/health_balancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,8 @@ type healthBalancer struct {
// eps stores all client endpoints
eps []string

// unhealthy tracks the last unhealthy time of endpoints.
unhealthy map[string]time.Time
// unhealthyHosts tracks the last unhealthy time of endpoints.
Copy link
Contributor

@xiang90 xiang90 Oct 19, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

unhealthy hosts or endpoints?

unhealthyHosts map[string]time.Time

stopc chan struct{}
stopOnce sync.Once
Expand All @@ -61,13 +61,13 @@ type healthBalancer struct {

func newHealthBalancer(b balancer, timeout time.Duration, hc healthCheckFunc) *healthBalancer {
hb := &healthBalancer{
balancer: b,
healthCheck: hc,
eps: b.endpoints(),
addrs: eps2addrs(b.endpoints()),
host2ep: getHost2ep(b.endpoints()),
unhealthy: make(map[string]time.Time),
stopc: make(chan struct{}),
balancer: b,
healthCheck: hc,
eps: b.endpoints(),
addrs: eps2addrs(b.endpoints()),
host2ep: getHost2ep(b.endpoints()),
unhealthyHosts: make(map[string]time.Time),
stopc: make(chan struct{}),
}
if timeout < minHealthRetryDuration {
timeout = minHealthRetryDuration
Expand All @@ -94,11 +94,11 @@ func (hb *healthBalancer) Up(addr grpc.Address) func(error) {
// finding healthy endpoint on retry could take several timeouts and redials.
// To avoid wasting retries, gray-list unhealthy endpoints.
hb.mu.Lock()
hb.unhealthy[addr.Addr] = time.Now()
hb.unhealthyHosts[addr.Addr] = time.Now()
hb.mu.Unlock()
f(err)
if logger.V(4) {
logger.Infof("clientv3/health-balancer: %s becomes unhealthy (%v)", addr.Addr, err)
logger.Infof("clientv3/health-balancer: %q becomes unhealthy (%v)", addr.Addr, err)
}
}
}
Expand All @@ -120,7 +120,7 @@ func (hb *healthBalancer) updateAddrs(eps ...string) {
addrs, host2ep := eps2addrs(eps), getHost2ep(eps)
hb.mu.Lock()
hb.addrs, hb.eps, hb.host2ep = addrs, eps, host2ep
hb.unhealthy = make(map[string]time.Time)
hb.unhealthyHosts = make(map[string]time.Time)
hb.mu.Unlock()
hb.balancer.updateAddrs(eps...)
}
Expand All @@ -142,11 +142,18 @@ func (hb *healthBalancer) updateUnhealthy(timeout time.Duration) {
select {
case <-time.After(timeout):
hb.mu.Lock()
for k, v := range hb.unhealthy {
for k, v := range hb.unhealthyHosts {
if _, ok := hb.host2ep[k]; !ok {
delete(hb.unhealthyHosts, k)
if logger.V(4) {
logger.Infof("clientv3/health-balancer: removes stale endpoint %q from unhealthy", k)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

from unhealthyEndpoints?

}
continue
}
if time.Since(v) > timeout {
delete(hb.unhealthy, k)
delete(hb.unhealthyHosts, k)
if logger.V(4) {
logger.Infof("clientv3/health-balancer: removes %s from unhealthy after %v", k, timeout)
logger.Infof("clientv3/health-balancer: removes %q from unhealthy after %v", k, timeout)
}
}
}
Expand All @@ -166,31 +173,35 @@ func (hb *healthBalancer) liveAddrs() []grpc.Address {
hb.mu.RLock()
defer hb.mu.RUnlock()
hbAddrs := hb.addrs
if len(hb.addrs) == 1 || len(hb.unhealthy) == 0 || len(hb.unhealthy) == len(hb.addrs) {
if len(hb.addrs) == 1 || len(hb.unhealthyHosts) == 0 || len(hb.unhealthyHosts) == len(hb.addrs) {
return hbAddrs
}
addrs := make([]grpc.Address, 0, len(hb.addrs)-len(hb.unhealthy))
addrs := make([]grpc.Address, 0, len(hb.addrs)-len(hb.unhealthyHosts))
for _, addr := range hb.addrs {
if _, unhealthy := hb.unhealthy[addr.Addr]; !unhealthy {
if _, unhealthy := hb.unhealthyHosts[addr.Addr]; !unhealthy {
addrs = append(addrs, addr)
}
}
return addrs
}

func (hb *healthBalancer) endpointError(addr string, err error) {
func (hb *healthBalancer) endpointError(host string, err error) {
hb.mu.Lock()
hb.unhealthy[addr] = time.Now()
hb.unhealthyHosts[host] = time.Now()
hb.mu.Unlock()
if logger.V(4) {
logger.Infof("clientv3/health-balancer: marking %s as unhealthy (%v)", addr, err)
logger.Infof("clientv3/health-balancer: marking %q as unhealthy (%v)", host, err)
}
}

func (hb *healthBalancer) mayPin(addr grpc.Address) bool {
hb.mu.RLock()
skip := len(hb.addrs) == 1 || len(hb.unhealthy) == 0 || len(hb.addrs) == len(hb.unhealthy)
failedTime, bad := hb.unhealthy[addr.Addr]
if _, ok := hb.host2ep[addr.Addr]; !ok {
hb.mu.RUnlock()
return false
}
skip := len(hb.addrs) == 1 || len(hb.unhealthyHosts) == 0 || len(hb.addrs) == len(hb.unhealthyHosts)
failedTime, bad := hb.unhealthyHosts[addr.Addr]
dur := hb.healthCheckTimeout
hb.mu.RUnlock()
if skip || !bad {
Expand All @@ -203,24 +214,24 @@ func (hb *healthBalancer) mayPin(addr grpc.Address) bool {
// instead, return before grpc-healthcheck if failed within healthcheck timeout
if elapsed := time.Since(failedTime); elapsed < dur {
if logger.V(4) {
logger.Infof("clientv3/health-balancer: %s is up but not pinned (failed %v ago, require minimum %v after failure)", addr.Addr, elapsed, dur)
logger.Infof("clientv3/health-balancer: %q is up but not pinned (failed %v ago, require minimum %v after failure)", addr.Addr, elapsed, dur)
}
return false
}
if ok, _ := hb.healthCheck(addr.Addr); ok {
hb.mu.Lock()
delete(hb.unhealthy, addr.Addr)
delete(hb.unhealthyHosts, addr.Addr)
hb.mu.Unlock()
if logger.V(4) {
logger.Infof("clientv3/health-balancer: %s is healthy (health check success)", addr.Addr)
logger.Infof("clientv3/health-balancer: %q is healthy (health check success)", addr.Addr)
}
return true
}
hb.mu.Lock()
hb.unhealthy[addr.Addr] = time.Now()
hb.unhealthyHosts[addr.Addr] = time.Now()
hb.mu.Unlock()
if logger.V(4) {
logger.Infof("clientv3/health-balancer: %s becomes unhealthy (health check failed)", addr.Addr)
logger.Infof("clientv3/health-balancer: %q becomes unhealthy (health check failed)", addr.Addr)
}
return false
}
Expand Down