Skip to content

Commit

Permalink
Add static routing_key option to amqp output (#3994)
Browse files Browse the repository at this point in the history
  • Loading branch information
gentstr authored and danielnelson committed Jun 3, 2018
1 parent 414d7b9 commit 986186a
Show file tree
Hide file tree
Showing 3 changed files with 26 additions and 7 deletions.
6 changes: 4 additions & 2 deletions etc/telegraf.conf
Original file line number Diff line number Diff line change
Expand Up @@ -168,8 +168,11 @@
# ## Using EXTERNAL requires enabling the rabbitmq_auth_mechanism_ssl plugin as
# ## described here: https://www.rabbitmq.com/plugins.html
# # auth_method = "PLAIN"
# ## Topic routing key
# # routing_key = ""
# ## Telegraf tag to use as a routing key
# ## ie, if this tag exists, its value will be used as the routing key
# ## and override routing_key config even if defined
# routing_tag = "host"
# ## Delivery Mode controls if a published message is persistent
# ## Valid options are "transient" and "persistent". default: "transient"
Expand Down Expand Up @@ -2328,7 +2331,7 @@
# reverse_metric_names = true


# # A plugin to collect stats from Opensmtpd - a validating, recursive, and caching DNS resolver
# # A plugin to collect stats from Opensmtpd - a validating, recursive, and caching DNS resolver
# [[inputs.opensmtpd]]
# ## If running as a restricted user you can prepend sudo for additional access:
# #use_sudo = false
Expand Down Expand Up @@ -3630,4 +3633,3 @@
# [[inputs.zipkin]]
# # path = "/api/v1/spans" # URL path for span data
# # port = 9411 # Port on which Telegraf listens

17 changes: 13 additions & 4 deletions plugins/outputs/amqp/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,18 @@

This plugin writes to a AMQP 0-9-1 Exchange, a promenent implementation of this protocol being [RabbitMQ](https://www.rabbitmq.com/).

Metrics are written to a topic exchange using tag, defined in configuration file as RoutingTag, as a routing key.
Metrics are written to a topic exchange using a routing key defined by:
1. The routing_key config defines a static value
2. The routing_tag config defines a metric tag with a dynamic value, overriding the static routing_key if found
3. If neither option is defined, or the tag is not found in a metric, then the empty routing key will be used

If RoutingTag is empty, then empty routing key will be used.
Metrics are grouped in batches by RoutingTag.
Metrics are grouped in batches by the final routing key.

This plugin doesn't bind exchange to a queue, so it should be done by consumer.
This plugin doesn't bind exchange to a queue, so it should be done by consumer. The exchange is always defined as type: topic.
To use it for distributing metrics equally among workers (type: direct), set the routing_key to a static value on the exchange,
declare and bind a single queue with the same routing_key, and consume from the same queue in each worker.
To use it to send metrics to many consumers at once (type: fanout), set the routing_key to "#" on the exchange, then declare, bind,
and consume from individual queues in each worker.

For an introduction to AMQP see:
- https://www.rabbitmq.com/tutorials/amqp-concepts.html
Expand All @@ -26,8 +32,11 @@ For an introduction to AMQP see:
## Using EXTERNAL requires enabling the rabbitmq_auth_mechanism_ssl plugin as
## described here: https://www.rabbitmq.com/plugins.html
# auth_method = "PLAIN"
## Topic routing key
# routing_key = ""
## Telegraf tag to use as a routing key
## ie, if this tag exists, its value will be used as the routing key
## and override routing_key config even if defined
routing_tag = "host"
## Delivery Mode controls if a published message is persistent
## Valid options are "transient" and "persistent". default: "transient"
Expand Down
10 changes: 9 additions & 1 deletion plugins/outputs/amqp/amqp.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,9 @@ type AMQP struct {
Exchange string
// AMQP Auth method
AuthMethod string
// Routing Key Tag
// Routing Key (static)
RoutingKey string `toml:"routing_key"`
// Routing Key from Tag
RoutingTag string `toml:"routing_tag"`
// InfluxDB database
Database string
Expand Down Expand Up @@ -77,8 +79,11 @@ var sampleConfig = `
## Using EXTERNAL requires enabling the rabbitmq_auth_mechanism_ssl plugin as
## described here: https://www.rabbitmq.com/plugins.html
# auth_method = "PLAIN"
## Topic routing key
# routing_key = ""
## Telegraf tag to use as a routing key
## ie, if this tag exists, its value will be used as the routing key
## and override routing_key config even if defined
routing_tag = "host"
## Delivery Mode controls if a published message is persistent
## Valid options are "transient" and "persistent". default: "transient"
Expand Down Expand Up @@ -234,6 +239,9 @@ func (q *AMQP) Write(metrics []telegraf.Metric) error {

for _, metric := range metrics {
var key string
if q.RoutingKey != "" {
key = q.RoutingKey
}
if q.RoutingTag != "" {
if h, ok := metric.Tags()[q.RoutingTag]; ok {
key = h
Expand Down

0 comments on commit 986186a

Please sign in to comment.