forked from kandoo/beehive
-
Notifications
You must be signed in to change notification settings - Fork 0
/
queen_test.go
120 lines (99 loc) · 2.13 KB
/
queen_test.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
package beehive
import (
"fmt"
"strconv"
"testing"
)
// TestQueenMultipleKeys checks that messages whose mapped cells overlap are
// all received by one and the same bee.
//
// Every message is mapped to two cells of dictionary "D": one keyed by the
// message's own payload and one keyed by the constant "K". The receive
// function reports the ID of the receiving bee on a channel, and the test
// compares every reported ID against the first one.
func TestQueenMultipleKeys(t *testing.T) {
	const count = 10

	hive := newHiveForTest()
	beeIDs := make(chan uint64)

	app := hive.NewApp("multikey")
	app.HandleFunc("",
		func(msg Msg, ctx MapContext) MappedCells {
			return MappedCells{{"D", msg.Data().(string)}, {"D", "K"}}
		},
		func(msg Msg, ctx RcvContext) error {
			beeIDs <- ctx.ID()
			return nil
		})

	// The messages are emitted before the hive is started.
	for n := 0; n < count; n++ {
		hive.Emit(fmt.Sprintf("test%d", n))
	}

	go hive.Start()
	defer hive.Stop()

	// The first reported bee is the reference for the remaining count-1
	// messages.
	want := <-beeIDs
	for i := 0; i < count-1; i++ {
		if got := <-beeIDs; got != want {
			t.Errorf("invalid bee receives the %d'th message: get=%d want=%d", i,
				got, want)
		}
	}
}
// qeeBenchHandler is the message handler registered by the queen bee
// benchmarks in this file.
type qeeBenchHandler struct {
	// last is the payload of the message that marks the end of a run; Rcv
	// compares every received payload against it.
	last string

	// done is closed by Rcv once the message whose payload equals last has
	// been received.
	done chan struct{}
}
// Map maps the message to a single cell in dictionary "d" whose key is the
// message's string payload. It panics if the payload is not a string.
func (h qeeBenchHandler) Map(msg Msg, ctx MapContext) MappedCells {
	key := msg.Data().(string)
	return MappedCells{{"d", key}}
}
// Rcv closes h.done when the received message carries the payload stored in
// h.last; every other message is ignored. It always returns nil and panics if
// the payload is not a string.
func (h qeeBenchHandler) Rcv(msg Msg, ctx RcvContext) error {
	if data := msg.Data().(string); data != h.last {
		return nil
	}
	close(h.done)
	return nil
}
// doBenchmarkQueenBeeCreation times how long the qee of the "qeeBenchApp"
// application on the first hive takes to handle b.N messages, fed to it in
// slices of at most BatchSize messages (as configured on the first hive).
//
// hiveN is the number of hives to start and must be at least 1, since the
// first hive is indexed unconditionally. The first hive is created with the
// default test options; every later hive is given the first hive's address
// through PeerAddrs.
//
// Only the handleMsgs calls are timed. Hive setup, message construction and
// the final wait for the last message to be received all happen while the
// timer is stopped.
func doBenchmarkQueenBeeCreation(b *testing.B, hiveN int) {
	b.StopTimer()

	done := make(chan struct{})
	handler := qeeBenchHandler{
		last: strconv.Itoa(b.N - 1),
		done: done,
	}

	hives := make([]Hive, 0, hiveN)
	for i := 0; i < hiveN; i++ {
		var h Hive
		if i == 0 {
			h = newHiveForTest()
		} else {
			h = newHiveForTest(PeerAddrs(hives[0].Config().Addr))
		}
		a := h.NewApp("qeeBenchApp")
		a.Handle("", handler)
		go h.Start()
		waitTilStareted(h)
		// Deferred inside the loop on purpose: every hive has to stay up
		// until the benchmark function returns.
		defer h.Stop()
		hives = append(hives, h)
	}

	// Message payloads are "0" .. "b.N-1"; the last one matches handler.last.
	msgs := make([]msgAndHandler, 0, b.N)
	for i := 0; i < b.N; i++ {
		msgs = append(msgs, msgAndHandler{
			msg:     &msg{MsgData: strconv.Itoa(i)},
			handler: handler,
		})
	}

	// The second result of the lookup is ignored on purpose: "qeeBenchApp"
	// was registered on hives[0] in the loop above.
	a, _ := hives[0].(*hive).app("qeeBenchApp")
	qee := a.qee

	batch := int(hives[0].Config().BatchSize)
	if batch <= 0 {
		// A non-positive batch size would keep the loop below from advancing.
		b.Fatalf("invalid batch size: %d", batch)
	}

	b.StartTimer()
	// Runs with fewer messages than one full batch are skipped entirely.
	if b.N < batch {
		return
	}
	// Walk msgs in strides of batch. Bounding the loop by from < b.N avoids
	// handing an empty trailing slice to handleMsgs when b.N is an exact
	// multiple of batch.
	for from := 0; from < b.N; from += batch {
		to := from + batch
		if to > b.N {
			to = b.N
		}
		qee.handleMsgs(msgs[from:to])
	}
	b.StopTimer()

	// Wait, untimed, until Rcv has seen the last message and closed done.
	<-done
}
// BenchmarkQueenBeeCreationSingle runs the queen bee benchmark with a single
// hive.
func BenchmarkQueenBeeCreationSingle(b *testing.B) {
	const hiveCount = 1
	doBenchmarkQueenBeeCreation(b, hiveCount)
}
// BenchmarkQueenBeeCreationClustered runs the queen bee benchmark with three
// hives, the second and third of which are given the first hive's address as
// their peer.
func BenchmarkQueenBeeCreationClustered(b *testing.B) {
	const hiveCount = 3
	doBenchmarkQueenBeeCreation(b, hiveCount)
}