concurrent_map.go
package cstream

import (
	"context"
	"sync/atomic"
)

// Map is a concurrent map operation with a concurrency limit.
type Map[I, O any] struct {
	ctx       context.Context
	goroutine int // TODO: support dynamic goroutine limit (create Limiter instance)
	input     []I
	iteratee  func(item I, index int) O
	in        chan func() O
	out       chan O
	result    []O
	done      chan struct{}
	quit      chan struct{}
	isRun     atomic.Bool
}

// NewMap creates a new concurrent map with the given context, concurrency limit, input slice, and callback function.
func NewMap[I, O any](ctx context.Context, goroutine int, input []I, iteratee func(item I, index int) O) *Map[I, O] {
	in := make(chan func() O)
	out := make(chan O, len(input))
	done := make(chan struct{})
	quit := make(chan struct{})
	m := &Map[I, O]{
		ctx:       ctx,
		input:     input,
		iteratee:  iteratee,
		in:        in,
		out:       out,
		result:    make([]O, 0, len(input)),
		done:      done,
		quit:      quit,
		goroutine: goroutine,
	}
	m.run()
	return m
}

func (m *Map[I, O]) run() {
	if m.isRun.Load() {
		return
	}
	m.isRun.Store(true)

	// Start the input stream.
	// After all input has been sent, or the map is done or closed, close the input channel to stop the stream.
	go func() {
		defer close(m.in)
		for i, item := range m.input {
			i, item := i, item // capture loop variables for the closure (pre-Go 1.22 semantics)
			select {
			case <-m.done:
				return
			case <-m.quit:
				return
			case m.in <- func() O {
				return m.iteratee(item, i)
			}:
			}
		}
	}()

	// Start the stream.
	// The stream stops when the input channel is closed.
	go func() {
		defer close(m.done)
		defer close(m.out)
		defer m.isRun.Store(false)
		stream(m.ctx, m.goroutine, m.in, m.out)
	}()
}
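
// NOTE: The stream helper called by run is defined elsewhere in this package
// and is not part of this file. streamSketch below is only a hypothetical
// illustration of the contract run relies on: consume tasks from in with at
// most n goroutines, forward each result to out, and return once in is closed
// (stopping early if ctx is canceled). It is not the actual implementation.
func streamSketch[O any](ctx context.Context, n int, in <-chan func() O, out chan<- O) {
	finished := make(chan struct{})
	for w := 0; w < n; w++ {
		go func() {
			defer func() { finished <- struct{}{} }()
			for task := range in {
				// Stop early if the context was canceled.
				if ctx.Err() != nil {
					return
				}
				out <- task()
			}
		}()
	}
	// Wait for every worker to finish before returning, so the caller can
	// safely close the output channel afterwards.
	for w := 0; w < n; w++ {
		<-finished
	}
}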
// Close stops the concurrent map.
// It blocks until the concurrent map has shut down.
func (m *Map[I, O]) Close() {
	if !m.isRun.Load() {
		return
	}
	select {
	case <-m.done:
	case m.quit <- struct{}{}:
		<-m.done
	}
}

// IsDone returns true if the concurrent map is done or closed.
func (m *Map[I, O]) IsDone() bool {
	select {
	case <-m.done:
		return true
	default:
		return false
	}
}

// IsRunning returns true if the stream is running.
func (m *Map[I, O]) IsRunning() bool {
	return m.isRun.Load()
}

// Wait blocks until the map is done or the context is canceled.
func (m *Map[I, O]) Wait() error {
	if !m.isRun.Load() {
		return nil
	}
	select {
	case <-m.done:
		return nil
	case <-m.ctx.Done():
		return m.ctx.Err()
	}
}

// Result returns the results of the concurrent map.
// It blocks until all tasks have completed or the given context is canceled.
func (m *Map[I, O]) Result(ctx context.Context) ([]O, error) {
	for {
		select {
		case o, ok := <-m.out:
			if !ok {
				return m.result, nil
			}
			m.result = append(m.result, o)
		case <-ctx.Done():
			return m.result, ctx.Err()
		}
	}
}
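
// usageSketch is a hypothetical example, not part of the original file. It
// shows the intended flow: NewMap starts processing immediately, and Result
// collects the outputs. Depending on how the stream helper schedules work,
// results are not necessarily returned in input order.
func usageSketch(ctx context.Context) ([]int, error) {
	// Square four integers with at most two goroutines running at once.
	m := NewMap(ctx, 2, []int{1, 2, 3, 4}, func(item int, index int) int {
		return item * item
	})
	// Close is safe to call even after the map has finished on its own.
	defer m.Close()

	// Result drains the output channel until the stream closes it, or until
	// the passed context is canceled.
	return m.Result(ctx)
}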