-
Notifications
You must be signed in to change notification settings - Fork 4
/
renewer.go
102 lines (93 loc) · 2.5 KB
/
renewer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
package lease
import (
"strings"
"sync"
)
// Renewer is used by the LeaseCoordinator to renew the leases held by the
// system. Each LeaseCoordinator instance corresponds to one worker and uses
// exactly one Renewer to manage lease renewal for that worker.
type Renewer interface {
	// Renew attempts to renew all currently held leases. A non-nil error
	// reports that the renewal pass could not be carried out.
	Renew() error
	// GetHeldLeases returns the leases currently held, as Lease values
	// rather than pointers.
	GetHeldLeases() []Lease
}
// leaseHolder is the default implementation of Renewer that uses DynamoDB
// via a lease Manager.
type leaseHolder struct {
	// RWMutex guards heldLeases: updates take the write lock and
	// GetHeldLeases takes the read lock.
	sync.RWMutex
	// Config is embedded; the methods in this file read WorkerId and Logger,
	// which are presumably promoted from it (Config is declared elsewhere).
	*Config
	// manager is used to list the leases (ListLeases) and to renew a single
	// lease (RenewLease).
	manager Manager
	// heldLeases maps a lease key to the lease this worker is recorded as
	// holding.
	heldLeases map[string]*Lease
}
// Renew attempts to renew all currently held leases.
//
// It lists the leases through the manager and then reconciles heldLeases
// with that listing:
//
//  1. Held leases whose key is no longer listed are dropped.
//  2. Every listed lease owned by this worker is recorded as held and then
//     renewed. A failed renewal is logged together with its cause and does
//     not abort the pass.
//  3. Held leases that the listing assigns to another owner are dropped.
//
// The only error returned is the one produced by listing the leases; in that
// case heldLeases is left untouched.
//
// Every access to heldLeases happens under the embedded mutex, while calls to
// the manager and to the logger are made with the lock released so that
// readers such as GetHeldLeases are not blocked behind them.
func (l *leaseHolder) Renew() error {
	leases, err := l.manager.ListLeases()
	if err != nil {
		return err
	}

	// Index the listed keys once so that the membership test below is a map
	// lookup rather than a scan of the whole listing for every held lease.
	listed := make(map[string]struct{}, len(leases))
	for _, lease := range leases {
		listed[lease.Key] = struct{}{}
	}

	// Remove the leases that were deleted from the DynamoDB table. Deleting
	// entries while ranging over a map is allowed in Go.
	var lostLeases []string
	l.Lock()
	for key := range l.heldLeases {
		if _, ok := listed[key]; !ok {
			delete(l.heldLeases, key)
			lostLeases = append(lostLeases, key)
		}
	}
	l.Unlock()

	if n := len(lostLeases); n > 0 {
		l.Logger.Debugf("Worker %s lost %d leases due deprecation: %s",
			l.WorkerId,
			n,
			strings.Join(lostLeases, ", "))
	}

	// Renew the leases this worker owns and drop the ones that were taken
	// over by another worker.
	for _, lease := range leases {
		if lease.Owner == l.WorkerId {
			// Record the lease first: it may have been taken since the
			// previous pass and not be tracked by this renewer yet.
			l.Lock()
			l.heldLeases[lease.Key] = lease
			l.Unlock()

			// NOTE(review): a lease whose renewal fails stays in heldLeases
			// until a later listing reports a different owner.
			if err := l.manager.RenewLease(lease); err != nil {
				l.Logger.Debugf("Worker %s could not renew lease with key %s: %v", l.WorkerId, lease.Key, err)
			}
			continue
		}

		// Owned by someone else: probe and delete under one lock. Deleting
		// a key that is not present is a no-op.
		l.Lock()
		_, held := l.heldLeases[lease.Key]
		delete(l.heldLeases, lease.Key)
		l.Unlock()

		if held {
			l.Logger.Debugf("Worker %s lost lease with key %s", l.WorkerId, lease.Key)
		}
	}

	// Log the leases that currently belong to this worker. keys does not
	// lock on its own, so the snapshot is taken under the read lock here.
	l.RLock()
	keys := l.keys()
	l.RUnlock()
	if len(keys) > 0 {
		l.Logger.Debugf("Worker %s hold leases: %s", l.WorkerId, strings.Join(keys, ", "))
	}
	return nil
}
// GetHeldLeases returns the leases currently recorded in heldLeases.
//
// The map is read under the read lock. Each element of the result is a copy
// of the stored Lease value, so changing a returned element does not change
// the lease kept by the holder. The order of the result follows map
// iteration order and is therefore unspecified; the result is nil when no
// lease is held.
func (l *leaseHolder) GetHeldLeases() (leases []Lease) {
	l.RLock()
	defer l.RUnlock()

	for key := range l.heldLeases {
		// Dereferencing copies the Lease value out of the map entry.
		leases = append(leases, *l.heldLeases[key])
	}
	return leases
}
// keys returns the keys of every lease in heldLeases, in map iteration
// (unspecified) order, or nil when there are none. It does not take the lock
// itself.
func (l *leaseHolder) keys() (keys []string) {
	held := len(l.heldLeases)
	if held == 0 {
		return nil
	}

	keys = make([]string, 0, held)
	for name := range l.heldLeases {
		keys = append(keys, name)
	}
	return keys
}