diff --git a/outputs/amqp/README.md b/outputs/amqp/README.md index e708e3496434b..2fdedfbf183d3 100644 --- a/outputs/amqp/README.md +++ b/outputs/amqp/README.md @@ -4,5 +4,6 @@ This plugin writes to a AMQP exchange using tag, defined in configuration file as RoutingTag, as a routing key. If RoutingTag is empty, then empty routing key will be used. +Metrics are grouped in batches by RoutingTag. This plugin doesn't bind exchange to a queue, so it should be done by consumer. diff --git a/outputs/amqp/amqp.go b/outputs/amqp/amqp.go index b8ae0501ddf4e..e33aad274435d 100644 --- a/outputs/amqp/amqp.go +++ b/outputs/amqp/amqp.go @@ -1,6 +1,7 @@ package amqp import ( + "bytes" "fmt" "log" "sync" @@ -88,6 +89,7 @@ func (q *AMQP) Write(points []*client.Point) error { if len(points) == 0 { return nil } + var outbuf = make(map[string][][]byte) for _, p := range points { // Combine tags from Point and BatchPoints and grab the resulting @@ -100,7 +102,10 @@ func (q *AMQP) Write(points []*client.Point) error { key = h } } + outbuf[key] = append(outbuf[key], []byte(value)) + } + for key, buf := range outbuf { err := q.channel.Publish( q.Exchange, // exchange key, // routing key @@ -108,7 +113,7 @@ func (q *AMQP) Write(points []*client.Point) error { false, // immediate amqp.Publishing{ ContentType: "text/plain", - Body: []byte(value), + Body: bytes.Join(buf, []byte("\n")), }) if err != nil { return fmt.Errorf("FAILED to send amqp message: %s", err)