-
Notifications
You must be signed in to change notification settings - Fork 9.8k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
clientv3: fix balancer/retry #8710
Closed
Closed
Changes from 6 commits
Commits
Show all changes
17 commits
Select commit
Hold shift + click to select a range
4818f23
words: whitelist more
gyuho 1a12a64
vendor/grpc-go: cherry-pick WriteStatus fix from v1.7.0
gyuho bc45227
glide: add note to grpc version
gyuho a03f0d1
clientv3/health: handle stale endpoints, rename to 'unhealthyHosts'
gyuho 5351ae2
clientv3: clean up logging, variable names
gyuho 26291ab
clientv3/balancer: fix retry logic
gyuho 8fac498
clientv3: remove redundant retries in KV, set FailFast=true
gyuho d13827f
clientv3: remove redundant retries in Watch, set FailFast=true
gyuho f90a0b6
clientv3: remove redundant retries in Lease, set FailFast=true
gyuho d79e655
clientv3: remove redundant retries in Cluster, set FailFast=true
gyuho ff1b2cd
clientv3: remove redundant retries in Maintenance, set FailFast=true
gyuho 4e0b52d
clientv3: remove redundant retries in Auth, set FailFast=true
gyuho 140cb0c
clientv3/integration: match grpc.ErrClientConnClosing in TestKVNewAft…
gyuho 5bdbcba
clientv3/integration: match ErrTimeout in testNetworkPartitionBalancer
gyuho 1ee5f32
clientv3/integration: match errors in leasing tests
gyuho 1e217ef
clientv3/integration: increase time-out for endpoint switch in TestKV…
gyuho 7c9cf81
clientv3/integration: increase timeout in watch connection close tests
gyuho File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -30,6 +30,11 @@ const unknownService = "unknown service grpc.health.v1.Health" | |
|
||
type healthCheckFunc func(ep string) (bool, error) | ||
|
||
type errorInfo struct { | ||
failed time.Time | ||
err error | ||
} | ||
|
||
// healthBalancer wraps a balancer so that it uses health checking | ||
// to choose its endpoints. | ||
type healthBalancer struct { | ||
|
@@ -48,8 +53,8 @@ type healthBalancer struct { | |
// eps stores all client endpoints | ||
eps []string | ||
|
||
// unhealthy tracks the last unhealthy time of endpoints. | ||
unhealthy map[string]time.Time | ||
// unhealthyHosts tracks the last unhealthy time of endpoints. | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Review comment: unhealthy hosts or endpoints? |
||
unhealthyHosts map[string]errorInfo | ||
|
||
stopc chan struct{} | ||
stopOnce sync.Once | ||
|
@@ -61,13 +66,13 @@ type healthBalancer struct { | |
|
||
func newHealthBalancer(b balancer, timeout time.Duration, hc healthCheckFunc) *healthBalancer { | ||
hb := &healthBalancer{ | ||
balancer: b, | ||
healthCheck: hc, | ||
eps: b.endpoints(), | ||
addrs: eps2addrs(b.endpoints()), | ||
host2ep: getHost2ep(b.endpoints()), | ||
unhealthy: make(map[string]time.Time), | ||
stopc: make(chan struct{}), | ||
balancer: b, | ||
healthCheck: hc, | ||
eps: b.endpoints(), | ||
addrs: eps2addrs(b.endpoints()), | ||
host2ep: getHost2ep(b.endpoints()), | ||
unhealthyHosts: make(map[string]errorInfo), | ||
stopc: make(chan struct{}), | ||
} | ||
if timeout < minHealthRetryDuration { | ||
timeout = minHealthRetryDuration | ||
|
@@ -94,11 +99,11 @@ func (hb *healthBalancer) Up(addr grpc.Address) func(error) { | |
// finding healthy endpoint on retry could take several timeouts and redials. | ||
// To avoid wasting retries, gray-list unhealthy endpoints. | ||
hb.mu.Lock() | ||
hb.unhealthy[addr.Addr] = time.Now() | ||
hb.unhealthyHosts[addr.Addr] = errorInfo{failed: time.Now(), err: err} | ||
hb.mu.Unlock() | ||
f(err) | ||
if logger.V(4) { | ||
logger.Infof("clientv3/health-balancer: %s becomes unhealthy (%v)", addr.Addr, err) | ||
logger.Infof("clientv3/health-balancer: %q becomes unhealthy (%q)", addr.Addr, err.Error()) | ||
} | ||
} | ||
} | ||
|
@@ -120,7 +125,7 @@ func (hb *healthBalancer) updateAddrs(eps ...string) { | |
addrs, host2ep := eps2addrs(eps), getHost2ep(eps) | ||
hb.mu.Lock() | ||
hb.addrs, hb.eps, hb.host2ep = addrs, eps, host2ep | ||
hb.unhealthy = make(map[string]time.Time) | ||
hb.unhealthyHosts = make(map[string]errorInfo) | ||
hb.mu.Unlock() | ||
hb.balancer.updateAddrs(eps...) | ||
} | ||
|
@@ -142,11 +147,18 @@ func (hb *healthBalancer) updateUnhealthy(timeout time.Duration) { | |
select { | ||
case <-time.After(timeout): | ||
hb.mu.Lock() | ||
for k, v := range hb.unhealthy { | ||
if time.Since(v) > timeout { | ||
delete(hb.unhealthy, k) | ||
for k, v := range hb.unhealthyHosts { | ||
if _, ok := hb.host2ep[k]; !ok { | ||
delete(hb.unhealthyHosts, k) | ||
if logger.V(4) { | ||
logger.Infof("clientv3/health-balancer: removes stale endpoint %q from unhealthy", k) | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Review comment: from unhealthyEndpoints? |
||
} | ||
continue | ||
} | ||
if time.Since(v.failed) > timeout { | ||
delete(hb.unhealthyHosts, k) | ||
if logger.V(4) { | ||
logger.Infof("clientv3/health-balancer: removes %s from unhealthy after %v", k, timeout) | ||
logger.Infof("clientv3/health-balancer: removes %q from unhealthy after %v", k, timeout) | ||
} | ||
} | ||
} | ||
|
@@ -166,31 +178,42 @@ func (hb *healthBalancer) liveAddrs() []grpc.Address { | |
hb.mu.RLock() | ||
defer hb.mu.RUnlock() | ||
hbAddrs := hb.addrs | ||
if len(hb.addrs) == 1 || len(hb.unhealthy) == 0 || len(hb.unhealthy) == len(hb.addrs) { | ||
if len(hb.addrs) == 1 || len(hb.unhealthyHosts) == 0 || len(hb.unhealthyHosts) == len(hb.addrs) { | ||
return hbAddrs | ||
} | ||
addrs := make([]grpc.Address, 0, len(hb.addrs)-len(hb.unhealthy)) | ||
addrs := make([]grpc.Address, 0, len(hb.addrs)-len(hb.unhealthyHosts)) | ||
for _, addr := range hb.addrs { | ||
if _, unhealthy := hb.unhealthy[addr.Addr]; !unhealthy { | ||
if _, unhealthy := hb.unhealthyHosts[addr.Addr]; !unhealthy { | ||
addrs = append(addrs, addr) | ||
} | ||
} | ||
return addrs | ||
} | ||
|
||
func (hb *healthBalancer) endpointError(addr string, err error) { | ||
func (hb *healthBalancer) endpointError(host string, err error) { | ||
hb.mu.Lock() | ||
hb.unhealthy[addr] = time.Now() | ||
hb.unhealthyHosts[host] = errorInfo{failed: time.Now(), err: err} | ||
hb.mu.Unlock() | ||
if logger.V(4) { | ||
logger.Infof("clientv3/health-balancer: marking %s as unhealthy (%v)", addr, err) | ||
logger.Infof("clientv3/health-balancer: marking %q as unhealthy (%q)", host, err.Error()) | ||
} | ||
} | ||
|
||
func (hb *healthBalancer) isFailed(host string) (ev errorInfo, ok bool) { | ||
hb.mu.RLock() | ||
ev, ok = hb.unhealthyHosts[host] | ||
hb.mu.RUnlock() | ||
return ev, ok | ||
} | ||
|
||
func (hb *healthBalancer) mayPin(addr grpc.Address) bool { | ||
hb.mu.RLock() | ||
skip := len(hb.addrs) == 1 || len(hb.unhealthy) == 0 || len(hb.addrs) == len(hb.unhealthy) | ||
failedTime, bad := hb.unhealthy[addr.Addr] | ||
if _, ok := hb.host2ep[addr.Addr]; !ok { | ||
hb.mu.RUnlock() | ||
return false | ||
} | ||
skip := len(hb.addrs) == 1 || len(hb.unhealthyHosts) == 0 || len(hb.addrs) == len(hb.unhealthyHosts) | ||
ef, bad := hb.unhealthyHosts[addr.Addr] | ||
dur := hb.healthCheckTimeout | ||
hb.mu.RUnlock() | ||
if skip || !bad { | ||
|
@@ -201,26 +224,27 @@ func (hb *healthBalancer) mayPin(addr grpc.Address) bool { | |
// 2. balancer 'Up' unpins with grpc: failed with network I/O error | ||
// 3. grpc-healthcheck still SERVING, thus retry to pin | ||
// instead, return before grpc-healthcheck if failed within healthcheck timeout | ||
if elapsed := time.Since(failedTime); elapsed < dur { | ||
if elapsed := time.Since(ef.failed); elapsed < dur { | ||
if logger.V(4) { | ||
logger.Infof("clientv3/health-balancer: %s is up but not pinned (failed %v ago, require minimum %v after failure)", addr.Addr, elapsed, dur) | ||
logger.Infof("clientv3/health-balancer: %q is up but not pinned (failed %v ago, require minimum %v after failure)", addr.Addr, elapsed, dur) | ||
} | ||
return false | ||
} | ||
if ok, _ := hb.healthCheck(addr.Addr); ok { | ||
ok, err := hb.healthCheck(addr.Addr) | ||
if ok { | ||
hb.mu.Lock() | ||
delete(hb.unhealthy, addr.Addr) | ||
delete(hb.unhealthyHosts, addr.Addr) | ||
hb.mu.Unlock() | ||
if logger.V(4) { | ||
logger.Infof("clientv3/health-balancer: %s is healthy (health check success)", addr.Addr) | ||
logger.Infof("clientv3/health-balancer: %q is healthy (health check success)", addr.Addr) | ||
} | ||
return true | ||
} | ||
hb.mu.Lock() | ||
hb.unhealthy[addr.Addr] = time.Now() | ||
hb.unhealthyHosts[addr.Addr] = errorInfo{failed: time.Now(), err: err} | ||
hb.mu.Unlock() | ||
if logger.V(4) { | ||
logger.Infof("clientv3/health-balancer: %s becomes unhealthy (health check failed)", addr.Addr) | ||
logger.Infof("clientv3/health-balancer: %q becomes unhealthy (health check failed)", addr.Addr) | ||
} | ||
return false | ||
} | ||
|
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why panic here?