-
Notifications
You must be signed in to change notification settings - Fork 6
/
reader.go
137 lines (122 loc) · 3.2 KB
/
reader.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
// Copyright 2021 Cosmos Nicolaou. All rights reserved.
// Use of this source code is governed by the Apache-2.0
// license that can be found in the LICENSE file.
package pbzip2
import (
"context"
"io"
"sync"
)
type (
	// readerOpts accumulates the settings supplied to NewReader, split by
	// which component they are forwarded to.
	readerOpts struct {
		decOpts  []DecompressorOption // forwarded to NewDecompressor
		scanOpts []ScannerOption      // forwarded to NewScanner
	}

	// ReaderOption represents an option to NewReader.
	ReaderOption func(*readerOpts)
)
// ScannerOptions returns a ReaderOption that forwards the given
// ScannerOption values to the Scanner constructed by NewReader. Using it
// more than once appends to, rather than replaces, earlier options.
func ScannerOptions(opts ...ScannerOption) ReaderOption {
	apply := func(ro *readerOpts) {
		ro.scanOpts = append(ro.scanOpts, opts...)
	}
	return apply
}
// DecompressionOptions returns a ReaderOption that passes the given
// DecompressorOption values to the Decompressor created by NewReader.
// Using it more than once appends to, rather than replaces, earlier
// options.
func DecompressionOptions(opts ...DecompressorOption) ReaderOption {
	return func(o *readerOpts) {
		o.decOpts = append(o.decOpts, opts...)
	}
}
// reader is the io.Reader returned by NewReader. Read pulls decompressed
// bytes from dc while a background goroutine feeds blocks into dc.
type reader struct {
	// dc is the source of the bytes returned by Read.
	dc *Decompressor
	// errCh carries the single result of the background goroutine, which
	// closes the channel after sending.
	errCh chan error
	// wg lets Read wait for the background goroutine to exit.
	wg *sync.WaitGroup
	// ctx is kept here because io.Reader's Read takes no context; Read
	// polls it for cancellation.
	ctx context.Context
}
// NewReader returns an io.Reader that decompresses the bzip2 stream read
// from rd. A Scanner locates blocks and a Decompressor decompresses them
// concurrently. Scanning runs in a background goroutine whose final error,
// if any, is surfaced through Read. ctx is handed to both the decompressor
// and the scanning loop, and Read also checks it for cancellation.
func NewReader(ctx context.Context, rd io.Reader, opts ...ReaderOption) io.Reader {
	var cfg readerOpts
	for _, apply := range opts {
		apply(&cfg)
	}

	sc := NewScanner(rd, cfg.scanOpts...)
	dc := NewDecompressor(ctx, cfg.decOpts...)

	r := &reader{
		ctx:   ctx,
		errCh: make(chan error, 1), // buffered so the send never blocks
		wg:    &sync.WaitGroup{},
		dc:    dc,
	}

	r.wg.Add(1)
	go func(results chan<- error, done *sync.WaitGroup) {
		// Deferred calls run LIFO: the channel is closed before Done, so a
		// caller that has waited on done can always receive without blocking.
		defer done.Done()
		defer close(results)
		results <- decompress(ctx, sc, dc)
	}(r.errCh, r.wg)

	return r
}
// decompress scans the input into dc and guarantees that dc.Finish is
// called before it returns, whatever the outcome. On a clean scan it
// returns Finish's result. If scanning fails, it first cancels dc with the
// scan error, then calls Finish and discards that result, and returns the
// scan error. Any non-nil error it returns should be returned by the final
// call to Read.
func decompress(ctx context.Context, sc *Scanner, dc *Decompressor) error {
	scanErr := scan(ctx, sc, dc)
	if scanErr == nil {
		return dc.Finish()
	}
	dc.Cancel(scanErr)
	_ = dc.Finish() // the scan error takes precedence over Finish's result
	return scanErr
}
// scan advances sc over the input stream and hands each block it yields to
// dc.Append so it can be decompressed. It stops at the first Append error
// and returns it. Otherwise, once the scanner stops, it returns sc.Err()
// (presumably nil at a clean end of input; confirm against Scanner).
func scan(ctx context.Context, sc *Scanner, dc *Decompressor) error {
	for {
		if !sc.Scan(ctx) {
			return sc.Err()
		}
		if err := dc.Append(sc.Block()); err != nil {
			return err
		}
	}
}
// handleErrorOrCancel polls, without blocking, for either a result from the
// background decompression goroutine or cancellation of rd.ctx. It returns:
//   - the goroutine's error (nil on success, or nil once errCh has been
//     drained and closed);
//   - rd.ctx.Err() if the context is done;
//   - nil if neither is ready yet.
//
// When both are ready, select chooses between them at random.
func (rd *reader) handleErrorOrCancel() error {
	var result error
	select {
	case result = <-rd.errCh:
	case <-rd.ctx.Done():
		result = rd.ctx.Err()
	default:
	}
	return result
}
// Read implements io.Reader.
//
// Before calling dc.Read, which may block, it checks without blocking
// whether the background goroutine has already reported an error or the
// context has been canceled. If so, it cancels the decompressor, waits for
// the goroutine to exit, and returns that error.
//
// Once dc.Read reports an error, Read waits for the goroutine and then
// inspects its result. If dc.Read's error was io.EOF, a non-nil goroutine
// error (such as a CRC error detected after decompression finished) is
// returned instead. Otherwise dc.Read's error is returned unchanged.
func (rd *reader) Read(buf []byte) (int, error) {
	if early := rd.handleErrorOrCancel(); early != nil {
		rd.dc.Cancel(early)
		rd.wg.Wait()
		return 0, early
	}

	n, readErr := rd.dc.Read(buf)
	if readErr == nil {
		return n, nil
	}

	// The goroutine sends on errCh and closes it before signalling wg, so
	// after Wait the receive below normally succeeds immediately.
	rd.wg.Wait()
	var bgErr error
	select {
	case bgErr = <-rd.errCh:
	default:
	}

	if readErr == io.EOF && bgErr != nil {
		return n, bgErr
	}
	return n, readErr
}