-
Notifications
You must be signed in to change notification settings - Fork 93
/
rate.go
257 lines (218 loc) · 9 KB
/
rate.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
package throttled
import (
"context"
"fmt"
"time"
)
const (
// Maximum number of times to retry SetIfNotExists/CompareAndSwap operations
// before returning an error.
maxCASAttempts = 10
)
// A RateLimiterCtx manages limiting the rate of actions by key.
type RateLimiterCtx interface {
// RateLimitCtx checks whether a particular key has exceeded a rate
// limit. It also returns a RateLimitResult to provide additional
// information about the state of the RateLimiter.
//
// If the rate limit has not been exceeded, the underlying storage
// is updated by the supplied quantity. For example, a quantity of
// 1 might be used to rate limit a single request while a greater
// quantity could rate limit based on the size of a file upload in
// megabytes. If quantity is 0, no update is performed allowing
// you to "peek" at the state of the RateLimiter for a given key.
RateLimitCtx(ctx context.Context, key string, quantity int) (bool, RateLimitResult, error)
}
// RateLimitResult represents the state of the RateLimiter for a
// given key at the time of the query. This state can be used, for
// example, to communicate information to the client via HTTP
// headers. Negative values indicate that the attribute is not
// relevant to the implementation or state.
type RateLimitResult struct {
// Limit is the maximum number of requests that could be permitted
// instantaneously for this key starting from an empty state. For
// example, if a rate limiter allows 10 requests per second per
// key, Limit would always be 10.
Limit int
// Remaining is the maximum number of requests that could be
// permitted instantaneously for this key given the current
// state. For example, if a rate limiter allows 10 requests per
// second and has already received 6 requests for this key this
// second, Remaining would be 4.
Remaining int
// ResetAfter is the time until the RateLimiter returns to its
// initial state for a given key. For example, if a rate limiter
// manages requests per second and received one request 200ms ago,
// Reset would return 800ms. You can also think of this as the time
// until Limit and Remaining will be equal.
ResetAfter time.Duration
// RetryAfter is the time until the next request will be permitted.
// It should be -1 unless the rate limit has been exceeded.
RetryAfter time.Duration
}
type limitResult struct {
limited bool
}
func (r *limitResult) Limited() bool { return r.limited }
type rateLimitResult struct {
limitResult
limit, remaining int
reset, retryAfter time.Duration
}
func (r *rateLimitResult) Limit() int { return r.limit }
func (r *rateLimitResult) Remaining() int { return r.remaining }
func (r *rateLimitResult) Reset() time.Duration { return r.reset }
func (r *rateLimitResult) RetryAfter() time.Duration { return r.retryAfter }
// Rate describes a frequency of an activity such as the number of requests
// allowed per minute.
type Rate struct {
period time.Duration // Time between equally spaced requests at the rate
count int // Used internally for deprecated `RateLimit` interface only
}
// RateQuota describes the number of requests allowed per time period.
// MaxRate specified the maximum sustained rate of requests and must
// be greater than zero. MaxBurst defines the number of requests that
// will be allowed to exceed the rate in a single burst and must be
// greater than or equal to zero.
//
// Rate{PerSec(1), 0} would mean that after each request, no more
// requests will be permitted for that client for one second.
// Rate{PerSec(2), 0} permits one request per 0.5 seconds rather than
// two requests in one second. In practice, you probably want to set
// MaxBurst >0 to provide some flexibility to clients that only need
// to make a handful of requests. In fact a MaxBurst of zero will
// *never* permit a request with a quantity greater than one because
// it will immediately exceed the limit.
type RateQuota struct {
MaxRate Rate
MaxBurst int
}
// PerSec represents a number of requests per second.
func PerSec(n int) Rate { return Rate{time.Second / time.Duration(n), n} }
// PerMin represents a number of requests per minute.
func PerMin(n int) Rate { return Rate{time.Minute / time.Duration(n), n} }
// PerHour represents a number of requests per hour.
func PerHour(n int) Rate { return Rate{time.Hour / time.Duration(n), n} }
// PerDay represents a number of requests per day.
func PerDay(n int) Rate { return Rate{24 * time.Hour / time.Duration(n), n} }
// PerDuration represents a number of requests per provided duration.
func PerDuration(n int, d time.Duration) Rate { return Rate{d / time.Duration(n), n} }
// GCRARateLimiterCtx is a RateLimiter that uses the generic cell-rate
// algorithm. The algorithm has been slightly modified from its usual
// form to support limiting with an additional quantity parameter, such
// as for limiting the number of bytes uploaded.
type GCRARateLimiterCtx struct {
limit int
// Think of the DVT as our flexibility:
// How far can you deviate from the nominal equally spaced schedule?
// If you like leaky buckets, think about it as the size of your bucket.
delayVariationTolerance time.Duration
// Think of the emission interval as the time between events
// in the nominal equally spaced schedule. If you like leaky buckets,
// think of it as how frequently the bucket leaks one unit.
emissionInterval time.Duration
store GCRAStoreCtx
// Maximum number of times to retry SetIfNotExists/CompareAndSwap operations
// before returning an error.
maxCASAttemptsLimit int
}
// NewGCRARateLimiterCtx creates a GCRARateLimiterCtx. quota.Count defines
// the maximum number of requests permitted in an instantaneous burst
// and quota.Count / quota.Period defines the maximum sustained
// rate. For example, PerMin(60) permits 60 requests instantly per key
// followed by one request per second indefinitely whereas PerSec(1)
// only permits one request per second with no tolerance for bursts.
func NewGCRARateLimiterCtx(st GCRAStoreCtx, quota RateQuota) (*GCRARateLimiterCtx, error) {
if quota.MaxBurst < 0 {
return nil, fmt.Errorf("invalid RateQuota %#v; MaxBurst must be greater than zero", quota)
}
if quota.MaxRate.period <= 0 {
return nil, fmt.Errorf("invalid RateQuota %#v; MaxRate must be greater than zero", quota)
}
return &GCRARateLimiterCtx{
delayVariationTolerance: quota.MaxRate.period * (time.Duration(quota.MaxBurst) + 1),
emissionInterval: quota.MaxRate.period,
limit: quota.MaxBurst + 1,
store: st,
maxCASAttemptsLimit: maxCASAttempts,
}, nil
}
// SetMaxCASAttemptsLimit allows you to set the maxCASAttempts limit. This is set to 10
// be default.
func (g *GCRARateLimiterCtx) SetMaxCASAttemptsLimit(limit int) {
g.maxCASAttemptsLimit = limit
}
// RateLimitCtx checks whether a particular key has exceeded a rate
// limit. It also returns a RateLimitResult to provide additional
// information about the state of the RateLimiter.
//
// If the rate limit has not been exceeded, the underlying storage is
// updated by the supplied quantity. For example, a quantity of 1
// might be used to rate limit a single request while a greater
// quantity could rate limit based on the size of a file upload in
// megabytes. If quantity is 0, no update is performed allowing you
// to "peek" at the state of the RateLimiter for a given key.
func (g *GCRARateLimiterCtx) RateLimitCtx(ctx context.Context, key string, quantity int) (bool, RateLimitResult, error) {
var tat, newTat, now time.Time
var ttl time.Duration
rlc := RateLimitResult{Limit: g.limit, RetryAfter: -1}
limited := false
i := 0
for {
var err error
var tatVal int64
var updated bool
// tat refers to the theoretical arrival time that would be expected
// from equally spaced requests at exactly the rate limit.
tatVal, now, err = g.store.GetWithTime(ctx, key)
if err != nil {
return false, rlc, err
}
if tatVal == -1 {
tat = now
} else {
tat = time.Unix(0, tatVal)
}
increment := time.Duration(quantity) * g.emissionInterval
if now.After(tat) {
newTat = now.Add(increment)
} else {
newTat = tat.Add(increment)
}
// Block the request if the next permitted time is in the future
allowAt := newTat.Add(-(g.delayVariationTolerance))
if diff := now.Sub(allowAt); diff < 0 {
if increment <= g.delayVariationTolerance {
rlc.RetryAfter = -diff
ttl = tat.Sub(now)
}
limited = true
break
}
ttl = newTat.Sub(now)
if tatVal == -1 {
updated, err = g.store.SetIfNotExistsWithTTL(ctx, key, newTat.UnixNano(), ttl)
} else {
updated, err = g.store.CompareAndSwapWithTTL(ctx, key, tatVal, newTat.UnixNano(), ttl)
}
if err != nil {
return false, rlc, err
}
if updated {
break
}
i++
if i >= g.maxCASAttemptsLimit {
return false, rlc, fmt.Errorf(
"Failed to store updated rate limit data for key %s after %d attempts",
key, i,
)
}
}
next := g.delayVariationTolerance - ttl
if next > -g.emissionInterval {
rlc.Remaining = int(next / g.emissionInterval)
}
rlc.ResetAfter = ttl
return limited, rlc, nil
}