forked from influxdata/telegraf
-
Notifications
You must be signed in to change notification settings - Fork 0
/
kafka_consumer.go
306 lines (259 loc) · 7.61 KB
/
kafka_consumer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
package kafka_consumer
import (
"context"
"fmt"
"log"
"strings"
"sync"
"github.com/Shopify/sarama"
cluster "github.com/bsm/sarama-cluster"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal/tls"
"github.com/influxdata/telegraf/plugins/inputs"
"github.com/influxdata/telegraf/plugins/parsers"
)
const (
defaultMaxUndeliveredMessages = 1000
)
type empty struct{}
type semaphore chan empty
type Consumer interface {
Errors() <-chan error
Messages() <-chan *sarama.ConsumerMessage
MarkOffset(msg *sarama.ConsumerMessage, metadata string)
Close() error
}
type Kafka struct {
ConsumerGroup string `toml:"consumer_group"`
ClientID string `toml:"client_id"`
Topics []string `toml:"topics"`
Brokers []string `toml:"brokers"`
MaxMessageLen int `toml:"max_message_len"`
Version string `toml:"version"`
MaxUndeliveredMessages int `toml:"max_undelivered_messages"`
Offset string `toml:"offset"`
SASLUsername string `toml:"sasl_username"`
SASLPassword string `toml:"sasl_password"`
TopicTag string `toml:"topic_tag"`
tls.ClientConfig
cluster Consumer
parser parsers.Parser
wg *sync.WaitGroup
cancel context.CancelFunc
// Unconfirmed messages
messages map[telegraf.TrackingID]*sarama.ConsumerMessage
// doNotCommitMsgs tells the parser not to call CommitUpTo on the consumer
// this is mostly for test purposes, but there may be a use-case for it later.
doNotCommitMsgs bool
}
var sampleConfig = `
## kafka servers
brokers = ["localhost:9092"]
## topic(s) to consume
topics = ["telegraf"]
## Add topic as tag if topic_tag is not empty
# topic_tag = ""
## Optional Client id
# client_id = "Telegraf"
## Set the minimal supported Kafka version. Setting this enables the use of new
## Kafka features and APIs. Of particular interest, lz4 compression
## requires at least version 0.10.0.0.
## ex: version = "1.1.0"
# version = ""
## Optional TLS Config
# tls_ca = "/etc/telegraf/ca.pem"
# tls_cert = "/etc/telegraf/cert.pem"
# tls_key = "/etc/telegraf/key.pem"
## Use TLS but skip chain & host verification
# insecure_skip_verify = false
## Optional SASL Config
# sasl_username = "kafka"
# sasl_password = "secret"
## the name of the consumer group
consumer_group = "telegraf_metrics_consumers"
## Offset (must be either "oldest" or "newest")
offset = "oldest"
## Maximum length of a message to consume, in bytes (default 0/unlimited);
## larger messages are dropped
max_message_len = 1000000
## Maximum messages to read from the broker that have not been written by an
## output. For best throughput set based on the number of metrics within
## each message and the size of the output's metric_batch_size.
##
## For example, if each message from the queue contains 10 metrics and the
## output metric_batch_size is 1000, setting this to 100 will ensure that a
## full batch is collected and the write is triggered immediately without
## waiting until the next flush_interval.
# max_undelivered_messages = 1000
## Data format to consume.
## Each data format has its own unique set of configuration options, read
## more about them here:
## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md
data_format = "influx"
`
func (k *Kafka) SampleConfig() string {
return sampleConfig
}
func (k *Kafka) Description() string {
return "Read metrics from Kafka topic(s)"
}
func (k *Kafka) SetParser(parser parsers.Parser) {
k.parser = parser
}
func (k *Kafka) Start(acc telegraf.Accumulator) error {
var clusterErr error
config := cluster.NewConfig()
if k.Version != "" {
version, err := sarama.ParseKafkaVersion(k.Version)
if err != nil {
return err
}
config.Version = version
}
config.Consumer.Return.Errors = true
tlsConfig, err := k.ClientConfig.TLSConfig()
if err != nil {
return err
}
if k.ClientID != "" {
config.ClientID = k.ClientID
} else {
config.ClientID = "Telegraf"
}
if tlsConfig != nil {
log.Printf("D! TLS Enabled")
config.Net.TLS.Config = tlsConfig
config.Net.TLS.Enable = true
}
if k.SASLUsername != "" && k.SASLPassword != "" {
log.Printf("D! Using SASL auth with username '%s',",
k.SASLUsername)
config.Net.SASL.User = k.SASLUsername
config.Net.SASL.Password = k.SASLPassword
config.Net.SASL.Enable = true
}
switch strings.ToLower(k.Offset) {
case "oldest", "":
config.Consumer.Offsets.Initial = sarama.OffsetOldest
case "newest":
config.Consumer.Offsets.Initial = sarama.OffsetNewest
default:
log.Printf("I! WARNING: Kafka consumer invalid offset '%s', using 'oldest'",
k.Offset)
config.Consumer.Offsets.Initial = sarama.OffsetOldest
}
if k.cluster == nil {
k.cluster, clusterErr = cluster.NewConsumer(
k.Brokers,
k.ConsumerGroup,
k.Topics,
config,
)
if clusterErr != nil {
log.Printf("E! Error when creating Kafka Consumer, brokers: %v, topics: %v",
k.Brokers, k.Topics)
return clusterErr
}
}
ctx, cancel := context.WithCancel(context.Background())
k.cancel = cancel
// Start consumer goroutine
k.wg = &sync.WaitGroup{}
k.wg.Add(1)
go func() {
defer k.wg.Done()
k.receiver(ctx, acc)
}()
log.Printf("I! Started the kafka consumer service, brokers: %v, topics: %v",
k.Brokers, k.Topics)
return nil
}
// receiver() reads all incoming messages from the consumer, and parses them into
// influxdb metric points.
func (k *Kafka) receiver(ctx context.Context, ac telegraf.Accumulator) {
k.messages = make(map[telegraf.TrackingID]*sarama.ConsumerMessage)
acc := ac.WithTracking(k.MaxUndeliveredMessages)
sem := make(semaphore, k.MaxUndeliveredMessages)
for {
select {
case <-ctx.Done():
return
case track := <-acc.Delivered():
<-sem
k.onDelivery(track)
case err := <-k.cluster.Errors():
acc.AddError(err)
case sem <- empty{}:
select {
case <-ctx.Done():
return
case track := <-acc.Delivered():
// Once for the delivered message, once to leave the case
<-sem
<-sem
k.onDelivery(track)
case err := <-k.cluster.Errors():
<-sem
acc.AddError(err)
case msg := <-k.cluster.Messages():
err := k.onMessage(acc, msg)
if err != nil {
acc.AddError(err)
<-sem
}
}
}
}
}
func (k *Kafka) markOffset(msg *sarama.ConsumerMessage) {
if !k.doNotCommitMsgs {
k.cluster.MarkOffset(msg, "")
}
}
func (k *Kafka) onMessage(acc telegraf.TrackingAccumulator, msg *sarama.ConsumerMessage) error {
if k.MaxMessageLen != 0 && len(msg.Value) > k.MaxMessageLen {
k.markOffset(msg)
return fmt.Errorf("Message longer than max_message_len (%d > %d)",
len(msg.Value), k.MaxMessageLen)
}
metrics, err := k.parser.Parse(msg.Value)
if err != nil {
return err
}
if len(k.TopicTag) > 0 {
for _, metric := range metrics {
metric.AddTag(k.TopicTag, msg.Topic)
}
}
id := acc.AddTrackingMetricGroup(metrics)
k.messages[id] = msg
return nil
}
func (k *Kafka) onDelivery(track telegraf.DeliveryInfo) {
msg, ok := k.messages[track.ID()]
if !ok {
log.Printf("E! [inputs.kafka_consumer] Could not mark message delivered: %d", track.ID())
return
}
if track.Delivered() {
k.markOffset(msg)
}
delete(k.messages, track.ID())
}
func (k *Kafka) Stop() {
k.cancel()
k.wg.Wait()
if err := k.cluster.Close(); err != nil {
log.Printf("E! [inputs.kafka_consumer] Error closing consumer: %v", err)
}
}
func (k *Kafka) Gather(acc telegraf.Accumulator) error {
return nil
}
func init() {
inputs.Add("kafka_consumer", func() telegraf.Input {
return &Kafka{
MaxUndeliveredMessages: defaultMaxUndeliveredMessages,
}
})
}