Skip to content

Commit

Permalink
Merge pull request #9952 from gyuho/fix-keepalive
Browse files Browse the repository at this point in the history
clientv3: fix keepalive send interval when response queue is full
  • Loading branch information
xiang90 authored Jul 23, 2018
2 parents 104b6a3 + e93fb56 commit e4e3471
Show file tree
Hide file tree
Showing 2 changed files with 62 additions and 6 deletions.
43 changes: 43 additions & 0 deletions clientv3/integration/lease_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -314,6 +314,49 @@ func TestLeaseGrantErrConnClosed(t *testing.T) {
}
}

// TestLeaseKeepAliveFullResponseQueue ensures when response
// queue is full thus dropping keepalive response sends,
// keepalive request is sent with the same rate of TTL / 3.
func TestLeaseKeepAliveFullResponseQueue(t *testing.T) {
defer testutil.AfterTest(t)

clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
defer clus.Terminate(t)

lapi := clus.Client(0)

// expect lease keepalive every 10-second
lresp, err := lapi.Grant(context.Background(), 30)
if err != nil {
t.Fatalf("failed to create lease %v", err)
}
id := lresp.ID

old := clientv3.LeaseResponseChSize
defer func() {
clientv3.LeaseResponseChSize = old
}()
clientv3.LeaseResponseChSize = 0

// never fetch from response queue, and let it become full
_, err = lapi.KeepAlive(context.Background(), id)
if err != nil {
t.Fatalf("failed to keepalive lease %v", err)
}

// TTL should not be refreshed after 3 seconds
// expect keepalive to be triggered after TTL/3
time.Sleep(3 * time.Second)

tr, terr := lapi.TimeToLive(context.Background(), id)
if terr != nil {
t.Fatalf("failed to get lease information %v", terr)
}
if tr.TTL >= 29 {
t.Errorf("unexpected kept-alive lease TTL %d", tr.TTL)
}
}

func TestLeaseGrantNewAfterClose(t *testing.T) {
defer testutil.AfterTest(t)

Expand Down
25 changes: 19 additions & 6 deletions clientv3/lease.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes"
pb "github.com/coreos/etcd/etcdserver/etcdserverpb"

"go.uber.org/zap"
"google.golang.org/grpc"
"google.golang.org/grpc/metadata"
)
Expand Down Expand Up @@ -77,15 +78,18 @@ const (
// defaultTTL is the assumed lease TTL used for the first keepalive
// deadline before the actual TTL is known to the client.
defaultTTL = 5 * time.Second
// a small buffer to store unsent lease responses.
leaseResponseChSize = 16
// NoLease is a lease ID for the absence of a lease.
NoLease LeaseID = 0

// retryConnWait is how long to wait before retrying request due to an error
retryConnWait = 500 * time.Millisecond
)

// LeaseResponseChSize is the size of buffer to store unsent lease responses.
// WARNING: DO NOT UPDATE.
// Only for testing purposes.
var LeaseResponseChSize = 16

// ErrKeepAliveHalted is returned if client keep alive loop halts with an unexpected error.
//
// This usually means that automatic lease renewal via KeepAlive is broken, but KeepAliveOnce will still work as expected.
Expand Down Expand Up @@ -169,6 +173,8 @@ type lessor struct {
firstKeepAliveOnce sync.Once

callOpts []grpc.CallOption

lg *zap.Logger
}

// keepAlive multiplexes a keepalive for a lease over multiple channels
Expand All @@ -193,6 +199,7 @@ func NewLeaseFromLeaseClient(remote pb.LeaseClient, c *Client, keepAliveTimeout
keepAlives: make(map[LeaseID]*keepAlive),
remote: remote,
firstKeepAliveTimeout: keepAliveTimeout,
lg: c.lg,
}
if l.firstKeepAliveTimeout == time.Second {
l.firstKeepAliveTimeout = defaultTTL
Expand Down Expand Up @@ -258,7 +265,7 @@ func (l *lessor) Leases(ctx context.Context) (*LeaseLeasesResponse, error) {
}

func (l *lessor) KeepAlive(ctx context.Context, id LeaseID) (<-chan *LeaseKeepAliveResponse, error) {
ch := make(chan *LeaseKeepAliveResponse, leaseResponseChSize)
ch := make(chan *LeaseKeepAliveResponse, LeaseResponseChSize)

l.mu.Lock()
// ensure that recvKeepAliveLoop is still running
Expand Down Expand Up @@ -456,7 +463,6 @@ func (l *lessor) recvKeepAliveLoop() (gerr error) {

select {
case <-time.After(retryConnWait):
continue
case <-l.stopCtx.Done():
return l.stopCtx.Err()
}
Expand Down Expand Up @@ -514,9 +520,16 @@ func (l *lessor) recvKeepAlive(resp *pb.LeaseKeepAliveResponse) {
for _, ch := range ka.chs {
select {
case ch <- karesp:
ka.nextKeepAlive = nextKeepAlive
default:
if l.lg != nil {
l.lg.Warn("lease keepalive response queue is full; dropping response send",
zap.Int("queue-size", len(ch)),
zap.Int("queue-capacity", cap(ch)),
)
}
}
// still advance in order to rate-limit keep-alive sends
ka.nextKeepAlive = nextKeepAlive
}
}

Expand Down Expand Up @@ -565,7 +578,7 @@ func (l *lessor) sendKeepAliveLoop(stream pb.Lease_LeaseKeepAliveClient) {
}

select {
case <-time.After(500 * time.Millisecond):
case <-time.After(retryConnWait):
case <-stream.Context().Done():
return
case <-l.donec:
Expand Down

0 comments on commit e4e3471

Please sign in to comment.