-
Notifications
You must be signed in to change notification settings - Fork 1
/
async_executor.go
150 lines (123 loc) · 3.17 KB
/
async_executor.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
package asyncexecutor
import (
"sync"
"github.com/pkg/errors"
)
type executor struct {
wg *sync.WaitGroup
globalJobQueue chan *Job
GlobalResponseQueue chan *ResponseObject
workerCounter int
jobCounter int
workers []*Worker
}
type Executor interface {
StartExecutor(int)
AddGlobalJob(Job)
Stop()
WaitAndStop()
waitAllWorkers(chan Job)
}
func NewExecutor(queueSize int) *executor {
return &executor{
&sync.WaitGroup{},
make(chan *Job, queueSize),
make(chan *ResponseObject, queueSize),
0, 0,
[]*Worker{},
}
}
func (exec *executor) StartExecutor(numWorkers int) {
for i := 0; i < numWorkers; i++ {
worker := NewWorker(exec.workerCounter, exec.globalJobQueue, exec.GlobalResponseQueue)
exec.workers = append(exec.workers, worker)
exec.wg.Add(1)
exec.workerCounter++
go worker.start(exec.wg)
}
return
}
func (exec *executor) CreateJob(function callableType, parameters []interface{}) (*Job, error) {
newJobID := exec.jobCounter
newJob, err := NewJob(
newJobID,
function,
¶meterObject{parameters},
)
if err != nil {
return nil, errors.Wrap(err, "Job creation failed.")
}
exec.addGlobalJob(newJob)
exec.jobCounter++
return newJob, nil
}
func (exec *executor) CreateTaskJob(function callableType, parameters []interface{}) (*Job, error) {
newJobID := exec.jobCounter
newJob, err := NewJob(
newJobID,
function,
¶meterObject{parameters},
)
if err != nil {
return nil, errors.Wrap(err, "Job creation failed.")
}
exec.jobCounter++
return newJob, nil
}
func (exec *executor) CreateTask(taskJobList []*Job) *Job {
sizeOfTask := len(taskJobList)
// Iterate over the started workers to search for the most free worker
// or the first one that has enough space for all the jobs from the task.
bestWorker := exec.workers[0]
for _, worker := range exec.workers {
currWorkerAvailablity := cap(worker.responseQueue) - len(worker.responseQueue)
if currWorkerAvailablity >= sizeOfTask {
bestWorker = worker
break
}
if currWorkerAvailablity > cap(bestWorker.responseQueue)-len(bestWorker.responseQueue) {
bestWorker = worker
}
}
for _, job := range taskJobList {
bestWorker.jobQueue <- job
}
return taskJobList[len(taskJobList)-1]
}
func (exec *executor) SetResponseHandler(handler ResponseHandler) {
for _, worker := range exec.workers {
worker.SetResponseHandler(handler)
}
}
func (exec *executor) SetStatusResponseHandler(enabled bool) {
for _, worker := range exec.workers {
worker.responseHandlerEnabled = enabled
}
}
func (exec *executor) addGlobalJob(job *Job) {
exec.globalJobQueue <- job
}
func (exec *executor) Stop() {
exec.waitAllWorkers()
return
}
func (exec *executor) WaitAndStop() int {
jobCounter := 0
for {
select {
case <-exec.GlobalResponseQueue:
jobCounter++
// Maybe save the responses somewhere
default:
exec.Stop()
return jobCounter
}
}
}
func (exec *executor) waitAllWorkers() {
close(exec.globalJobQueue)
for _, worker := range exec.workers {
worker.stop()
}
exec.wg.Wait()
}