-
Notifications
You must be signed in to change notification settings - Fork 0
/
opwindow.go
174 lines (154 loc) · 3.91 KB
/
opwindow.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
package inflight
import (
"container/list"
"context"
"fmt"
"sync"
"time"
)
// OpWindow is a windowed, microbatching priority queue.
// Operations for the same ID and time window form a microbatch. Microbatches are dequeued in FIFO order.
// OpWindow provides backpressure for both depth (i.e., number of microbatches in queue) and width (i.e., number of operations in a microbatch).
// OpWindow is safe for concurrent use. Its zero value is not safe to use, use NewOpWindow().
type OpWindow struct {
mu sync.Mutex
q list.List // *queueItem
m map[ID]*queueItem
// These are selectable sync.Cond: use blocking read for Wait() and non-blocking write for Signal().
queueHasItems chan struct{}
queueHasSpace chan struct{}
once sync.Once
done chan struct{}
depth int
width int
windowedBy time.Duration
}
// NewOpWindow creates a new OpWindow.
//
// depth: maximum number of entries in a queue
// width: maximum number of entries in a microbatch.
// windowedBy: window size.
func NewOpWindow(depth, width int, windowedBy time.Duration) *OpWindow {
q := &OpWindow{
queueHasItems: make(chan struct{}),
queueHasSpace: make(chan struct{}),
done: make(chan struct{}),
depth: depth,
width: width,
windowedBy: windowedBy,
m: make(map[ID]*queueItem),
}
q.q.Init()
return q
}
// Close provides graceful shutdown: no new ops will be enqueued.
func (q *OpWindow) Close() {
q.once.Do(func() {
q.mu.Lock()
defer q.mu.Unlock()
close(q.done)
// HACK (2023-12) (mh): Set depth to zero so new entries are rejected.
q.depth = 0
})
}
// Enqueue op into queue, blocking until first of: op is enqueued, ID has hit max width, context is done, or queue is closed.
func (q *OpWindow) Enqueue(ctx context.Context, id ID, op *Op) error {
q.mu.Lock() // locked on returns below
for {
item, ok := q.m[id]
if ok {
if len(item.OpSet.set) >= q.width {
if !item.IsFullClosed {
close(item.IsFull)
item.IsFullClosed = true
}
q.mu.Unlock()
return ErrQueueSaturatedWidth
}
item.OpSet.append(op)
q.mu.Unlock()
return nil
}
if q.q.Len() >= q.depth {
q.mu.Unlock()
select {
case <-ctx.Done():
return fmt.Errorf("%w: %w", ErrQueueSaturatedDepth, ctx.Err())
case <-q.done:
return ErrQueueClosed
case <-q.queueHasSpace:
q.mu.Lock()
continue
}
}
item = &queueItem{
ID: id,
ProcessAt: time.Now().Add(q.windowedBy),
OpSet: newOpSet(op),
IsFull: make(chan struct{}),
}
q.m[id] = item
q.q.PushBack(item)
q.mu.Unlock()
select {
case q.queueHasItems <- struct{}{}:
default:
}
return nil
}
}
// Dequeue removes and returns the oldest OpSet whose window has passed from the queue,
// blocking until first of: OpSet is ready, context is canceled, or queue is closed.
func (q *OpWindow) Dequeue(ctx context.Context) (*OpSet, error) {
q.mu.Lock() // unlocked on returns below
var item *queueItem
for item == nil {
elem := q.q.Front()
if elem == nil {
q.mu.Unlock()
select {
case <-ctx.Done():
return nil, ctx.Err()
case <-q.done:
return nil, ErrQueueClosed
case <-q.queueHasItems:
q.mu.Lock()
continue
}
}
item = q.q.Remove(elem).(*queueItem) // next caller will wait for a different item
}
waitFor := time.Until(item.ProcessAt)
if waitFor > 0 {
q.mu.Unlock() //
// NOTE (2023-12) (mh): Do we need to pool these?
timer := time.NewTimer(waitFor)
defer timer.Stop()
select {
case <-ctx.Done():
return nil, ctx.Err()
case <-q.done:
// process right away
case <-item.IsFull:
// process once full, regardless of windowing
case <-timer.C:
}
q.mu.Lock()
}
ops := item.OpSet
delete(q.m, item.ID)
q.mu.Unlock()
item = nil // gc
select {
case q.queueHasSpace <- struct{}{}:
default:
}
return ops, nil
}
type queueItem struct {
ID ID
ProcessAt time.Time
OpSet *OpSet
IsFull chan struct{}
IsFullClosed bool
}