Skip to content

Commit

Permalink
Send all messages before waiting for results in kafka output (influxd…
Browse files Browse the repository at this point in the history
  • Loading branch information
danielnelson authored and rgitzel committed Oct 17, 2018
1 parent 0bf62b1 commit 10cdb28
Showing 1 changed file with 14 additions and 12 deletions.
26 changes: 14 additions & 12 deletions plugins/outputs/kafka/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -246,32 +246,34 @@ func (k *Kafka) Description() string {
}

func (k *Kafka) Write(metrics []telegraf.Metric) error {
if len(metrics) == 0 {
return nil
}

msgs := make([]*sarama.ProducerMessage, 0, len(metrics))
for _, metric := range metrics {
buf, err := k.serializer.Serialize(metric)
if err != nil {
return err
}

topicName := k.GetTopicName(metric)

m := &sarama.ProducerMessage{
Topic: topicName,
Topic: k.GetTopicName(metric),
Value: sarama.ByteEncoder(buf),
}
if h, ok := metric.Tags()[k.RoutingTag]; ok {
if h, ok := metric.GetTag(k.RoutingTag); ok {
m.Key = sarama.StringEncoder(h)
}
msgs = append(msgs, m)
}

_, _, err = k.producer.SendMessage(m)

if err != nil {
return fmt.Errorf("FAILED to send kafka message: %s\n", err)
err := k.producer.SendMessages(msgs)
if err != nil {
// We could have many errors, return only the first encountered.
if errs, ok := err.(sarama.ProducerErrors); ok {
for _, prodErr := range errs {
return prodErr
}
}
return err
}

return nil
}

Expand Down

0 comments on commit 10cdb28

Please sign in to comment.