diff --git a/plugins/outputs/kafka/kafka.go b/plugins/outputs/kafka/kafka.go index a45e2a4e9c3cf..a99c8e1c20819 100644 --- a/plugins/outputs/kafka/kafka.go +++ b/plugins/outputs/kafka/kafka.go @@ -246,32 +246,34 @@ func (k *Kafka) Description() string { } func (k *Kafka) Write(metrics []telegraf.Metric) error { - if len(metrics) == 0 { - return nil - } - + msgs := make([]*sarama.ProducerMessage, 0, len(metrics)) for _, metric := range metrics { buf, err := k.serializer.Serialize(metric) if err != nil { return err } - topicName := k.GetTopicName(metric) - m := &sarama.ProducerMessage{ - Topic: topicName, + Topic: k.GetTopicName(metric), Value: sarama.ByteEncoder(buf), } - if h, ok := metric.Tags()[k.RoutingTag]; ok { + if h, ok := metric.GetTag(k.RoutingTag); ok { m.Key = sarama.StringEncoder(h) } + msgs = append(msgs, m) + } - _, _, err = k.producer.SendMessage(m) - - if err != nil { - return fmt.Errorf("FAILED to send kafka message: %s\n", err) + err := k.producer.SendMessages(msgs) + if err != nil { + // We could have many errors, return only the first encountered. + if errs, ok := err.(sarama.ProducerErrors); ok { + for _, prodErr := range errs { + return prodErr + } } + return err } + return nil }