forked from tsenart/tb
-
Notifications
You must be signed in to change notification settings - Fork 0
/
bucket.go
130 lines (115 loc) · 3.21 KB
/
bucket.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
package tb
import (
"math"
"sync/atomic"
"time"
)
// Bucket defines a generic lock-free implementation of a Token Bucket.
//
// The token count is manipulated exclusively through sync/atomic loads and
// compare-and-swap operations; the remaining fields are written once by
// NewBucket and only read afterwards.
type Bucket struct {
// inc is the number of tokens the filling go-routine puts into the bucket
// on each tick. It stays zero when no filling go-routine was started.
inc int64
// tokens is the number of tokens currently held. It is read and updated
// atomically by Take and Put.
tokens int64
// capacity is the upper bound Put enforces on tokens; a new bucket starts
// with tokens == capacity.
capacity int64
// freq is the tick interval of the filling go-routine.
freq time.Duration
// closing is closed by Close to tell the filling go-routine to stop.
closing chan struct{}
}
// NewBucket creates a Bucket that starts out full: it holds c tokens and has
// a capacity of c.
//
// Unless freq is -1, a filling go-routine is started that tops the bucket up
// on every tick of freq. The number of tokens added per tick is c multiplied
// by freq in seconds, rounded to the nearest integer, so that roughly c
// tokens are added per second. A freq shorter than 1/c seconds is raised to
// 1/c seconds. Call Close to stop the go-routine.
//
// If freq == -1 no go-routine is started and the bucket is never refilled
// automatically.
//
// c must be non-zero: the minimum tick interval is computed as one second
// divided by c.
func NewBucket(c int64, freq time.Duration) *Bucket {
	b := &Bucket{
		tokens:   c,
		capacity: c,
		closing:  make(chan struct{}),
	}
	if freq == -1 {
		return b
	}

	// Never tick faster than once per token per second.
	if minFreq := time.Second / time.Duration(c); freq < minFreq {
		freq = minFreq
	}

	perTick := float64(c) * freq.Seconds()
	b.freq = freq
	b.inc = int64(math.Floor(perTick + .5)) // round half up

	go b.fill()
	return b
}
// Take removes up to n tokens from the bucket and reports how many were
// actually removed.
//
//   - An empty bucket yields 0.
//   - If at least n tokens are available, exactly n are removed.
//   - Otherwise the bucket is drained and the number of tokens it held is
//     returned.
//
// The update is a compare-and-swap retried until it succeeds, so Take is safe
// for concurrent use.
func (b *Bucket) Take(n int64) (taken int64) {
	for {
		avail := atomic.LoadInt64(&b.tokens)
		if avail == 0 {
			return 0
		}

		// Clamp the request to what the bucket currently holds.
		grab := n
		if avail < n {
			grab = avail
		}

		if atomic.CompareAndSwapInt64(&b.tokens, avail, avail-grab) {
			return grab
		}
		// Another goroutine changed the count in the meantime; retry.
	}
}
// Put adds up to n tokens to the bucket and reports how many were actually
// added.
//
//   - A full bucket accepts nothing and yields 0.
//   - If n fits into the free space, exactly n tokens are added.
//   - Otherwise the bucket is filled to capacity and the size of the free
//     space is returned.
//
// The update is a compare-and-swap retried until it succeeds, so Put is safe
// for concurrent use.
func (b *Bucket) Put(n int64) (added int64) {
	for {
		cur := atomic.LoadInt64(&b.tokens)
		if cur == b.capacity {
			return 0
		}

		// Clamp the deposit to the remaining free space.
		room := b.capacity - cur
		deposit := n
		if room < n {
			deposit = room
		}

		if atomic.CompareAndSwapInt64(&b.tokens, cur, cur+deposit) {
			return deposit
		}
		// Another goroutine changed the count in the meantime; retry.
	}
}
// Wait blocks until n tokens have been taken from the bucket.
//
// If n tokens are immediately available it returns 0 without sleeping.
// Otherwise it repeatedly sleeps for the minimum time needed for the
// outstanding tokens to become available and takes whatever is there, until
// all n tokens have been taken.
//
// The returned duration is the sum of the sleep intervals requested, not a
// measurement of elapsed wall-clock time.
//
// This method is thread-safe.
func (b *Bucket) Wait(n int64) time.Duration {
	var slept time.Duration

	// First attempt happens without sleeping; the loop is skipped entirely
	// when it satisfies the whole request.
	for missing := n - b.Take(n); missing > 0; missing -= b.Take(missing) {
		pause := b.wait(missing)
		slept += pause
		time.Sleep(pause)
	}
	return slept
}
// Close signals the filling go-routine, if one was started, to stop. It
// always returns nil.
//
// Close must be called at most once: it closes the bucket's closing channel,
// and closing an already closed channel panics.
func (b *Bucket) Close() error {
	// Closing the channel (rather than sending on it) never blocks, even
	// when no filling go-routine is listening.
	close(b.closing)
	return nil
}
// wait computes the minimum time needed for n tokens to become available:
// the number of ticks required at inc tokens per tick, rounded up, times the
// tick interval. Requests larger than the capacity are treated as a request
// for capacity tokens.
func (b *Bucket) wait(n int64) time.Duration {
	needed := math.Min(float64(n), float64(b.capacity))
	ticks := int64(math.Ceil(needed / float64(b.inc)))
	return time.Duration(ticks * b.freq.Nanoseconds())
}
// fill is the body of the filling go-routine: on every tick of freq it puts
// inc tokens into the bucket, and it returns once closing has been closed.
//
// The loop waits on the closing channel and the ticker at the same time so
// that Close takes effect immediately. Checking closing only after a tick
// arrived would keep the go-routine and its ticker alive for up to one full
// freq period after Close.
func (b *Bucket) fill() {
	ticker := time.NewTicker(b.freq)
	defer ticker.Stop()

	for {
		select {
		case <-b.closing:
			return
		case <-ticker.C:
		}

		// A tick and Close may become ready together, and select then picks
		// at random; re-check so that shutdown always wins over a refill.
		select {
		case <-b.closing:
			return
		default:
			b.Put(b.inc)
		}
	}
}