-
Notifications
You must be signed in to change notification settings - Fork 2
/
bulk_worker.go
112 lines (99 loc) · 2.27 KB
/
bulk_worker.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
package dorisloader
import (
"context"
)
// bulkWorker buffers rows handed out by a BulkProcessor in its own
// BulkService and commits them when a threshold is reached, when a flush is
// requested, or when the processor's rows channel is closed.
type bulkWorker struct {
	// p is the processor that owns this worker; its rows channel, backoff
	// and wait group are used by the worker's methods.
	p *BulkProcessor

	// i is the value passed to newBulkWorker; presumably the worker's index
	// within the processor (it is not read by the methods in this file).
	i int

	// bulkActions is the row-count threshold and bulkSize the estimated
	// byte-size threshold that trigger a commit. A value <= 0 disables the
	// respective check.
	bulkActions, bulkSize int

	// service accumulates the rows that have not been committed yet.
	service *BulkService

	// flushC receives manual flush requests; flushAckC is signalled once
	// such a request has been processed.
	flushC, flushAckC chan struct{}
}
// newBulkWorker returns a bulkWorker attached to processor p and tagged with i.
//
// The commit thresholds are copied from p at construction time. The worker
// gets its own BulkService, built from p.c and configured with p.db and
// p.table, and two unbuffered channels for flush requests and their
// acknowledgements.
func newBulkWorker(p *BulkProcessor, i int) *bulkWorker {
	svc := NewBulkService(p.c).DB(p.db).Table(p.table)

	worker := &bulkWorker{
		p:           p,
		i:           i,
		bulkActions: p.bulkActions,
		bulkSize:    p.bulkSize,
		service:     svc,
		flushC:      make(chan struct{}),
		flushAckC:   make(chan struct{}),
	}
	return worker
}
// work is the worker's main loop; it is meant to run as a goroutine started
// by the bulk processor.
//
// It serves two channels until the processor's rows channel is closed:
//   - w.p.rows: each received row is added to the service, and a commit is
//     issued as soon as commitRequired reports true;
//   - w.flushC: any buffered rows are committed and the flush is
//     acknowledged on w.flushAckC.
//
// When the rows channel is closed, the remaining rows are committed and the
// loop ends. On exit the worker signals the processor's wait group and closes
// both of its flush channels.
//
// NOTE: ctx is only forwarded to commit; the loop itself does not watch
// ctx.Done().
func (w *bulkWorker) work(ctx context.Context) {
	defer func() {
		w.p.workerWg.Done()
		close(w.flushAckC)
		close(w.flushC)
	}()

	// flushPending commits the buffered rows, if there are any.
	flushPending := func() error {
		if w.service.NumberOfRows() > 0 {
			return w.commit(ctx)
		}
		return nil
	}

	for {
		var commitErr error

		select {
		case row, ok := <-w.p.rows:
			if !ok {
				// Rows channel closed: push out what is left and stop.
				// The result is dropped because no error handling is
				// applied once the worker is stopping.
				_ = flushPending()
				return
			}
			w.service.Add(row)
			if w.commitRequired() {
				commitErr = w.commit(ctx)
			}
		case <-w.flushC:
			// Manual flush: commit outstanding rows, then acknowledge.
			commitErr = flushPending()
			w.flushAckC <- struct{}{}
		}

		if commitErr != nil {
			// TODO
		}
	}
}
// commit sends the rows buffered in the worker's service by calling
// service.Do through RetryNotify, using the processor's backoff policy.
//
// It returns whatever RetryNotify returns: nil when the commit eventually
// succeeded, otherwise the resulting error.
func (w *bulkWorker) commit(ctx context.Context) error {
	// attempt performs a single commit; RetryNotify decides whether and
	// when to run it again according to w.p.backoff.
	attempt := func() error {
		_, err := w.service.Do(ctx)
		return err
	}

	// onFailure is the notification callback given to RetryNotify (per the
	// original note: called if a retry fails).
	onFailure := func(err error) {
		// TODO
	}

	// TODO: react to a non-nil result before handing it back to the caller.
	return RetryNotify(attempt, w.p.backoff, onFailure)
}
// commitRequired reports whether the buffered rows should be committed now.
//
// That is the case when the row-count threshold (bulkActions) is enabled and
// reached, or when the size threshold (bulkSize) is enabled and the
// service's estimated size in bytes has reached it. A threshold <= 0 is
// treated as disabled. The size check is only evaluated when the row-count
// check did not already succeed.
func (w *bulkWorker) commitRequired() bool {
	return (w.bulkActions > 0 && w.service.NumberOfRows() >= w.bulkActions) ||
		(w.bulkSize > 0 && w.service.EstimatedSizeInBytes() >= int64(w.bulkSize))
}