clientv3: don't halt lease client if there is a lease error
Anthony Romano committed Apr 25, 2017
1 parent f254e38 commit 747f7d0
Showing 2 changed files with 94 additions and 70 deletions.
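For orientation, here is a minimal before/after sketch of the caller-side change, not taken verbatim from this commit; cli and resp are assumed to come from a surrounding example such as ExampleLease_keepAlive below, with imports for context, fmt, and log:

// before this commit: stream setup errors came back from the call itself
//   ch, kaerr := cli.KeepAlive(context.TODO(), resp.ID)
//   if kaerr != nil { log.Fatal(kaerr) }

// after this commit: the call always returns a channel, and errors are
// delivered as the Err field of the responses read from that channel
ch := cli.KeepAlive(context.TODO(), resp.ID)
ka := <-ch
if ka.Err != nil {
	log.Fatal(ka.Err)
}
fmt.Println("ttl:", ka.TTL)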
15 changes: 8 additions & 7 deletions clientv3/example_lease_test.go
@@ -100,12 +100,13 @@ func ExampleLease_keepAlive() {
}

// the key 'foo' will be kept forever
- ch, kaerr := cli.KeepAlive(context.TODO(), resp.ID)
- if kaerr != nil {
- log.Fatal(kaerr)
- }
+ ch := cli.KeepAlive(context.TODO(), resp.ID)

ka := <-ch
+ if ka.Err != nil {
+ log.Fatal(ka.Err)
+ }
+
fmt.Println("ttl:", ka.TTL)
// Output: ttl: 5
}
@@ -131,9 +132,9 @@ func ExampleLease_keepAliveOnce() {
}

// to renew the lease only once
- ka, kaerr := cli.KeepAliveOnce(context.TODO(), resp.ID)
- if kaerr != nil {
- log.Fatal(kaerr)
+ ka := cli.KeepAliveOnce(context.TODO(), resp.ID)
+ if ka.Err != nil {
+ log.Fatal(ka.Err)
}

fmt.Println("ttl:", ka.TTL)
149 changes: 86 additions & 63 deletions clientv3/lease.go
@@ -41,8 +41,10 @@ type LeaseGrantResponse struct {
// LeaseKeepAliveResponse is used to convert the protobuf keepalive response.
type LeaseKeepAliveResponse struct {
*pb.ResponseHeader
- ID  LeaseID
- TTL int64
+ ID       LeaseID
+ TTL      int64
+ Err      error
+ Deadline time.Time
}

// LeaseTimeToLiveResponse is used to convert the protobuf lease timetolive response.
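The two new response fields carry what used to be separate return values: Err reports a keepalive failure in-band, and Deadline records when the reported TTL lapses on the local clock. A small sketch of reading them, assuming a cli client and a granted lease id (hypothetical caller code, not part of this commit; imports: context, fmt, log, time):

resp := cli.KeepAliveOnce(context.TODO(), id)
if resp.Err != nil {
	log.Fatal(resp.Err)
}
// Deadline is set to time.Now().Add(TTL seconds) when the response is built,
// so this is roughly how long the lease stays valid without another keepalive.
fmt.Println("lease valid for about", resp.Deadline.Sub(time.Now()))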
@@ -70,23 +72,11 @@ const (
NoLease LeaseID = 0

// retryConnWait is how long to wait before retrying on a lost leader
+ // or keep alive loop failure.
retryConnWait = 500 * time.Millisecond
)

- // ErrKeepAliveHalted is returned if client keep alive loop halts with an unexpected error.
- //
- // This usually means that automatic lease renewal via KeepAlive is broken, but KeepAliveOnce will still work as expected.
- type ErrKeepAliveHalted struct {
- Reason error
- }
-
- func (e ErrKeepAliveHalted) Error() string {
- s := "etcdclient: leases keep alive halted"
- if e.Reason != nil {
- s += ": " + e.Reason.Error()
- }
- return s
- }
+ type LeaseKeepAliveChan <-chan LeaseKeepAliveResponse

type Lease interface {
// Grant creates a new lease.
@@ -98,12 +88,24 @@ type Lease interface {
// TimeToLive retrieves the lease information of the given lease ID.
TimeToLive(ctx context.Context, id LeaseID, opts ...LeaseOption) (*LeaseTimeToLiveResponse, error)

- // KeepAlive keeps the given lease alive forever.
- KeepAlive(ctx context.Context, id LeaseID) (<-chan *LeaseKeepAliveResponse, error)
-
- // KeepAliveOnce renews the lease once. In most of the cases, Keepalive
- // should be used instead of KeepAliveOnce.
- KeepAliveOnce(ctx context.Context, id LeaseID) (*LeaseKeepAliveResponse, error)
+ // KeepAlive keeps the given lease alive forever. If the keepalive response posted to
+ // the channel is not consumed immediately, the lease client will continue sending keep alive requests
+ // to the etcd server at least every second until latest response is consumed.
+ //
+ // The KeepAlive channel closes if the underlying keep alive stream is interrupted in some
+ // way the client cannot handle itself; the error will be posted in the last keep
+ // alive message before closing. If there is no keepalive response within the
+ // lease's time-out, the channel will close with no error. In most cases calling
+ // KeepAlive again will re-establish keepalives with the target lease if it has not
+ // expired.
+ KeepAlive(ctx context.Context, id LeaseID) LeaseKeepAliveChan
+
+ // KeepAliveOnce renews the lease once. The response corresponds to the
+ // first message from calling KeepAlive. If the response has a recoverable
+ // error, KeepAliveOnce will retry the RPC with a new keep alive message.
+ //
+ // In most of the cases, Keepalive should be used instead of KeepAliveOnce.
+ KeepAliveOnce(ctx context.Context, id LeaseID) LeaseKeepAliveResponse

// Close releases all resources Lease keeps for efficient communication
// with the etcd server.
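Per the comment above, a channel that closes with no error means the keepalive timed out, while an interrupted stream posts its error in the final message; calling KeepAlive again can then re-establish keepalives. A sketch of a consumer built on that contract (hypothetical helper, not part of this commit; assumes the 2017-era import path github.com/coreos/etcd/clientv3 plus context and log):

func keepLeaseAlive(ctx context.Context, cli *clientv3.Client, id clientv3.LeaseID) {
	for ctx.Err() == nil {
		ch := cli.KeepAlive(ctx, id)
		for resp := range ch {
			if resp.Err != nil {
				// the last message before the channel closes carries the error
				log.Println("keepalive interrupted:", resp.Err)
			}
		}
		// channel closed: probe the lease once; if it is gone, stop retrying
		if once := cli.KeepAliveOnce(ctx, id); once.Err != nil {
			log.Println("giving up on lease:", once.Err)
			return
		}
	}
}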
@@ -113,9 +115,8 @@ type Lease interface {
type lessor struct {
mu sync.Mutex // guards all fields

- // donec is closed and loopErr is set when recvKeepAliveLoop stops
- donec chan struct{}
- loopErr error
+ // donec is closed when all goroutines are torn down from Close()
+ donec chan struct{}

remote pb.LeaseClient

@@ -137,7 +138,7 @@ type lessor struct {

// keepAlive multiplexes a keepalive for a lease over multiple channels
type keepAlive struct {
- chs []chan<- *LeaseKeepAliveResponse
+ chs []chan<- LeaseKeepAliveResponse
ctxs []context.Context
// deadline is the time the keep alive channels close if no response
deadline time.Time
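As the comment above notes, a single keepAlive entry fans each keepalive response out to every channel registered for that lease, so repeated KeepAlive calls for the same lease ID share one stream. A rough illustration under that assumption (not part of this commit; cli and a granted lease id assumed, imports context and fmt):

ch1 := cli.KeepAlive(context.TODO(), id)
ch2 := cli.KeepAlive(context.TODO(), id) // same ID: added to the same keepAlive's chs slice
ka1, ka2 := <-ch1, <-ch2                 // each channel receives a copy of the response
fmt.Println(ka1.TTL, ka2.TTL)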
@@ -219,24 +220,22 @@ func (l *lessor) TimeToLive(ctx context.Context, id LeaseID, opts ...LeaseOption
}
}

- func (l *lessor) KeepAlive(ctx context.Context, id LeaseID) (<-chan *LeaseKeepAliveResponse, error) {
- ch := make(chan *LeaseKeepAliveResponse, leaseResponseChSize)
+ func (l *lessor) KeepAlive(ctx context.Context, id LeaseID) LeaseKeepAliveChan {
+ ch := make(chan LeaseKeepAliveResponse, leaseResponseChSize)

l.mu.Lock()
- // ensure that recvKeepAliveLoop is still running
select {
case <-l.donec:
- err := l.loopErr
l.mu.Unlock()
close(ch)
- return ch, ErrKeepAliveHalted{Reason: err}
+ return ch
default:
}
ka, ok := l.keepAlives[id]
if !ok {
// create fresh keep alive
ka = &keepAlive{
- chs: []chan<- *LeaseKeepAliveResponse{ch},
+ chs: []chan<- LeaseKeepAliveResponse{ch},
ctxs: []context.Context{ctx},
deadline: time.Now().Add(l.firstKeepAliveTimeout),
nextKeepAlive: time.Now(),
@@ -252,24 +251,51 @@ func (l *lessor) KeepAlive(ctx context.Context, id LeaseID) (<-chan *LeaseKeepAl

go l.keepAliveCtxCloser(id, ctx, ka.donec)
l.firstKeepAliveOnce.Do(func() {
- go l.recvKeepAliveLoop()
+ go func() {
+ defer func() {
+ l.mu.Lock()
+ for _, ka := range l.keepAlives {
+ ka.Close(nil)
+ }
+ close(l.donec)
+ l.mu.Unlock()
+ }()
+
+ for l.stopCtx.Err() == nil {
+ err := l.recvKeepAliveLoop()
+ if l.stopCtx.Err() != nil && err == l.stopCtx.Err() {
+ // don't issue any error on ordinary shutdown path
+ break
+ }
+ l.mu.Lock()
+ for _, ka := range l.keepAlives {
+ ka.Close(err)
+ }
+ l.keepAlives = make(map[LeaseID]*keepAlive)
+ l.mu.Unlock()
+ select {
+ case <-l.stopCtx.Done():
+ case <-time.After(retryConnWait):
+ }
+ }
+ }()
go l.deadlineLoop()
})

- return ch, nil
+ return ch
}

- func (l *lessor) KeepAliveOnce(ctx context.Context, id LeaseID) (*LeaseKeepAliveResponse, error) {
+ func (l *lessor) KeepAliveOnce(ctx context.Context, id LeaseID) LeaseKeepAliveResponse {
for {
- resp, err := l.keepAliveOnce(ctx, id)
- if err == nil {
+ resp := l.keepAliveOnce(ctx, id)
+ if resp.Err == nil {
if resp.TTL <= 0 {
- err = rpctypes.ErrLeaseNotFound
+ resp.Err = rpctypes.ErrLeaseNotFound
}
- return resp, err
+ return resp
}
- if isHaltErr(ctx, err) {
- return nil, toErr(ctx, err)
+ if isHaltErr(ctx, resp.Err) {
+ return resp
}
}
}
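In the rewritten KeepAliveOnce above, a renewal that comes back with TTL <= 0 is reported as rpctypes.ErrLeaseNotFound on the Err field, and recoverable RPC errors are retried internally, so callers mostly branch on Err. A caller-side sketch (hypothetical, not part of this commit; rpctypes is github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes):

resp := cli.KeepAliveOnce(context.TODO(), id)
switch resp.Err {
case nil:
	fmt.Println("renewed, ttl:", resp.TTL)
case rpctypes.ErrLeaseNotFound:
	// lease expired or never existed; grant a new one before retrying
default:
	// non-recoverable error (halting errors are returned without retry)
}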
@@ -339,7 +365,7 @@ func (l *lessor) closeRequireLeader() {
continue
}
// remove all channels that required a leader from keepalive
- newChs := make([]chan<- *LeaseKeepAliveResponse, len(ka.chs)-reqIdxs)
+ newChs := make([]chan<- LeaseKeepAliveResponse, len(ka.chs)-reqIdxs)
newCtxs := make([]context.Context, len(newChs))
newIdx := 0
for i := range ka.chs {
@@ -353,45 +379,34 @@ func (l *lessor) closeRequireLeader() {
}
}

- func (l *lessor) keepAliveOnce(ctx context.Context, id LeaseID) (*LeaseKeepAliveResponse, error) {
+ func (l *lessor) keepAliveOnce(ctx context.Context, id LeaseID) LeaseKeepAliveResponse {
cctx, cancel := context.WithCancel(ctx)
defer cancel()

stream, err := l.remote.LeaseKeepAlive(cctx, grpc.FailFast(false))
if err != nil {
- return nil, toErr(ctx, err)
+ return LeaseKeepAliveResponse{Err: toErr(ctx, err)}
}

err = stream.Send(&pb.LeaseKeepAliveRequest{ID: int64(id)})
if err != nil {
- return nil, toErr(ctx, err)
+ return LeaseKeepAliveResponse{Err: toErr(ctx, err)}
}

resp, rerr := stream.Recv()
if rerr != nil {
- return nil, toErr(ctx, rerr)
+ return LeaseKeepAliveResponse{Err: toErr(ctx, rerr)}
}

- karesp := &LeaseKeepAliveResponse{
+ return LeaseKeepAliveResponse{
ResponseHeader: resp.GetHeader(),
ID: LeaseID(resp.ID),
TTL: resp.TTL,
+ Deadline: time.Now().Add(time.Duration(resp.TTL) * time.Second),
}
- return karesp, nil
}

func (l *lessor) recvKeepAliveLoop() (gerr error) {
- defer func() {
- l.mu.Lock()
- close(l.donec)
- l.loopErr = gerr
- for _, ka := range l.keepAlives {
- ka.Close()
- }
- l.keepAlives = make(map[LeaseID]*keepAlive)
- l.mu.Unlock()
- }()
-
stream, serr := l.resetRecv()
for serr == nil {
resp, err := stream.Recv()
@@ -443,6 +458,7 @@ func (l *lessor) recvKeepAlive(resp *pb.LeaseKeepAliveResponse) {
ResponseHeader: resp.GetHeader(),
ID: LeaseID(resp.ID),
TTL: resp.TTL,
+ Deadline: time.Now().Add(time.Duration(resp.TTL) * time.Second),
}

l.mu.Lock()
@@ -456,7 +472,7 @@ func (l *lessor) recvKeepAlive(resp *pb.LeaseKeepAliveResponse) {
if karesp.TTL <= 0 {
// lease expired; close all keep alive channels
delete(l.keepAlives, karesp.ID)
- ka.Close()
+ ka.Close(nil)
return
}

Expand All @@ -465,7 +481,7 @@ func (l *lessor) recvKeepAlive(resp *pb.LeaseKeepAliveResponse) {
ka.deadline = time.Now().Add(time.Duration(karesp.TTL) * time.Second)
for _, ch := range ka.chs {
select {
- case ch <- karesp:
+ case ch <- *karesp:
ka.nextKeepAlive = nextKeepAlive
default:
}
@@ -486,7 +502,7 @@ func (l *lessor) deadlineLoop() {
for id, ka := range l.keepAlives {
if ka.deadline.Before(now) {
// waited too long for response; lease may be expired
- ka.Close()
+ ka.Close(nil)
delete(l.keepAlives, id)
}
}
@@ -528,9 +544,16 @@ func (l *lessor) sendKeepAliveLoop(stream pb.Lease_LeaseKeepAliveClient) {
}
}

- func (ka *keepAlive) Close() {
+ func (ka *keepAlive) Close(err error) {
close(ka.donec)
for _, ch := range ka.chs {
+ if err != nil {
+ // try to post error if buffer space available
+ select {
+ case ch <- LeaseKeepAliveResponse{Err: err}:
+ default:
+ }
+ }
close(ch)
}
}
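Close(err) above posts the error as one final response (if there is buffer space) before closing every consumer channel, which is how an error shutdown can be told apart from an ordinary close such as the deadlineLoop timeout. A minimal consumer sketch under that assumption (not part of this commit; ch is a LeaseKeepAliveChan from a prior KeepAlive call, log assumed imported):

var last clientv3.LeaseKeepAliveResponse
for resp := range ch {
	last = resp
}
if last.Err != nil {
	log.Println("keepalives stopped with error:", last.Err)
} else {
	log.Println("keepalives stopped cleanly (lease timed out or client closed)")
}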
