forked from influxdata/telegraf
-
Notifications
You must be signed in to change notification settings - Fork 0
/
nats_consumer_test.go
131 lines (108 loc) · 3.09 KB
/
nats_consumer_test.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
package natsconsumer
import (
"testing"
"github.com/influxdata/telegraf/plugins/parsers"
"github.com/influxdata/telegraf/testutil"
"github.com/nats-io/nats"
"github.com/stretchr/testify/assert"
)
const (
testMsg = "cpu_load_short,host=server01 value=23422.0 1422568543702900257\n"
testMsgGraphite = "cpu.load.short.graphite 23422 1454780029"
testMsgJSON = "{\"a\": 5, \"b\": {\"c\": 6}}\n"
invalidMsg = "cpu_load_short,host=server01 1422568543702900257\n"
metricBuffer = 5
)
func newTestNatsConsumer() (*natsConsumer, chan *nats.Msg) {
in := make(chan *nats.Msg, metricBuffer)
n := &natsConsumer{
QueueGroup: "test",
Subjects: []string{"telegraf"},
Servers: []string{"nats://localhost:4222"},
Secure: false,
in: in,
errs: make(chan error, metricBuffer),
done: make(chan struct{}),
}
return n, in
}
// Test that the parser parses NATS messages into metrics
func TestRunParser(t *testing.T) {
n, in := newTestNatsConsumer()
acc := testutil.Accumulator{}
n.acc = &acc
defer close(n.done)
n.parser, _ = parsers.NewInfluxParser()
n.wg.Add(1)
go n.receiver()
in <- natsMsg(testMsg)
acc.Wait(1)
}
// Test that the parser ignores invalid messages
func TestRunParserInvalidMsg(t *testing.T) {
n, in := newTestNatsConsumer()
acc := testutil.Accumulator{}
n.acc = &acc
defer close(n.done)
n.parser, _ = parsers.NewInfluxParser()
n.wg.Add(1)
go n.receiver()
in <- natsMsg(invalidMsg)
acc.WaitError(1)
assert.Contains(t, acc.Errors[0].Error(), "E! subject: telegraf, error: metric parse error")
assert.EqualValues(t, 0, acc.NMetrics())
}
// Test that the parser parses line format messages into metrics
func TestRunParserAndGather(t *testing.T) {
n, in := newTestNatsConsumer()
acc := testutil.Accumulator{}
n.acc = &acc
defer close(n.done)
n.parser, _ = parsers.NewInfluxParser()
n.wg.Add(1)
go n.receiver()
in <- natsMsg(testMsg)
n.Gather(&acc)
acc.Wait(1)
acc.AssertContainsFields(t, "cpu_load_short",
map[string]interface{}{"value": float64(23422)})
}
// Test that the parser parses graphite format messages into metrics
func TestRunParserAndGatherGraphite(t *testing.T) {
n, in := newTestNatsConsumer()
acc := testutil.Accumulator{}
n.acc = &acc
defer close(n.done)
n.parser, _ = parsers.NewGraphiteParser("_", []string{}, nil)
n.wg.Add(1)
go n.receiver()
in <- natsMsg(testMsgGraphite)
n.Gather(&acc)
acc.Wait(1)
acc.AssertContainsFields(t, "cpu_load_short_graphite",
map[string]interface{}{"value": float64(23422)})
}
// Test that the parser parses json format messages into metrics
func TestRunParserAndGatherJSON(t *testing.T) {
n, in := newTestNatsConsumer()
acc := testutil.Accumulator{}
n.acc = &acc
defer close(n.done)
n.parser, _ = parsers.NewJSONParser("nats_json_test", []string{}, nil)
n.wg.Add(1)
go n.receiver()
in <- natsMsg(testMsgJSON)
n.Gather(&acc)
acc.Wait(1)
acc.AssertContainsFields(t, "nats_json_test",
map[string]interface{}{
"a": float64(5),
"b_c": float64(6),
})
}
func natsMsg(val string) *nats.Msg {
return &nats.Msg{
Subject: "telegraf",
Data: []byte(val),
}
}