-
Notifications
You must be signed in to change notification settings - Fork 0
/
bsonex.go
118 lines (106 loc) · 1.83 KB
/
bsonex.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
package bsonex
import (
"bufio"
"io"
"sync"
)
type BSONEX struct {
BSON
offset int64
runnerID int
}
func (b *BSONEX) Offset() int64 {
return b.offset
}
func (b *BSONEX) RunnerID() int {
return b.runnerID
}
func (b *BSONEX) Size() int {
return len(b.BSON)
}
func (b BSONEX) String() string {
return string(b.MustToJson())
}
func NewDecoder(r io.Reader) *Decoder {
return &Decoder{bufio.NewReaderSize(r, 4<<20)}
}
type Decoder struct {
r io.Reader
}
func (d *Decoder) ForEach(f func(b BSONEX) error) (err error) {
var offset int64
for {
one, err := d.ReadOne()
if err != nil {
if err == io.EOF {
break
}
return err
}
err = f(BSONEX{BSON: one, offset: offset})
offset += int64(len(one))
if err != nil {
return err
}
}
return
}
func (d *Decoder) Do(parallel int, f func(b BSONEX) error) (err error) {
if parallel <= 1 {
return d.ForEach(f)
}
ch := make(chan []*BSONEX, parallel*2)
errCh := make(chan error, parallel)
var wg sync.WaitGroup
wg.Add(parallel)
defer wg.Wait()
for i := 0; i < parallel; i++ {
go func(id int) {
defer wg.Done()
for bs := range ch {
for _, b := range bs {
b.runnerID = id
err := f(*b)
if err != nil {
errCh <- err
return
}
}
}
}(i)
}
defer close(ch)
var bs []*BSONEX
var offset int64
for {
one, err := d.ReadOne()
if err != nil {
if err == io.EOF {
break
}
return err
}
bs = append(bs, &BSONEX{BSON: one, offset: offset})
offset += int64(len(one))
if len(bs) == 100 {
select {
case ch <- bs:
bs = nil
case err = <-errCh:
return err
}
}
}
ch <- bs
return
}
func (d *Decoder) Decode(v interface{}) (err error) {
one, err := d.ReadOne()
if err != nil {
return
}
return Unmarshal(one, v)
}
func (d *Decoder) ReadOne() (one []byte, err error) {
return ReadOne(d.r)
}