diff --git a/clientv3/integration/lease_test.go b/clientv3/integration/lease_test.go index bd7ee897bbf..c5d6d06c58b 100644 --- a/clientv3/integration/lease_test.go +++ b/clientv3/integration/lease_test.go @@ -314,6 +314,49 @@ func TestLeaseGrantErrConnClosed(t *testing.T) { } } +// TestLeaseKeepAliveFullResponseQueue ensures when response +// queue is full thus dropping keepalive response sends, +// keepalive request is sent with the same rate of TTL / 3. +func TestLeaseKeepAliveFullResponseQueue(t *testing.T) { + defer testutil.AfterTest(t) + + clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1}) + defer clus.Terminate(t) + + lapi := clus.Client(0) + + // expect lease keepalive every 10-second + lresp, err := lapi.Grant(context.Background(), 30) + if err != nil { + t.Fatalf("failed to create lease %v", err) + } + id := lresp.ID + + old := clientv3.LeaseResponseChSize + defer func() { + clientv3.LeaseResponseChSize = old + }() + clientv3.LeaseResponseChSize = 0 + + // never fetch from response queue, and let it become full + _, err = lapi.KeepAlive(context.Background(), id) + if err != nil { + t.Fatalf("failed to keepalive lease %v", err) + } + + // TTL should not be refreshed after 3 seconds + // expect keepalive to be triggered after TTL/3 + time.Sleep(3 * time.Second) + + tr, terr := lapi.TimeToLive(context.Background(), id) + if terr != nil { + t.Fatalf("failed to get lease information %v", terr) + } + if tr.TTL >= 29 { + t.Errorf("unexpected kept-alive lease TTL %d", tr.TTL) + } +} + func TestLeaseGrantNewAfterClose(t *testing.T) { defer testutil.AfterTest(t) diff --git a/clientv3/lease.go b/clientv3/lease.go index 3d5ff4f7226..3d2e897eec6 100644 --- a/clientv3/lease.go +++ b/clientv3/lease.go @@ -22,6 +22,7 @@ import ( "github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes" pb "github.com/coreos/etcd/etcdserver/etcdserverpb" + "go.uber.org/zap" "google.golang.org/grpc" "google.golang.org/grpc/metadata" ) @@ -77,8 +78,6 @@ const ( // defaultTTL is the assumed lease TTL used for the first keepalive // deadline before the actual TTL is known to the client. defaultTTL = 5 * time.Second - // a small buffer to store unsent lease responses. - leaseResponseChSize = 16 // NoLease is a lease ID for the absence of a lease. NoLease LeaseID = 0 @@ -86,6 +85,11 @@ const ( retryConnWait = 500 * time.Millisecond ) +// LeaseResponseChSize is the size of buffer to store unsent lease responses. +// WARNING: DO NOT UPDATE. +// Only for testing purposes. +var LeaseResponseChSize = 16 + // ErrKeepAliveHalted is returned if client keep alive loop halts with an unexpected error. // // This usually means that automatic lease renewal via KeepAlive is broken, but KeepAliveOnce will still work as expected. @@ -169,6 +173,8 @@ type lessor struct { firstKeepAliveOnce sync.Once callOpts []grpc.CallOption + + lg *zap.Logger } // keepAlive multiplexes a keepalive for a lease over multiple channels @@ -193,6 +199,7 @@ func NewLeaseFromLeaseClient(remote pb.LeaseClient, c *Client, keepAliveTimeout keepAlives: make(map[LeaseID]*keepAlive), remote: remote, firstKeepAliveTimeout: keepAliveTimeout, + lg: c.lg, } if l.firstKeepAliveTimeout == time.Second { l.firstKeepAliveTimeout = defaultTTL @@ -258,7 +265,7 @@ func (l *lessor) Leases(ctx context.Context) (*LeaseLeasesResponse, error) { } func (l *lessor) KeepAlive(ctx context.Context, id LeaseID) (<-chan *LeaseKeepAliveResponse, error) { - ch := make(chan *LeaseKeepAliveResponse, leaseResponseChSize) + ch := make(chan *LeaseKeepAliveResponse, LeaseResponseChSize) l.mu.Lock() // ensure that recvKeepAliveLoop is still running @@ -456,7 +463,6 @@ func (l *lessor) recvKeepAliveLoop() (gerr error) { select { case <-time.After(retryConnWait): - continue case <-l.stopCtx.Done(): return l.stopCtx.Err() } @@ -514,9 +520,16 @@ func (l *lessor) recvKeepAlive(resp *pb.LeaseKeepAliveResponse) { for _, ch := range ka.chs { select { case ch <- karesp: - ka.nextKeepAlive = nextKeepAlive default: + if l.lg != nil { + l.lg.Warn("lease keepalive response queue is full; dropping response send", + zap.Int("queue-size", len(ch)), + zap.Int("queue-capacity", cap(ch)), + ) + } } + // still advance in order to rate-limit keep-alive sends + ka.nextKeepAlive = nextKeepAlive } } @@ -565,7 +578,7 @@ func (l *lessor) sendKeepAliveLoop(stream pb.Lease_LeaseKeepAliveClient) { } select { - case <-time.After(500 * time.Millisecond): + case <-time.After(retryConnWait): case <-stream.Context().Done(): return case <-l.donec: