-
Notifications
You must be signed in to change notification settings - Fork 4
/
interface.go
184 lines (169 loc) · 5.77 KB
/
interface.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
package lease
import (
"errors"
"time"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/dynamodb"
)
// AttributeType explicitly selects the DynamoDB data type to use when an
// extra field is set on a Lease object through SetAs.
type AttributeType int
// Supported AttributeType values. They are numbered with iota, so the zero
// value of AttributeType is StringSet.
const (
	// StringSet is the string set data type.
	StringSet AttributeType = iota
	// NumberSet is the number set data type.
	NumberSet
	// BinarySet is the binary set data type.
	BinarySet
)
// Sentinel errors exposed by this package.
var (
	// ErrTokenNotMatch and ErrLeaseNotHeld can be returned only by the
	// Update() call.
	//
	// ErrTokenNotMatch means that the concurrency token of the passed-in lease
	// doesn't match the concurrency token of the authoritative lease: the
	// lease was lost and regained between the time the caller acquired its
	// concurrency token and the time the caller called Update.
	ErrTokenNotMatch = errors.New("leaser: concurrency token doesn't match the authoritative lease")
	// ErrLeaseNotHeld is returned only if the passed-in lease object is not
	// held by this worker.
	ErrLeaseNotHeld = errors.New("leaser: worker does not hold the passed-in lease object")
	// ErrValueNotMatch is returned only when you try to set an extra field on
	// a lease object using the SetAs method and the field value does not match
	// the field type.
	// For example, the StringSet type accepts only []string{...}.
	ErrValueNotMatch = errors.New("leaser: field value does not match the field type")
)
// Lease contains data pertaining to a lease.
// Distributed systems may use leases to partition work across a fleet of workers.
// Each unit of work/task is identified by a leaseKey and has a corresponding Lease.
// Every worker will contend for all leases - only one worker will successfully take each one.
// The worker should hold the lease until it is ready to stop processing the corresponding unit of work,
// or until it fails.
// When the worker stops holding the lease, another worker will take and hold the lease.
type Lease struct {
	// Key, Owner and Counter are mapped to the DynamoDB attributes named in
	// their dynamodbav tags.
	Key string `dynamodbav:"leaseKey"`
	Owner string `dynamodbav:"leaseOwner"`
	Counter int `dynamodbav:"leaseCounter"`
	// lastRenewal is used by LeaseTaker to track the last time a lease counter was incremented.
	// It is deliberately not persisted in DynamoDB.
	lastRenewal time.Time
	// concurrencyToken is used to prevent updates to leases that we have lost and re-acquired.
	// It is deliberately not persisted in DynamoDB.
	concurrencyToken string
	// extrafields holds all the fields that do not belong to this package
	// (the metadata stored with Set).
	extrafields map[string]interface{}
	// explicitfields holds all the fields that were set using the SetAs
	// method, already converted to DynamoDB attribute values.
	explicitfields map[string]*dynamodb.AttributeValue
	// removedfields lists the removed attributes (see Del); it is used to
	// create the update expression.
	removedfields []string
}
// NewLease builds a Lease for the given key (the lease key/name). Every other
// field of the returned Lease is left at its zero value.
func NewLease(key string) Lease {
	var lease Lease
	lease.Key = key
	return lease
}
// Set stores an extra field (metadata) on the Lease object before you create
// or update it using the Leaser.
//
// Use this method to attach metadata to the lease. For example:
//
//	lease.Set("success", true)
//	lease.Set("checkpoint", 35465786912)
//
// Setting a key replaces any value previously stored for it with Set or
// SetAs, and cancels a removal previously requested for it with Del.
func (l *Lease) Set(key string, val interface{}) {
	if l.extrafields == nil {
		l.extrafields = make(map[string]interface{})
	}
	l.extrafields[key] = val
	// Make sure that this key does not exist in the explicit fields map
	// (delete on a nil map is a no-op).
	delete(l.explicitfields, key)
	// removedfields is used to create the update expression, so a key that is
	// set again must not stay queued for removal; otherwise the same
	// attribute could end up being both set and removed.
	// NOTE(review): DynamoDB rejects update expressions with overlapping
	// paths; confirm how the expression builder combines these fields.
	// The surviving names are copied into a fresh slice instead of being
	// filtered in place, because Lease values are passed by value and their
	// copies share the slice's backing array.
	var pending []string
	for _, name := range l.removedfields {
		if name != key {
			pending = append(pending, name)
		}
	}
	l.removedfields = pending
}
// SetAs is like the Set method, but with another argument "typ" that
// explicitly sets the DynamoDB data type.
//
// For example:
//
//	Set("key", []string{"foo", "bar"})               // add this field as a list
//	SetAs("key", []string{"foo", "bar"}, StringSet)  // add this field as a string set
//
// The expected value type is []string for StringSet and NumberSet, and
// [][]byte for BinarySet.
//
// ErrValueNotMatch is returned only if the field value does not match the
// field type; in that case the lease is left unchanged. On success the key
// replaces any value previously stored for it with Set, and a removal
// previously requested for it with Del is cancelled.
func (l *Lease) SetAs(key string, val interface{}, typ AttributeType) error {
	// Validate and convert first, so that a rejected call does not touch the
	// receiver at all.
	var attr *dynamodb.AttributeValue
	switch typ {
	case StringSet:
		if ss, ok := val.([]string); ok {
			attr = &dynamodb.AttributeValue{SS: aws.StringSlice(ss)}
		}
	case NumberSet:
		// Number sets are supplied as strings too; they are stored in the NS
		// field of the attribute value.
		if ns, ok := val.([]string); ok {
			attr = &dynamodb.AttributeValue{NS: aws.StringSlice(ns)}
		}
	case BinarySet:
		if bs, ok := val.([][]byte); ok {
			attr = &dynamodb.AttributeValue{BS: bs}
		}
	}
	if attr == nil {
		return ErrValueNotMatch
	}

	if l.explicitfields == nil {
		l.explicitfields = make(map[string]*dynamodb.AttributeValue)
	}
	l.explicitfields[key] = attr
	// Make sure that this key does not exist in the extra fields map
	// (delete on a nil map is a no-op).
	delete(l.extrafields, key)
	// removedfields is used to create the update expression, so a key that is
	// set again must not stay queued for removal; otherwise the same
	// attribute could end up being both set and removed.
	// NOTE(review): DynamoDB rejects update expressions with overlapping
	// paths; confirm how the expression builder combines these fields.
	// The surviving names are copied into a fresh slice instead of being
	// filtered in place, because Lease values are passed by value and their
	// copies share the slice's backing array.
	var pending []string
	for _, name := range l.removedfields {
		if name != key {
			pending = append(pending, name)
		}
	}
	l.removedfields = pending
	return nil
}
// Get looks up an extra field (metadata) of the Lease object, i.e. a field
// that does not belong to this package, and reports whether it was found.
//
// Fields stored with Set are returned as they were stored. Fields stored with
// SetAs are converted back from their DynamoDB attribute value: number and
// string sets through aws.StringValueSlice, anything else as the raw BS
// field.
func (l *Lease) Get(key string) (interface{}, bool) {
	if plain, found := l.extrafields[key]; found {
		return plain, true
	}

	attr, found := l.explicitfields[key]
	if !found {
		return nil, false
	}

	// Same precedence as the set kinds are checked in: NS, then SS, then BS.
	switch {
	case attr.NS != nil:
		return aws.StringValueSlice(attr.NS), true
	case attr.SS != nil:
		return aws.StringValueSlice(attr.SS), true
	default:
		return attr.BS, true
	}
}
// Del deletes an extra field (metadata) of the lease object.
//
// If the key is currently set (through Set or SetAs) it is dropped from the
// lease and recorded once in removedfields, which is used to create the
// update expression. Deleting a key that is not set is a no-op.
func (l *Lease) Del(key string) {
	_, inExtra := l.extrafields[key]
	_, inExplicit := l.explicitfields[key]
	if !inExtra && !inExplicit {
		return
	}
	// Clear both maps so the key cannot survive in one of them; delete on a
	// nil map or a missing key is a no-op.
	delete(l.extrafields, key)
	delete(l.explicitfields, key)
	// Record the removal only once: a key that was deleted, set again and
	// deleted again must not be listed twice.
	for _, name := range l.removedfields {
		if name == key {
			return
		}
	}
	l.removedfields = append(l.removedfields, key)
}
// isExpired reports whether more than t has elapsed since the last renewal
// of the lease.
func (l *Lease) isExpired(t time.Duration) bool {
	elapsed := time.Since(l.lastRenewal)
	return elapsed > t
}
// hasNoOwner reports whether the lease currently has no owner, i.e. the
// Owner field is either empty or the literal string "NULL".
func (l *Lease) hasNoOwner() bool {
	switch l.Owner {
	case "", "NULL":
		return true
	default:
		return false
	}
}
// Leaser is the interface that wraps the Coordinator methods.
//
// The method contracts are defined by the implementation, which is not part
// of this file. The documentation of ErrTokenNotMatch and ErrLeaseNotHeld
// states that those two errors can be returned only by Update.
type Leaser interface {
	Stop()
	Start() error
	Delete(Lease) error
	Create(Lease) (Lease, error)
	Update(Lease) (Lease, error)
	ForceUpdate(Lease) (Lease, error)
	GetHeldLeases() []Lease
}