package rbuf

// copyright (c) 2016, Jason E. Aten
// license: MIT

import "io"

// PointerRingBuf is a fixed-size circular ring buffer
// of interface{} values.
type PointerRingBuf struct {
	A        []interface{}
	N        int // MaxView, the total size of A, whether or not in use.
	Beg      int // start of in-use data in A
	Readable int // number of pointers available in A (in use)
}

// NewPointerRingBuf is the constructor; it allocates
// an internal slice of length sliceN.
func NewPointerRingBuf(sliceN int) *PointerRingBuf {
	r := &PointerRingBuf{
		N:        sliceN,
		Beg:      0,
		Readable: 0,
	}
	r.A = make([]interface{}, sliceN)
	return r
}
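
// An illustrative sketch, not part of the original API: a minimal
// construct/write/read round trip. The function name and the expected
// values in the comments are assumptions traced from the code in this file.
func examplePointerRingBufRoundTrip() {
	rb := NewPointerRingBuf(4)
	n, err := rb.WritePtrs([]interface{}{"a", "b", "c"})
	_, _ = n, err // n == 3, err == nil; rb.Readable == 3

	out := make([]interface{}, 3)
	n, err = rb.ReadPtrs(out)
	_, _ = n, err // n == 3; out holds "a", "b", "c"; rb.Readable == 0
}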

// TwoContig returns all readable pointers, but in two separate slices,
// to avoid copying. The two slices come from the same underlying buffer
// but are not contiguous. Either or both may be empty slices.
func (b *PointerRingBuf) TwoContig() (first []interface{}, second []interface{}) {

	extent := b.Beg + b.Readable
	if extent <= b.N {
		// we fit contiguously in this buffer without wrapping to the other.
		// Let second stay an empty slice.
		return b.A[b.Beg:(b.Beg + b.Readable)], second
	}
	return b.A[b.Beg:b.N], b.A[0:(extent % b.N)]
}
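
// A sketch of TwoContig on a wrapped ring; the helper name and the
// commented values are assumptions, derived by tracing the code above.
// After writing three values, reading two, and writing two more, the
// in-use region wraps around the end of A.
func exampleTwoContigWrapped() {
	rb := NewPointerRingBuf(4)
	rb.WritePtrs([]interface{}{"a", "b", "c"})
	rb.ReadPtrs(make([]interface{}, 2)) // consume "a" and "b"
	rb.WritePtrs([]interface{}{"d", "e"})

	first, second := rb.TwoContig()
	_, _ = first, second // first == ["c" "d"], second == ["e"]
}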

// ReadPtrs follows the contract of bytes.Buffer.Read(): it reads
// the next len(p) interface{} pointers from the buffer, or until
// the buffer is drained. The return value n is the number of
// pointers read. If the buffer has no data to return, err is
// io.EOF (unless len(p) is zero); otherwise it is nil.
func (b *PointerRingBuf) ReadPtrs(p []interface{}) (n int, err error) {
	return b.readAndMaybeAdvance(p, true)
}

// ReadWithoutAdvance reads like ReadPtrs but leaves the data in
// the buffer, so you can peek ahead, for example.
func (b *PointerRingBuf) ReadWithoutAdvance(p []interface{}) (n int, err error) {
	return b.readAndMaybeAdvance(p, false)
}

func (b *PointerRingBuf) readAndMaybeAdvance(p []interface{}, doAdvance bool) (n int, err error) {
	if len(p) == 0 {
		return 0, nil
	}
	if b.Readable == 0 {
		return 0, io.EOF
	}
	extent := b.Beg + b.Readable
	if extent <= b.N {
		// the in-use data is contiguous; one copy suffices.
		n += copy(p, b.A[b.Beg:extent])
	} else {
		// the in-use data wraps; copy the tail of A, then the head.
		n += copy(p, b.A[b.Beg:b.N])
		if n < len(p) {
			n += copy(p[n:], b.A[0:(extent%b.N)])
		}
	}
	if doAdvance {
		b.Advance(n)
	}
	return
}
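
// A sketch (assumed helper name; values traced from the code above)
// showing the peek-then-consume pattern that ReadWithoutAdvance and
// Advance enable together.
func examplePeekThenConsume() {
	rb := NewPointerRingBuf(4)
	rb.WritePtrs([]interface{}{"x", "y"})

	peek := make([]interface{}, 2)
	n, _ := rb.ReadWithoutAdvance(peek)
	_ = n // n == 2; rb.Readable is still 2

	rb.Advance(1) // now actually consume just "x"
	// a subsequent ReadPtrs would return only "y".
}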

// WritePtrs writes len(p) interface{} values from p to
// the underlying data stream.
// It returns the number of pointers written from p (0 <= n <= len(p))
// and any error encountered that caused the write to stop early.
// WritePtrs must return a non-nil error if it returns n < len(p).
func (b *PointerRingBuf) WritePtrs(p []interface{}) (n int, err error) {
	for {
		if len(p) == 0 {
			// nothing (left) to copy in; notice we shorten our
			// local copy p (below) as we read from it.
			return
		}

		writeCapacity := b.N - b.Readable
		if writeCapacity <= 0 {
			// we are all full up already.
			return n, io.ErrShortWrite
		}
		if len(p) > writeCapacity {
			err = io.ErrShortWrite
			// leave err set and
			// keep going, write what we can.
		}

		writeStart := (b.Beg + b.Readable) % b.N
		upperLim := intMin(writeStart+writeCapacity, b.N)

		k := copy(b.A[writeStart:upperLim], p)

		n += k
		b.Readable += k
		p = p[k:]

		// we can fill from b.A[0:something] from
		// p's remainder, so loop
	}
}
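
// intMin returns the smaller of a and b. It is assumed to be defined
// in a sibling file of the rbuf package; a minimal sketch is included
// here so this file reads self-contained.
func intMin(a, b int) int {
	if a < b {
		return a
	}
	return b
}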

// Reset quickly forgets any data stored in the ring buffer. The
// data is still there, but the ring buffer will ignore it and
// overwrite those slots as new data comes in.
func (b *PointerRingBuf) Reset() {
	b.Beg = 0
	b.Readable = 0
}

// Advance is non-standard, but better than Next(),
// because we don't have to unwrap our buffer and pay the cpu time
// for the copy that unwrapping may need.
// Useful in conjunction with/after ReadWithoutAdvance() above.
func (b *PointerRingBuf) Advance(n int) {
	if n <= 0 {
		return
	}
	if n > b.Readable {
		n = b.Readable
	}
	b.Readable -= n
	b.Beg = (b.Beg + n) % b.N
}

// Adopt is non-standard.
//
// For efficiency's sake, it (possibly) takes ownership of the
// already-allocated slice offered in me.
//
// If me is larger than our current buffer, we adopt it, and we
// may then write to the me buffer.
// If we already have a bigger buffer, we copy me into the
// existing buffer instead.
func (b *PointerRingBuf) Adopt(me []interface{}) {
	n := len(me)
	if n > b.N {
		b.A = me
		b.N = n
		b.Beg = 0
		b.Readable = n
	} else {
		// we already have a larger buffer, reuse it.
		copy(b.A, me)
		b.Beg = 0
		b.Readable = n
	}
}
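
// A sketch (assumed helper name; commented values traced from the code
// above) showing both Adopt branches: adopting a larger slice versus
// copying a smaller one into the existing buffer.
func exampleAdopt() {
	rb := NewPointerRingBuf(2)

	big := []interface{}{1, 2, 3}
	rb.Adopt(big) // len(big) > rb.N, so rb takes ownership; rb.N == 3

	small := []interface{}{4}
	rb.Adopt(small) // fits in the existing buffer, so it is copied in
	// rb.Readable == 1, and rb keeps writing into its adopted array.
}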

// Push writes len(p) pointers from p to the ring.
// It returns the number of elements written from p (0 <= n <= len(p))
// and any error encountered that caused the write to stop early.
// Push must return a non-nil error if it returns n < len(p).
func (b *PointerRingBuf) Push(p []interface{}) (n int, err error) {
	// Push has the same semantics as WritePtrs; delegate to it
	// rather than duplicating the write loop verbatim.
	return b.WritePtrs(p)
}

// PushAndMaybeOverwriteOldestData always consumes the full
// slice p, even if that means blowing away the oldest
// unread pointers in the ring to make room. In reality, only the last
// min(len(p), b.N) pointers of p will end up being written to the ring.
//
// This allows the ring to act as a record of the most recent
// b.N pointers of data -- a kind of temporal LRU cache, so to
// speak. The linux kernel's dmesg ring buffer is similar.
func (b *PointerRingBuf) PushAndMaybeOverwriteOldestData(p []interface{}) (n int, err error) {
	writeCapacity := b.N - b.Readable
	if len(p) > writeCapacity {
		// make room by discarding the oldest unread pointers.
		b.Advance(len(p) - writeCapacity)
	}
	startPos := 0
	if len(p) > b.N {
		// only the final b.N elements of p can fit at all.
		startPos = len(p) - b.N
	}
	n, err = b.Push(p[startPos:])
	if err != nil {
		return n, err
	}
	return len(p), nil
}
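
// A sketch (assumed helper name; commented values traced from the code
// above) of the overwrite-on-full behavior: pushing into a full ring
// discards the oldest entries instead of returning io.ErrShortWrite.
func examplePushAndMaybeOverwriteOldestData() {
	rb := NewPointerRingBuf(3)
	rb.Push([]interface{}{1, 2, 3}) // ring is now full

	n, err := rb.PushAndMaybeOverwriteOldestData([]interface{}{4, 5})
	_, _ = n, err // n == 2, err == nil; 1 and 2 were discarded

	out := make([]interface{}, 3)
	rb.ReadPtrs(out) // out holds 3, 4, 5
}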