Skip to content

Commit

Permalink
lessor: extend leases on promote if expires will be rate limited
Browse files Browse the repository at this point in the history
Instead of unconditionally randomizing expiries, extend leases on promotion
only if too many leases would expire within the same time span. If the server
has few leases, or their expiries are already spread out, no lease is extended.

Squashed previous commits for #8149.

Author: Anthony Romano <anthony.romano@coreos.com>

This is a combination of 4 commits below:

lease: randomize expiry on initial refresh call

Randomize the very first expiry on lease recovery
to prevent recovered leases from expiring all at
the same time.

Address #8096.

integration: remove lease exist checking on randomized expiry

A lease with TTL 5 is renewed with a randomized expiry,
so it may still exist after 3 seconds.

lessor: extend leases on promote if expires will be rate limited

Instead of unconditionally randomizing expiries, extend leases on promotion
only if too many leases would expire within the same time span. If the server
has few leases, or their expiries are already spread out, no lease is extended.

Revert "integration: remove lease exist checking on randomized expiry"

This reverts commit 95bc33f. The new
lease extension algorithm should pass this test.
  • Loading branch information
gyuho committed Jun 23, 2017
1 parent c14aad0 commit 55de54a
Show file tree
Hide file tree
Showing 2 changed files with 100 additions and 4 deletions.
52 changes: 48 additions & 4 deletions lease/lessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,15 +32,14 @@ const (
// NoLease is a special LeaseID representing the absence of a lease.
NoLease = LeaseID(0)

// maximum number of leases to revoke per iteration
// TODO: make this configurable?
leaseRevokeRate = 1000
forever = monotime.Time(math.MaxInt64)
)

var (
leaseBucketName = []byte("lease")

forever = monotime.Time(math.MaxInt64)
// maximum number of leases to revoke per second; configurable for tests
leaseRevokeRate = 1000

ErrNotPrimary = errors.New("not a primary lessor")
ErrLeaseNotFound = errors.New("lease not found")
Expand Down Expand Up @@ -328,8 +327,53 @@ func (le *lessor) Promote(extend time.Duration) {
for _, l := range le.leaseMap {
l.refresh(extend)
}

if len(le.leaseMap) < leaseRevokeRate {
// no possibility of lease pile-up
return
}

// adjust expiries in case of overlap
leases := make([]*Lease, 0, len(le.leaseMap))
for _, l := range le.leaseMap {
leases = append(leases, l)
}
sort.Sort(leasesByExpiry(leases))

baseWindow := leases[0].Remaining()
nextWindow := baseWindow + time.Second
expires := 0
// have fewer expires than the total revoke rate so piled up leases
// don't consume the entire revoke limit
targetExpiresPerSecond := (3 * leaseRevokeRate) / 4
for _, l := range leases {
remaining := l.Remaining()
if remaining > nextWindow {
baseWindow = remaining
nextWindow = baseWindow + time.Second
expires = 1
continue
}
expires++
if expires <= targetExpiresPerSecond {
continue
}
rateDelay := float64(time.Second) * (float64(expires) / float64(targetExpiresPerSecond))
// If leases are extended by n seconds, leases n seconds ahead of the
// base window should be extended by only one second.
rateDelay -= float64(remaining - baseWindow)
delay := time.Duration(rateDelay)
nextWindow = baseWindow + delay
l.refresh(delay + extend)
}
}

// leasesByExpiry is a slice of leases that satisfies sort.Interface,
// ordering its elements by ascending Remaining() value.
type leasesByExpiry []*Lease

// Len returns the number of leases held in the slice.
func (ls leasesByExpiry) Len() int {
	return len(ls)
}

// Less reports whether the lease at index i has strictly less remaining
// time than the lease at index j.
func (ls leasesByExpiry) Less(i, j int) bool {
	left := ls[i].Remaining()
	right := ls[j].Remaining()
	return left < right
}

// Swap exchanges the leases stored at indexes i and j.
func (ls leasesByExpiry) Swap(i, j int) {
	ls[j], ls[i] = ls[i], ls[j]
}

func (le *lessor) Demote() {
le.mu.Lock()
defer le.mu.Unlock()
Expand Down
52 changes: 52 additions & 0 deletions lease/lessor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,58 @@ func TestLessorRenew(t *testing.T) {
}
}

// TestLessorRenewExtendPileup ensures Lessor extends leases on promotion if too many
// expire at the same time.
//
// It grants leaseRevokeRate*10 pairs of leases (TTLs ttl and ttl+1), re-creates
// the lessor on a reopened backend, promotes it with no extra extension, and
// then checks how many leases fall into each one-second expiry window.
func TestLessorRenewExtendPileup(t *testing.T) {
	// Lower the package-level revoke rate so a modest number of leases is
	// enough to get past Promote's early return; restore it on exit.
	oldRevokeRate := leaseRevokeRate
	defer func() { leaseRevokeRate = oldRevokeRate }()
	leaseRevokeRate = 10

	dir, be := NewTestBackend(t)
	defer os.RemoveAll(dir)

	le := newLessor(be, minLeaseTTL)
	ttl := int64(10)
	for i := 1; i <= leaseRevokeRate*10; i++ {
		if _, err := le.Grant(LeaseID(2*i), ttl); err != nil {
			t.Fatal(err)
		}
		// ttls that overlap spillover for ttl=10
		if _, err := le.Grant(LeaseID(2*i+1), ttl+1); err != nil {
			t.Fatal(err)
		}
	}

	// simulate stop and recovery
	le.Stop()
	be.Close()
	bcfg := backend.DefaultBackendConfig()
	// NOTE(review): presumably the same path NewTestBackend used — confirm.
	bcfg.Path = filepath.Join(dir, "be")
	be = backend.New(bcfg)
	defer be.Close()
	le = newLessor(be, minLeaseTTL)
	// The first lessor was stopped explicitly above; stop the recovered one
	// too. Defers run LIFO, so this happens before the backend is closed.
	defer le.Stop()

	// extend after recovery should extend expiration on lease pile-up
	le.Promote(0)

	// Bucket leases by remaining whole seconds.
	windowCounts := make(map[int64]int)
	for _, l := range le.leaseMap {
		// round up slightly for baseline ttl
		s := int64(l.Remaining().Seconds() + 0.1)
		windowCounts[s]++
	}

	// Every one-second window in [ttl, ttl+20) must hold no more than the
	// revoke rate and at least half of it.
	for i := ttl; i < ttl+20; i++ {
		c := windowCounts[i]
		if c > leaseRevokeRate {
			t.Errorf("expected at most %d expiring at %ds, got %d", leaseRevokeRate, i, c)
		}
		if c < leaseRevokeRate/2 {
			t.Errorf("expected at least %d expiring at %ds, got %d", leaseRevokeRate/2, i, c)
		}
	}
}

func TestLessorDetach(t *testing.T) {
dir, be := NewTestBackend(t)
defer os.RemoveAll(dir)
Expand Down

0 comments on commit 55de54a

Please sign in to comment.