Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add ability to specify routing_key in amqp output #3994

Merged
merged 3 commits into from
Jun 3, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions etc/telegraf.conf
Original file line number Diff line number Diff line change
Expand Up @@ -167,8 +167,11 @@
# ## Using EXTERNAL requires enabling the rabbitmq_auth_mechanism_ssl plugin as
# ## described here: https://www.rabbitmq.com/plugins.html
# # auth_method = "PLAIN"
# ## Topic routing key
# # routing_key = ""
# ## Telegraf tag to use as a routing key
# ## ie, if this tag exists, its value will be used as the routing key
# ## and override routing_key config even if defined
# routing_tag = "host"
# ## Delivery Mode controls if a published message is persistent
# ## Valid options are "transient" and "persistent". default: "transient"
Expand Down Expand Up @@ -2176,7 +2179,7 @@
# reverse_metric_names = true


# # A plugin to collect stats from Opensmtpd - a validating, recursive, and caching DNS resolver
# # A plugin to collect stats from Opensmtpd - a validating, recursive, and caching DNS resolver
# [[inputs.opensmtpd]]
# ## If running as a restricted user you can prepend sudo for additional access:
# #use_sudo = false
Expand Down Expand Up @@ -3403,4 +3406,3 @@
# [[inputs.zipkin]]
# # path = "/api/v1/spans" # URL path for span data
# # port = 9411 # Port on which Telegraf listens

17 changes: 13 additions & 4 deletions plugins/outputs/amqp/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,18 @@

This plugin writes to a AMQP 0-9-1 Exchange, a promenent implementation of this protocol being [RabbitMQ](https://www.rabbitmq.com/).

Metrics are written to a topic exchange using tag, defined in configuration file as RoutingTag, as a routing key.
Metrics are written to a topic exchange using a routing key defined by:
1. The routing_key config defines a static value
2. The routing_tag config defines a metric tag with a dynamic value, overriding the static routing_key if found
3. If neither option is defined, or the tag is not found in a metric, then the empty routing key will be used

If RoutingTag is empty, then empty routing key will be used.
Metrics are grouped in batches by RoutingTag.
Metrics are grouped in batches by the final routing key.

This plugin doesn't bind exchange to a queue, so it should be done by consumer.
This plugin doesn't bind exchange to a queue, so it should be done by consumer. The exchange is always defined as type: topic.
To use it for distributing metrics equally among workers (type: direct), set the routing_key to a static value on the exchange,
declare and bind a single queue with the same routing_key, and consume from the same queue in each worker.
To use it to send metrics to many consumers at once (type: fanout), set the routing_key to "#" on the exchange, then declare, bind,
and consume from individual queues in each worker.

For an introduction to AMQP see:
- https://www.rabbitmq.com/tutorials/amqp-concepts.html
Expand All @@ -26,8 +32,11 @@ For an introduction to AMQP see:
## Using EXTERNAL requires enabling the rabbitmq_auth_mechanism_ssl plugin as
## described here: https://www.rabbitmq.com/plugins.html
# auth_method = "PLAIN"
## Topic routing key
# routing_key = ""
## Telegraf tag to use as a routing key
## ie, if this tag exists, its value will be used as the routing key
## and override routing_key config even if defined
routing_tag = "host"
## Delivery Mode controls if a published message is persistent
## Valid options are "transient" and "persistent". default: "transient"
Expand Down
10 changes: 9 additions & 1 deletion plugins/outputs/amqp/amqp.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,9 @@ type AMQP struct {
Exchange string
// AMQP Auth method
AuthMethod string
// Routing Key Tag
// Routing Key (static)
RoutingKey string `toml:"routing_key"`
// Routing Key from Tag
RoutingTag string `toml:"routing_tag"`
// InfluxDB database
Database string
Expand Down Expand Up @@ -83,8 +85,11 @@ var sampleConfig = `
## Using EXTERNAL requires enabling the rabbitmq_auth_mechanism_ssl plugin as
## described here: https://www.rabbitmq.com/plugins.html
# auth_method = "PLAIN"
## Topic routing key
# routing_key = ""
## Telegraf tag to use as a routing key
## ie, if this tag exists, its value will be used as the routing key
## and override routing_key config even if defined
routing_tag = "host"
## Delivery Mode controls if a published message is persistent
## Valid options are "transient" and "persistent". default: "transient"
Expand Down Expand Up @@ -241,6 +246,9 @@ func (q *AMQP) Write(metrics []telegraf.Metric) error {

for _, metric := range metrics {
var key string
if q.RoutingKey != "" {
key = q.RoutingKey
}
if q.RoutingTag != "" {
if h, ok := metric.Tags()[q.RoutingTag]; ok {
key = h
Expand Down