Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Combine BatchPoints with the same RoutingTag to one message in amqp output #287

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions outputs/amqp/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,5 +4,6 @@ This plugin writes to a AMQP exchange using tag, defined in configuration file
as RoutingTag, as a routing key.

If RoutingTag is empty, then empty routing key will be used.
Metrics are grouped in batches by RoutingTag.

This plugin doesn't bind exchange to a queue, so it should be done by consumer.
7 changes: 6 additions & 1 deletion outputs/amqp/amqp.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package amqp

import (
"bytes"
"fmt"
"log"
"sync"
Expand Down Expand Up @@ -88,6 +89,7 @@ func (q *AMQP) Write(points []*client.Point) error {
if len(points) == 0 {
return nil
}
var outbuf = make(map[string][][]byte)

for _, p := range points {
// Combine tags from Point and BatchPoints and grab the resulting
Expand All @@ -100,15 +102,18 @@ func (q *AMQP) Write(points []*client.Point) error {
key = h
}
}
outbuf[key] = append(outbuf[key], []byte(value))

}
for key, buf := range outbuf {
err := q.channel.Publish(
q.Exchange, // exchange
key, // routing key
false, // mandatory
false, // immediate
amqp.Publishing{
ContentType: "text/plain",
Body: []byte(value),
Body: bytes.Join(buf, []byte("\n")),
})
if err != nil {
return fmt.Errorf("FAILED to send amqp message: %s", err)
Expand Down