parallel.go
// Copyright 2020 Cosmos Nicolaou. All rights reserved.
// Use of this source code is governed by the Apache-2.0
// license that can be found in the LICENSE file.
package pbzip2
import (
"container/heap"
"context"
"fmt"
"io"
"log"
"runtime"
"strings"
"sync"
"sync/atomic"
"time"
"github.com/cosnicolaou/pbzip2/internal/bitstream"
"github.com/cosnicolaou/pbzip2/internal/bzip2"
)
var numDecompressionGoRoutines int64
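// updateStreamCRC folds a block CRC into the running stream CRC by rotating
// the stream CRC left by one bit and XORing in the block CRC, matching the
// combined CRC scheme used by the bzip2 stream format.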
func updateStreamCRC(streamCRC, blockCRC uint32) uint32 {
return (streamCRC<<1 | streamCRC>>31) ^ blockCRC
}
type decompressorOpts struct {
verbose bool
concurrency int
progressCh chan<- Progress
pool chan struct{}
}
type DecompressorOption func(*decompressorOpts)
// BZVerbose controls verbose logging for decompression.
func BZVerbose(v bool) DecompressorOption {
return func(o *decompressorOpts) {
o.verbose = v
}
}
// BZConcurrency sets the degree of concurrency to use, that is,
// the number of goroutines used for decompression.
func BZConcurrency(n int) DecompressorOption {
return func(o *decompressorOpts) {
o.concurrency = n
}
}
// BZConcurrencyPool supplies a thread-safe pool used to control concurrency.
// It can be used to limit the total number of goroutines decompressing
// concurrently across multiple decompressors. Use CreateConcurrencyPool to
// create a pool of a given size that can be shared across several
// decompressors. If no pool is set, no such limit applies.
func BZConcurrencyPool(pool chan struct{}) DecompressorOption {
return func(o *decompressorOpts) {
o.pool = pool
}
}
// CreateConcurrencyPool creates a pool that can be shared among several
// decompressors to limit the total number of blocks being decompressed
// concurrently. Each decompressor still uses at most the degree of
// concurrency set via BZConcurrency.
// Specifying a value <= 0 will use runtime.GOMAXPROCS(0).
// Callers should not perform any operations on the returned channel.
func CreateConcurrencyPool(maxConcurrent int) chan struct{} {
if maxConcurrent <= 0 {
maxConcurrent = runtime.GOMAXPROCS(0)
}
ch := make(chan struct{}, maxConcurrent)
for i := 0; i < maxConcurrent; i++ {
ch <- struct{}{}
}
return ch
}
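// A minimal sketch of sharing one pool between decompressors (the variable
// names and sizes here are illustrative only, not part of the API):
//
//	pool := CreateConcurrencyPool(8)
//	dcA := NewDecompressor(ctxA, BZConcurrency(4), BZConcurrencyPool(pool))
//	dcB := NewDecompressor(ctxB, BZConcurrency(4), BZConcurrencyPool(pool))
//	// Each decompressor runs up to 4 worker goroutines, but at most 8 blocks
//	// are decompressed at any one time across both of them.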
// BZSendUpdates sets the channel for sending progress updates over.
func BZSendUpdates(ch chan<- Progress) DecompressorOption {
return func(o *decompressorOpts) {
o.progressCh = ch
}
}
// Decompressor represents a concurrent decompressor for pbzip2 streams. The
// decompressor is designed to work in conjunction with Scanner: its Append
// method must be called with the values returned by the scanner's Block
// method. Each block is then decompressed in parallel and the output is
// reassembled in the original order.
type Decompressor struct {
order uint64 // Must be the first field in the struct to ensure 64-bit alignment for atomic access.
ctx context.Context
workWg sync.WaitGroup
doneWg sync.WaitGroup
workCh chan *blockDesc
doneCh chan *blockDesc
progressCh chan<- Progress
prd *io.PipeReader
pwr *io.PipeWriter
heap *blockHeap
streamCRC uint32
verbose bool
}
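// A minimal usage sketch, assuming blocks are obtained from this package's
// Scanner as described above (the scanner construction and Scan signature
// shown here are illustrative, not verbatim):
//
//	dc := NewDecompressor(ctx)
//	go func() {
//		for sc.Scan(ctx) {
//			if err := dc.Append(sc.Block()); err != nil {
//				dc.Cancel(err)
//				return
//			}
//		}
//		if err := dc.Finish(); err != nil {
//			dc.Cancel(err)
//		}
//	}()
//	_, err := io.Copy(dst, dc) // dc implements io.Reader over the decompressed stream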
// Progress is used to report the progress of decompression. Each report pertains
// to a correctly ordered decompression event.
type Progress struct {
Duration time.Duration
Block uint64
CRC uint32
Compressed, Size int
}
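// A sketch of consuming progress updates; the channel must be drained by the
// caller since the decompressor blocks when sending to it, and it is the
// caller's responsibility to close it once Finish has returned:
//
//	progressCh := make(chan Progress, 1)
//	dc := NewDecompressor(ctx, BZSendUpdates(progressCh))
//	go func() {
//		for p := range progressCh {
//			log.Printf("block %v: %v -> %v bytes in %v", p.Block, p.Compressed, p.Size, p.Duration)
//		}
//	}()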
// NewDecompressor creates a new parallel decompressor.
func NewDecompressor(ctx context.Context, opts ...DecompressorOption) *Decompressor {
o := decompressorOpts{
concurrency: runtime.GOMAXPROCS(-1),
}
for _, fn := range opts {
fn(&o)
}
dc := &Decompressor{
ctx: ctx,
doneCh: make(chan *blockDesc, o.concurrency),
workCh: make(chan *blockDesc, o.concurrency),
progressCh: o.progressCh,
heap: &blockHeap{},
verbose: o.verbose,
}
dc.prd, dc.pwr = io.Pipe()
heap.Init(dc.heap)
dc.workWg.Add(o.concurrency)
dc.doneWg.Add(1)
for i := 0; i < o.concurrency; i++ {
go func() {
atomic.AddInt64(&numDecompressionGoRoutines, 1)
dc.worker(ctx, dc.workCh, dc.doneCh, o.pool)
atomic.AddInt64(&numDecompressionGoRoutines, -1)
dc.workWg.Done()
}()
}
go func() {
atomic.AddInt64(&numDecompressionGoRoutines, 1)
dc.assemble(ctx, dc.doneCh)
atomic.AddInt64(&numDecompressionGoRoutines, -1)
dc.doneWg.Done()
}()
return dc
}
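// blockDesc pairs a CompressedBlock with its position (order) in the stream
// and records the outcome of decompressing it: the uncompressed data, any
// error encountered, and the time taken.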
type blockDesc struct {
CompressedBlock
order uint64
err error
uncompressed []byte
duration time.Duration
}
func (b *blockDesc) String() string {
if b == nil {
return "<nil>"
}
out := &strings.Builder{}
fmt.Fprintf(out, "order: %v: %v", b.order, b.CompressedBlock)
return out.String()
}
func (dc *Decompressor) trace(format string, args ...interface{}) {
if dc.verbose {
log.Printf(format, args...)
}
}
func (b *blockDesc) decompress() {
start := time.Now()
rd := bzip2.NewBlockReader(b.StreamBlockSize, b.Data, uint(b.BitOffset)) //#nosec G115 -- This is a false positive, b.BitOffset is always < 32.
b.uncompressed, b.err = io.ReadAll(rd)
b.duration = time.Since(start)
}
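// worker receives blocks from in, decompresses each one (holding a token from
// pool, if a pool was supplied, for the duration of the decompression) and
// sends the result to out. It returns when in is closed or the context is
// cancelled.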
func (dc *Decompressor) worker(ctx context.Context, in <-chan *blockDesc, out chan<- *blockDesc, pool chan struct{}) {
for {
select {
// Always wait for a block or for the channel to be closed.
case block := <-in:
if block == nil {
return
}
if pool != nil {
// Wait for a token from the pool.
select {
case <-pool:
case <-ctx.Done():
return
}
}
dc.trace("decompressing: %s", block)
block.decompress()
dc.trace("decompressed: %s (%v), ch %v/%v", block, block.err, len(out), cap(out))
if pool != nil {
pool <- struct{}{}
}
select {
case out <- block:
case <-ctx.Done():
}
case <-ctx.Done():
return
}
}
}
// Append adds the supplied bzip2 block to the set of blocks to be decompressed
// in parallel; the output of that decompression is appended, in order, to the
// output of the previously appended blocks.
func (dc *Decompressor) Append(cb CompressedBlock) error {
order := atomic.AddUint64(&dc.order, 1)
select {
case dc.workCh <- &blockDesc{
order: order,
CompressedBlock: cb,
}:
case <-dc.ctx.Done():
return dc.ctx.Err()
}
return nil
}
// Cancel can be called to unblock any readers that are reading from
// this decompressor and/or the Finish method.
func (dc *Decompressor) Cancel(err error) {
dc.pwr.CloseWithError(err)
}
// Finish must be called to wait for all of the currently outstanding
// decompression processes to finish and their output to be reassembled.
// It should be called exactly once.
func (dc *Decompressor) Finish() error {
var err error
select {
case <-dc.ctx.Done():
err = dc.ctx.Err()
default:
}
// Note that the assemble method must read all of the output
// produced by the workers, even in the event of an error; otherwise
// a deadlock will occur with the workers trying to write blocks to
// the channel that the assemble method is no longer reading from.
close(dc.workCh)
dc.workWg.Wait()
close(dc.doneCh)
dc.doneWg.Wait()
return err
}
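// blockHeap is a min-heap of blocks keyed on their order in the original
// stream; it implements heap.Interface so that blocks arriving out of order
// can be reassembled sequentially.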
type blockHeap []*blockDesc
func (h blockHeap) Len() int { return len(h) }
func (h blockHeap) Less(i, j int) bool { return h[i].order < h[j].order }
func (h blockHeap) Swap(i, j int) { h[i], h[j] = h[j], h[i] }
func (h *blockHeap) Push(x interface{}) {
// Push and Pop use pointer receivers because they modify the slice's length,
// not just its contents.
*h = append(*h, x.(*blockDesc))
}
func (h *blockHeap) Pop() interface{} {
old := *h
n := len(old)
x := old[n-1]
*h = old[0 : n-1]
return x
}
// tryMergeBlocks attempts to merge two consecutive blocks in the hope that
// they were split because of a false positive detection of the block magic
// byte sequence in the payload of a block. This may happen when processing
// very large amounts of data (e.g. petabytes); the probability is essentially
// that of a specific 6-byte sequence occurring at random.
// Merging two blocks like this means that it would take two false positives
// within the /same/ block to defeat the code here, which, given that blocks
// are relatively small, is even less likely to happen.
func (dc *Decompressor) tryMergeBlocks(ctx context.Context, ch <-chan *blockDesc, min *blockDesc) bool {
// wait for the second consecutive block.
for {
// Wait for a new block if there are none currently in the heap.
for len(*dc.heap) < 1 {
select {
case block, ok := <-ch:
if !ok {
// channel has been closed.
return false
}
heap.Push(dc.heap, block)
case <-ctx.Done():
err := ctx.Err()
dc.trace("tryMergeBlocks: %v", err)
dc.pwr.CloseWithError(err)
return false
}
}
if (*dc.heap)[0].order == min.order+1 {
// successfully found the next block that can be merged
// with the current one.
break
}
// Check whether the channel has been closed; failing to do so
// can lead to hangs, since the next block may not exist in
// a corrupted input file.
block, ok := <-ch
if !ok {
// channel has been closed.
return false
}
heap.Push(dc.heap, block)
}
next := (*dc.heap)[0]
bwr := &bitstream.BitWriter{}
// Note that the first block has a bit offset within its first byte as well as
// a size in bits, and hence the sum of those is needed to accurately reflect
// the size of the first block when appending to it.
bwr.Init(min.Data, min.SizeInBits+min.BitOffset, len(min.Data)+len(next.Data)+len(blockMagic)+1)
bwr.Append(blockMagic[:], 0, len(blockMagic)*8)
bwr.Append(next.Data, next.BitOffset, next.SizeInBits)
min.Data, min.SizeInBits = bwr.Data()
min.decompress()
if min.err != nil {
return false
}
// The merge succeeded, remove the block that was merged from the heap.
heap.Remove(dc.heap, 0)
return true
}
func (dc *Decompressor) handlePossibleEOS(min *blockDesc) error {
dc.streamCRC = updateStreamCRC(dc.streamCRC, min.CRC)
if min.EOS {
if got, want := dc.streamCRC, min.StreamCRC; got != want {
return fmt.Errorf("mismatched stream CRCs: calculated=0x%08x != stored=0x%08x", got, want)
}
dc.streamCRC = 0
}
return nil
}
// The assemble method must not return until the workers (i.e. the writers to
// ch) have completed. In the case of a decompression error, assemble drains
// that channel to prevent a deadlock.
func (dc *Decompressor) waitForChannelToClose(ctx context.Context, ch <-chan *blockDesc) {
for {
select {
case <-ctx.Done():
return
case _, ok := <-ch:
if !ok {
return
}
}
}
}
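// assemble receives decompressed blocks from ch, reorders them using the
// heap, writes their output to the pipe in the original order, verifies the
// stream CRC at end-of-stream markers and sends optional progress updates.
// On error it closes the pipe with that error and drains ch; on cancellation
// it closes the pipe with the context's error.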
func (dc *Decompressor) assemble(ctx context.Context, ch <-chan *blockDesc) {
expected := uint64(1)
for {
dc.trace("assemble select")
select {
case block := <-ch:
dc.trace("assemble: %v", block)
if block != nil {
heap.Push(dc.heap, block)
}
for len(*dc.heap) > 0 {
min := (*dc.heap)[0]
if min.order != expected {
break
}
heap.Remove(dc.heap, 0)
expected++
if err := min.err; err != nil {
if !dc.tryMergeBlocks(ctx, ch, min) {
dc.pwr.CloseWithError(err)
dc.waitForChannelToClose(ctx, ch)
return
}
// merge was successful, so bump up the next
// expected block number.
expected++
}
if _, err := dc.pwr.Write(min.uncompressed); err != nil {
dc.pwr.CloseWithError(err)
dc.waitForChannelToClose(ctx, ch)
return
}
if err := dc.handlePossibleEOS(min); err != nil {
dc.pwr.CloseWithError(err)
dc.waitForChannelToClose(ctx, ch)
return
}
if dc.progressCh != nil && ctx.Err() == nil {
dc.progressCh <- Progress{
Duration: min.duration,
Block: min.order,
CRC: min.CRC,
Compressed: len(min.Data),
Size: len(min.uncompressed),
}
}
}
if block == nil && len(*dc.heap) == 0 {
dc.pwr.Close()
dc.waitForChannelToClose(ctx, ch)
return
}
case <-ctx.Done():
err := ctx.Err()
dc.trace("assemble: %v", err)
dc.pwr.CloseWithError(err)
return
}
}
}
// Read implements io.Reader on the decompressed stream.
func (dc *Decompressor) Read(buf []byte) (int, error) {
return dc.prd.Read(buf)
}