lease: force leader to apply its pending committed index for lease op… #7015
Conversation
lh = leasehttp.NewHandler(l)
l := s.Lessor()
w := s.ApplyWait()
if l != nil && w != nil {
when do we get applyWait == nil?
we will have a nil pointer exception in leaseHTTP. Probably need to add an extra check.
ignore my previous comment, i mis-read it. applyWait == nil happens if we don't call EtcdServer.Start() from here https://github.com/coreos/etcd/blob/master/etcdserver/server.go#L526
for i := 0; i < 30; i++ {
	if err := <-errc; err != nil {
		cancel()
this cancel isn't necessary; the Fatal will panic and the deferred cancel will run
errc := make(chan error)

for i := 0; i < 10; i++ {
	go func() {
for j := 0; j < 3; j++ {
	j := j // capture the loop variable so each goroutine uses its own client
	go func() { errc <- stressLeaseRenew(ctx, clus.Client(j)) }()
}
@@ -22,6 +22,7 @@ import (
	"golang.org/x/net/context"
	"google.golang.org/grpc/metadata"

	"github.com/coreos/etcd/clientv3"
integration/ tests don't use clientv3
	}
}

func stressLeaseRenew(ctx context.Context, cli *clientv3.Client) error {
if ctx is passed in, the RPCs usually should respect that ctx instead of using a fresh TODO() context
// TestV3LeaseTimeToLiveStress keeps creating leases and retrieving them immediately to ensure a lease can be retrieved right after it is granted.
// It was observed that an immediate lease retrieval after granting a lease through a follower resulted in a lease-not-found error.
// Related issue: https://github.com/coreos/etcd/issues/6978
func TestV3LeaseTimeToLiveStress(t *testing.T) {
this is nearly the same thing as the other test; the only difference is whether it calls stressLeaseTimeToLive or stressLeaseRenew. This can be simplified into a generic test func testLeaseStress(stresser func(context.Context, leaseClient)) which would be called by both TestV3Lease functions.
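A minimal sketch of that refactor, assuming the usual etcd integration-test helpers (NewClusterV3, toGRPC, testutil.AfterTest) and the pb.LeaseClient-based stressers shown later in this review; the merged test may differ in its details:

func testLeaseStress(t *testing.T, stresser func(context.Context, pb.LeaseClient) error) {
	defer testutil.AfterTest(t)
	clus := NewClusterV3(t, &ClusterConfig{Size: 3})
	defer clus.Terminate(t)

	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()

	errc := make(chan error)
	// 30 rounds of stressers against each of the 3 members
	for i := 0; i < 30; i++ {
		for j := 0; j < 3; j++ {
			j := j // capture the loop variable for the goroutine
			go func() { errc <- stresser(ctx, toGRPC(clus.Client(j)).Lease) }()
		}
	}
	for i := 0; i < 90; i++ {
		if err := <-errc; err != nil {
			t.Fatal(err)
		}
	}
}

// both stress tests then reduce to one-liners:
func TestV3LeaseRenewStress(t *testing.T)      { testLeaseStress(t, stressLeaseRenew) }
func TestV3LeaseTimeToLiveStress(t *testing.T) { testLeaseStress(t, stressLeaseTimeToLive) }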
what is the reproducibility of this test? can this reproduce the issue every time?
@xiang90 it is quite reproducible. The test fails very fast, within a second or two, even if I give it a 5 second timeout.
awesome. thanks.
@@ -1613,6 +1615,10 @@ func (s *EtcdServer) getCommittedIndex() uint64 {
	return atomic.LoadUint64(&s.committedIndex)
}

func (s *EtcdServer) GetCommittedIndex() uint64 {
exporting internal details (e.g., indexes) like this really should be avoided; it leads to high coupling and poor cohesion
@@ -565,6 +565,8 @@ func (s *EtcdServer) RaftHandler() http.Handler { return s.r.transport.Handler() }

func (s *EtcdServer) Lessor() lease.Lessor { return s.lessor }

func (s *EtcdServer) ApplyWait() wait.WaitTime { return s.applyWait }
this shouldn't be exported to the outside world if possible, it's an internal implementation detail. Try
func (s *EtcdServer) ApplyNotify() <-chan struct{} {
	return s.applyWait.Wait(atomic.LoadUint64(&s.committedIndex))
}
that's much better. will change.
)

// NewHandler returns an http Handler for lease renewals
func NewHandler(l lease.Lessor) http.Handler {
	return &leaseHandler{l}
func NewHandler(l lease.Lessor, w wait.WaitTime, i Indexer) http.Handler {
This is a bad interface; it forces the handler to reason about and know about indexes and WaitTime, then introduces a lot of interfaces to pretend it's an appropriate abstraction. Having func NewHandler(l lease.Lessor, waitch func() <-chan struct{}) instead would be much simpler; it can be called like NewHandler(l, func() <-chan struct{} { return s.ApplyNotify() }).
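For illustration, a minimal sketch of that shape (the leaseHandler field names here are assumptions, not necessarily the merged code):

func NewHandler(l lease.Lessor, waitch func() <-chan struct{}) http.Handler {
	return &leaseHandler{l: l, waitch: waitch}
}

type leaseHandler struct {
	l      lease.Lessor
	waitch func() <-chan struct{}
}

// and on the server side:
// lh = leasehttp.NewHandler(s.Lessor(), func() <-chan struct{} { return s.ApplyNotify() })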
select {
case <-h.w.Wait(cci):
case <-time.After(applyTimeout):
	http.Error(w, "waiting for node to catch up its applied index has timed out", http.StatusInternalServerError)
should be an actual error code like the other errors; otherwise, this will cause the client lease to shut down even though this is most likely a transient error
@@ -76,3 +122,32 @@ func TestTimeToLiveHTTP(t *testing.T) {
		t.Fatalf("granted TTL expected 5, got %d", resp.LeaseTimeToLiveResponse.GrantedTTL)
	}
}

func TestTimeToLiveHTTPTimeout(t *testing.T) {
this is very similar to TestRenewHTTPTimeout; both should probably call into a common function that has Renew/TimeToLive passed in
27b7560 to d8849cb
func stressLeaseTimeToLive(tctx context.Context, lc pb.LeaseClient) error {
	for tctx.Err() == nil {
		resp, gerr := lc.LeaseGrant(context.Background(), &pb.LeaseGrantRequest{TTL: 60})
I couldn't use tctx because tctx is associated with a timeout. I don't want LeaseGrant and LeaseTimeToLive to return an error due to a timed-out context.
}

func stressLeaseRenew(tctx context.Context, lc pb.LeaseClient) error {
	cctx, cancel := context.WithCancel(context.Background())
I couldn't use tctx because tctx is associated with a timeout. I don't want LeaseGrant, Send(), and Recv() to return an error due to a timed-out context.
func stressLeaseRenew(tctx context.Context, lc pb.LeaseClient) (reterr error) {
	defer func() {
		if tctx.Err() != nil {
			reterr = nil
		}
	}()
	...
}
that's nice. good to know.
All fixed. PTAL @heyitsanthony
le.Promote(time.Second)
l, err := le.Grant(1, int64(5))
if err != nil {
	t.Fatalf("failed to create lease: %v", err)
}

ts := httptest.NewServer(NewHandler(le))
ts := httptest.NewServer(NewHandler(le, func() <-chan struct{} { return w.Wait(2) }))
No need for a time list; it's not being used for anything aside from getting a closed channel:
func waitReady() <-chan struct{} {
	ch := make(chan struct{})
	close(ch)
	return ch
}
ts := httptest.NewServer(NewHandler(le, waitReady))
w := wait.NewTimeList()
// trigger committed index 2 to simulate the case where the node has caught up to the latest CommittedIndex 2 returned by indexerMock
w.Trigger(2)
ts := httptest.NewServer(NewHandler(le, func() <-chan struct{} { return w.Wait(2) }))
ts := httptest.NewServer(NewHandler(le, waitReady))
	t.Fatalf("failed to create lease: %v", err)
}

ts := httptest.NewServer(NewHandler(le, func() <-chan struct{} { return w.Wait(2) }))
again, no need for a time list, it only needs to simulate a timeout:
ts := httptest.NewServer(NewHandler(le, func() <-chan struct{} { return nil }))
@@ -76,3 +85,42 @@ func TestTimeToLiveHTTP(t *testing.T) {
		t.Fatalf("granted TTL expected 5, got %d", resp.LeaseTimeToLiveResponse.GrantedTTL)
	}
}

func testTimeout(t *testing.T, f func(*lease.Lease, *httptest.Server) error) {
func testApplyTimeout(t *testing.T, f func(*lease.Lease, string) error)
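A rough sketch of that shared helper; the backend/lessor setup here mirrors the existing tests in this file and is an assumption about the final code:

func testApplyTimeout(t *testing.T, f func(*lease.Lease, string) error) {
	be, tmpPath := backend.NewTmpBackend(time.Hour, 10000)
	defer os.Remove(tmpPath)
	defer be.Close()

	le := lease.NewLessor(be, int64(5))
	le.Promote(time.Second)
	l, err := le.Grant(1, int64(5))
	if err != nil {
		t.Fatalf("failed to create lease: %v", err)
	}

	// a nil channel never becomes ready, so the handler's apply wait always times out
	ts := httptest.NewServer(NewHandler(le, func() <-chan struct{} { return nil }))
	defer ts.Close()

	if err := f(l, ts.URL); err == nil {
		t.Fatal("expected timeout error, got nil")
	}
}

// TestRenewHTTPTimeout and TestTimeToLiveHTTPTimeout then only differ in the
// closure they pass, calling RenewHTTP or TimeToLiveHTTP against ts.URL.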
@@ -31,6 +31,8 @@ var (
	ErrGRPCLeaseNotFound = grpc.Errorf(codes.NotFound, "etcdserver: requested lease not found")
	ErrGRPCLeaseExist    = grpc.Errorf(codes.FailedPrecondition, "etcdserver: lease already exists")

	ErrGRPCLeaseHTTPTimeout = grpc.Errorf(codes.Unavailable, "etcdserver: lease http request timed out")
After looking through EtcdServer.LeaseRenew and EtcdServer.LeaseTimeToLive again, it seems like this error can't possibly be returned to the client, so I was wrong about the client shutdown problem. Probably better not to define it as an rpc error if it won't leave the etcd server.
if the LeaseHTTPTimeout does happen, how should the error be handled? Right now, I have it set up so that the error will propagate to the client as a gRPC error with codes.Unavailable; then the client should retry the request on this error.
Huh? How is it propagating to the gRPC client? It goes through LeaseRenew/LeaseTimeToLive in EtcdServer, which will retry so long as the request context hasn't timed out. If the context times out, EtcdServer will return ErrTimeout back to the client.
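Roughly the retry loop being referred to, condensed; the helper names follow the etcd sources of that period and are not guaranteed verbatim:

func (s *EtcdServer) LeaseRenew(ctx context.Context, id lease.LeaseID) (int64, error) {
	ttl, err := s.lessor.Renew(id)
	if err != lease.ErrNotPrimary {
		// this member is the primary lessor (leader), or a real error occurred
		return ttl, err
	}

	cctx, cancel := context.WithTimeout(ctx, s.Cfg.ReqTimeout())
	defer cancel()

	// not the leader: forward over lease HTTP and keep retrying until the
	// request context expires; a timed-out lease HTTP call is retried here,
	// never surfaced to the gRPC client
	for cctx.Err() == nil {
		leader, lerr := s.waitLeader(cctx)
		if lerr != nil {
			return -1, lerr
		}
		for _, url := range leader.PeerURLs {
			ttl, err = leasehttp.RenewHTTP(cctx, id, url+leasehttp.LeasePrefix, s.peerRt)
			if err == nil || err == lease.ErrLeaseNotFound {
				return ttl, err
			}
		}
	}
	return -1, ErrTimeout
}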
@heyitsanthony, you are right. I didn't see the loop logic in EtcdServer.LeaseRenew. I'll get rid of the gRPC error then.
b7aa0b3 to a727aa0
All fixed. PTAL @heyitsanthony
7bf2163 to 0c64dcc
…erations: suppose a lease granting request from a follower goes through and is followed by a lease lookup or renewal; the leader might not have applied the lease grant request locally yet. So the leader might not find the lease for the lookup or renewal request, which will result in a lease-not-found error. To fix this issue, we force the leader to apply its pending committed index before looking up the lease. FIX etcd-io#6978
0c64dcc to c81d108
All fixed. PTAL @xiang90 @heyitsanthony. CI passed.
@@ -188,6 +188,8 @@ func (r *raftNode) start(rh *raftReadyHandler) {
	return
}

updateCommittedIndex(&ap, rh)
move this before line 184? Otherwise the etcd apply routine might apply entries before we update the committed index, and then the applied index could be greater than the committed index.
good catch! will change
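For reference, a sketch of what that helper computes (field names such as ap.entries and the rh callback are assumptions based on the surrounding diffs):

func updateCommittedIndex(ap *apply, rh *raftReadyHandler) {
	var ci uint64
	if len(ap.entries) != 0 {
		// entries handed to the apply routine are committed, so the last one
		// carries the highest committed index
		ci = ap.entries[len(ap.entries)-1].Index
	}
	if ap.snapshot.Metadata.Index > ci {
		ci = ap.snapshot.Metadata.Index
	}
	if ci != 0 {
		rh.committedIdxUpdate(ci)
	}
}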
@@ -596,7 +598,8 @@ type etcdProgress struct {
// and helps decouple state machine logic from Raft algorithms.
// TODO: add a state machine interface to apply the commit entries and do snapshot/recover
type raftReadyHandler struct {
	leadershipUpdate func()
	leadershipUpdate   func()
	committedIdxUpdate func(uint64)
updateCommittedIndex.
that's better. I also like to keep a consistent naming convention. should I also make leadershipUpdate -> updateLeadership?
probably. we should do it in another pr though.
makes sense. will do!
c81d108 to 2faf72f
All fixed. PTAL @xiang90
defer to @heyitsanthony
lgtm
Suppose a lease granting request from a follower goes through and is followed by a lease lookup or renewal; the leader might not have applied the lease grant request locally yet. So the leader might not find the lease for the lookup or renewal request, which will result in a lease-not-found error. To fix this issue, we force the leader to apply its pending committed index before looking up the lease.
FIX #6978
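In code terms, the fix boils down to the leader's lease HTTP handler waiting on its apply progress before touching the lessor. A condensed sketch based on the diffs above (waitch and applyTimeout as discussed in the review; not the exact merged code):

func (h *leaseHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
	// ... decode the forwarded renew / time-to-live request ...

	// block until the leader has applied everything it has committed, so a
	// lease granted through a follower is visible locally
	select {
	case <-h.waitch():
	case <-time.After(applyTimeout):
		http.Error(w, "waiting for node to catch up its applied index has timed out", http.StatusRequestTimeout)
		return
	}

	// ... now look up / renew the lease; it will no longer spuriously
	// report "lease not found" ...
}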