-
Notifications
You must be signed in to change notification settings - Fork 9
/
worker.go
61 lines (52 loc) · 1.12 KB
/
worker.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
package csprcollector
import (
"log"
"time"
)
// NewWorker builds a Worker with the given id that announces itself on
// workerQueue and hands flushed batches to output.
//
// Each worker gets its own freshly created, unbuffered Work channel. The
// returned Worker is idle until Start is called on it.
func NewWorker(id int, workerQueue chan chan CSPRequest, output Output) Worker {
	var worker Worker
	worker.ID = id
	worker.Work = make(chan CSPRequest)
	worker.WorkerQueue = workerQueue
	worker.Output = output
	return worker
}
// Worker receives CSPRequest values on its Work channel, collects them into
// batches, and passes each batch to Output.
type Worker struct {
	// ID identifies the worker in its log messages.
	ID int
	// Work is the channel on which this worker receives requests.
	Work chan CSPRequest
	// WorkerQueue is the channel the worker sends its own Work channel on
	// to announce that it can take another request.
	WorkerQueue chan chan CSPRequest
	// Output receives every flushed batch through its Write method.
	Output Output
}
// Start launches the worker's processing loop in a new goroutine and returns
// immediately.
//
// The goroutine first announces the worker by sending its Work channel on
// WorkerQueue, then loops over two events:
//   - a request arrives on Work: it is appended to the in-memory batch, the
//     batch is flushed once it holds 50 requests, and the worker announces
//     itself on WorkerQueue again;
//   - the 5-second ticker fires: a non-empty batch is flushed.
//
// If Work is closed, the goroutine flushes whatever is still buffered, stops
// its ticker and exits. Without that check a closed channel would yield
// zero-value requests forever and the loop would spin.
func (w *Worker) Start() {
	const (
		flushInterval = 5 * time.Second // maximum age of a non-empty batch
		maxBatch      = 50              // batch size that triggers an immediate flush
	)

	go func() {
		w.requeue()

		ticker := time.NewTicker(flushInterval)
		defer ticker.Stop()

		var buffer []CSPRequest
		for {
			select {
			case work, ok := <-w.Work:
				if !ok {
					// Work was closed: hand over the remainder and stop.
					if len(buffer) > 0 {
						log.Printf("Worker #%d: Final Flush.", w.ID)
						w.Flush(buffer)
					}
					return
				}
				log.Printf("Worker #%d: Received work request.", w.ID)
				buffer = append(buffer, work)
				if len(buffer) >= maxBatch {
					log.Printf("Worker #%d: Buffer Flush.", w.ID)
					w.Flush(buffer)
					// Start a fresh slice instead of reusing the backing
					// array: the flushed slice was handed to Output, which
					// may still be holding on to it.
					buffer = nil
				}
				w.requeue()
			case <-ticker.C:
				if len(buffer) > 0 {
					log.Printf("Worker #%d: Ticked Flush.", w.ID)
					w.Flush(buffer)
					buffer = nil
				}
			}
		}
	}()
}
// Flush logs how many requests are in the batch and then passes the batch to
// the worker's Output via Write.
func (w *Worker) Flush(requests []CSPRequest) {
	count := len(requests)
	log.Printf("Flush %d entries.", count)

	out := w.Output
	out.Write(requests)
}
// requeue announces the worker by sending its Work channel on WorkerQueue.
func (w *Worker) requeue() {
	queue := w.WorkerQueue
	queue <- w.Work
}