clientv3: fix balancer/retry #8710
Conversation
Commits (each signed off by Gyu-Ho Lee <gyuhox@gmail.com>):

- Cherry-pick 22c3f92f5faea8db492fb0f5ae4daf0d2752b19e.
- 1. Handle stale endpoints in the health balancer. 2. Rename 'unhealthy' to 'unhealthyHosts' to make the name clear. 3. Use the quote format string to log empty hosts.
- …erClose
- …GetResetLoneEndpoint: since we have added an additional wait/sync when the error is "there is no available address".
- Integration tests have a 5-second dial timeout, and it is possible that the balancer retry logic waits 5 seconds and the test times out, because gRPC calls grpc.downErr after the connection wait starts in the retry path. Just increasing the test timeout should be fine; in most cases grpc.downErr is called before the wait starts. e.g.
  === RUN TestWatchErrConnClosed
  INFO: 2017/10/18 23:55:39 clientv3/balancer: pin "localhost:91847156765553894590"
  INFO: 2017/10/18 23:55:39 clientv3/retry: wait 5s for healthy endpoint
  INFO: 2017/10/18 23:55:39 clientv3/balancer: unpin "localhost:91847156765553894590" ("grpc: the client connection is closing")
  INFO: 2017/10/18 23:55:39 clientv3/health-balancer: "localhost:91847156765553894590" becomes unhealthy ("grpc: the client connection is closing")
  --- FAIL: TestWatchErrConnClosed (3.07s)
      watch_test.go:682: wc.Watch took too long
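The last commit message above explains why the test's guard timeout had to grow past the dial timeout plus the balancer's retry wait. A minimal sketch of that guard pattern, assuming a placeholder doWatch instead of the real test body (this is not the actual etcd test code):

package clientv3test

import (
	"testing"
	"time"
)

// testBoundedByTimeout runs the RPC in a goroutine and bounds it with a
// select; the guard must exceed the dial timeout plus the balancer's retry
// wait, or the test flakes. doWatch stands in for the real Watch/Get call.
func testBoundedByTimeout(t *testing.T, doWatch func()) {
	donec := make(chan struct{})
	go func() {
		defer close(donec)
		doWatch()
	}()

	select {
	case <-time.After(10 * time.Second): // > 5s dial timeout + 5s retry wait
		t.Fatal("operation took too long")
	case <-donec:
	}
}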
430070b
to
7c9cf81
Compare
@@ -152,7 +152,9 @@ func (b *simpleBalancer) pinned() string {
 	return b.pinAddr
 }
 
-func (b *simpleBalancer) endpointError(addr string, err error) { return }
+func (b *simpleBalancer) endpointError(host string, err error) {
+	panic("'endpointError' not implemented")
why panic here?
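For context on the question: the diff replaces a no-op stub with a panic for a method that only the wrapping health balancer is expected to implement. A hedged sketch of the two alternatives, using illustrative types rather than the actual clientv3 balancers:

package main

import "fmt"

// notifier is an illustrative stand-in for the interface the health
// balancer wraps; it is not the real etcd clientv3 type.
type notifier interface {
	endpointError(host string, err error)
}

// simple is the base balancer. Its endpointError could either be a silent
// no-op or a panic; panicking turns an unexpected call into a loud bug.
type simple struct{}

func (s *simple) endpointError(host string, err error) {
	// Alternative A: simply return.
	// Alternative B (what the diff does): treat any call as a programming error.
	panic("'endpointError' not implemented on the simple balancer")
}

// health wraps simple and gives endpointError a real meaning.
type health struct {
	simple
	unhealthy map[string]error
}

func (h *health) endpointError(host string, err error) {
	h.unhealthy[host] = err
}

func main() {
	h := &health{unhealthy: map[string]error{}}
	var n notifier = h
	n.endpointError("localhost:2379", fmt.Errorf("connection refused"))
	fmt.Println("unhealthy endpoints recorded:", len(h.unhealthy))
}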
	return func(rpcCtx context.Context, f rpcFunc) error {
		for {
			pinned := c.balancer.pinned()
			err := f(rpcCtx)
			if err == nil {
				return nil
			}
			if logger.V(4) {
we should keep it here
			// always stop retry on etcd errors other than invalid auth token
			if rpctypes.Error(err) == rpctypes.ErrInvalidAuthToken {
				gterr := c.getToken(rpcCtx)
				if gterr != nil {
					if logger.V(4) {
log the error here and log we cannot retry due to this error.
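A hedged sketch of the logging the comment asks for, assuming a retry loop shaped like the diff above; getToken, the package name, and the log wording are stand-ins, not the code that was merged:

package retryexample

import (
	"context"
	"log"

	"github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes"
)

// retryOnInvalidAuth retries only when the server reports an invalid auth
// token. On a failed token refresh it logs both the refresh error and the
// fact that the original RPC cannot be retried because of it.
func retryOnInvalidAuth(ctx context.Context, call, getToken func(context.Context) error) error {
	for {
		err := call(ctx)
		if err == nil {
			return nil
		}
		// always stop retrying on etcd errors other than an invalid auth token
		if rpctypes.Error(err) != rpctypes.ErrInvalidAuthToken {
			return err
		}
		if gterr := getToken(ctx); gterr != nil {
			log.Printf("clientv3/retry: failed to refresh token (%v); cannot retry %v", gterr, err)
			return err
		}
		log.Printf("clientv3/retry: refreshed auth token; retrying %v", err)
	}
}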
@@ -48,8 +48,8 @@ type healthBalancer struct {
 	// eps stores all client endpoints
 	eps []string
 
-	// unhealthy tracks the last unhealthy time of endpoints.
-	unhealthy map[string]time.Time
+	// unhealthyHosts tracks the last unhealthy time of endpoints.
unhealthy hosts or endpoints?
	if _, ok := hb.host2ep[k]; !ok {
		delete(hb.unhealthyHosts, k)
		if logger.V(4) {
			logger.Infof("clientv3/health-balancer: removes stale endpoint %q from unhealthy", k)
from unhealthyEndpoints?
@@ -473,8 +473,8 @@ func TestKVNewAfterClose(t *testing.T) {
 
 	donec := make(chan struct{})
 	go func() {
-		if _, err := cli.Get(context.TODO(), "foo"); err != context.Canceled {
-			t.Fatalf("expected %v, got %v", context.Canceled, err)
+		if _, err := cli.Get(context.TODO(), "foo"); err != context.Canceled && err != grpc.ErrClientConnClosing {
an RPC should only return a status-type error. why would grpc.ErrClientConnClosing be returned?
@@ -1083,8 +1086,8 @@ func TestLeasingOwnerPutError(t *testing.T) {
 	clus.Members[0].Stop(t)
 	ctx, cancel := context.WithTimeout(context.TODO(), 100*time.Millisecond)
 	defer cancel()
-	if resp, err := lkv.Put(ctx, "k", "v"); err == nil {
-		t.Fatalf("expected error, got response %+v", resp)
+	if resp, err := lkv.Put(ctx, "k", "v"); err != context.DeadlineExceeded && !strings.Contains(err.Error(), "transport is closing") {
an RPC call should only return a context error or a status-type error.
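A hedged sketch of asserting exactly that in a test, using status.FromError from google.golang.org/grpc/status; the helper name and assertion shape are illustrative, not part of the etcd test suite:

package clientv3test

import (
	"context"
	"testing"

	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"
)

// assertCtxOrStatusErr fails the test unless err is a context error or a
// gRPC status error (e.g. codes.Unavailable with "transport is closing").
func assertCtxOrStatusErr(t *testing.T, err error) {
	if err == context.Canceled || err == context.DeadlineExceeded {
		return
	}
	if s, ok := status.FromError(err); ok && s.Code() != codes.OK {
		return
	}
	t.Fatalf("expected a context or gRPC status error, got %v", err)
}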
@@ -881,7 +881,7 @@ func TestKVGetResetLoneEndpoint(t *testing.T) {
 	// have Get try to reconnect
 	donec := make(chan struct{})
 	go func() {
-		ctx, cancel := context.WithTimeout(context.TODO(), 5*time.Second)
+		ctx, cancel := context.WithTimeout(context.TODO(), 8*time.Second)
why does it take 8 seconds to reconnect?
const minDialDuration = 3 * time.Second

func (c *Client) newRetryWrapper(write bool) retryRPCFunc {
this has become far more complicated than what we discussed.
retry should only care about retrying; we should put the initial wait-for-connection logic in another func.
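A hedged sketch of the split being suggested: one helper that only waits for the balancer to pin an endpoint, and a retry wrapper that calls it but otherwise only decides whether to retry. The channel, names, and error text are assumptions, not the actual clientv3 internals:

package retryexample

import (
	"context"
	"errors"
	"time"
)

// waitForPinned blocks until the balancer signals a pinned endpoint, the
// dial timeout elapses, or the caller's context is done. It holds no retry
// policy of its own.
func waitForPinned(ctx context.Context, pinnedc <-chan struct{}, dialTimeout time.Duration) error {
	select {
	case <-pinnedc:
		return nil
	case <-time.After(dialTimeout):
		return errors.New("clientv3: timed out waiting for an available endpoint")
	case <-ctx.Done():
		return ctx.Err()
	}
}

// retry only decides whether to call f again; waiting for a connection is
// delegated to waitForPinned.
func retry(ctx context.Context, f func(context.Context) error,
	pinnedc <-chan struct{}, dialTimeout time.Duration, retriable func(error) bool) error {
	for {
		if err := waitForPinned(ctx, pinnedc, dialTimeout); err != nil {
			return err
		}
		if err := f(ctx); err == nil || !retriable(err) {
			return err
		}
	}
}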
@@ -678,7 +678,7 @@ func TestWatchErrConnClosed(t *testing.T) {
 	clus.TakeClient(0)
 
 	select {
-	case <-time.After(3 * time.Second):
+	case <-time.After(10 * time.Second):
if the dial timeout is 5 seconds, then 6 seconds should be enough? also, can we simply reduce the dial timeout?
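For the second question, a minimal sketch of how a test client's dial timeout could simply be lowered via clientv3.Config; the 2-second value is only an example, not the project's default:

package main

import (
	"log"
	"time"

	"github.com/coreos/etcd/clientv3"
)

func main() {
	// A shorter DialTimeout shrinks the worst-case wait a test has to budget
	// for when the balancer blocks until an endpoint becomes available.
	cli, err := clientv3.New(clientv3.Config{
		Endpoints:   []string{"localhost:2379"},
		DialTimeout: 2 * time.Second, // illustrative value
	})
	if err != nil {
		log.Fatal(err)
	}
	defer cli.Close()
}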
we will send new PRs to replace this. this one is too messy. closing.
Fixing/Improving:
- rpctypes.EtcdError handling with the health balancer
- grpc/grpc-go error handling (e.g. transport.ConnectionError, grpc.downErr)
- SetEndpoints (see the sketch after this list)
- grpc.FailFast=true as default
- Auth, Maintenance (e.g. MoveLeader as a mutable RPC)
- "there is no address available" (then wait for connection and retry)
- %q in logging
- TODO: Investigate CI failures?
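As referenced in the SetEndpoints item above, a minimal sketch of switching a client's endpoints at runtime; the endpoints and timeout values are placeholders, and the expected balancer behavior is as described in this PR, not guaranteed by this snippet:

package main

import (
	"context"
	"log"
	"time"

	"github.com/coreos/etcd/clientv3"
)

func main() {
	cli, err := clientv3.New(clientv3.Config{
		Endpoints:   []string{"localhost:2379"},
		DialTimeout: 5 * time.Second,
	})
	if err != nil {
		log.Fatal(err)
	}
	defer cli.Close()

	// Point the client at a different member set; the health balancer is
	// expected to drop stale endpoints and pin a healthy one.
	cli.SetEndpoints("localhost:22379", "localhost:32379")

	ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
	_, err = cli.Get(ctx, "foo")
	cancel()
	if err != nil {
		log.Printf("Get after SetEndpoints failed: %v", err)
	}
}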