Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add option to amqp output to publish persistent messages #3528

Merged
merged 2 commits into from
Dec 1, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions plugins/inputs/postfix/stat_none.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@

package postfix

import (
"time"
)

func statCTime(_ interface{}) time.Time {
return time.Time{}
}
3 changes: 3 additions & 0 deletions plugins/outputs/amqp/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@ For an introduction to AMQP see:
## Telegraf tag to use as a routing key
## ie, if this tag exists, its value will be used as the routing key
routing_tag = "host"
## Delivery Mode controls if a published message is persistent
## Valid options are "transient" and "persistent". default: "transient"
# delivery_mode = "transient"

## InfluxDB retention policy
# retention_policy = "default"
Expand Down
28 changes: 24 additions & 4 deletions plugins/outputs/amqp/amqp.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,9 @@ type AMQP struct {
Precision string
// Connection timeout
Timeout internal.Duration
// Delivery Mode controls if a published message is persistent
// Valid options are "transient" and "persistent". default: "transient"
DeliveryMode string

// Path to CA file
SSLCA string `toml:"ssl_ca"`
Expand All @@ -52,7 +55,8 @@ type AMQP struct {
sync.Mutex
c *client

serializer serializers.Serializer
deliveryMode uint8
serializer serializers.Serializer
}

type externalAuth struct{}
Expand Down Expand Up @@ -82,6 +86,9 @@ var sampleConfig = `
## Telegraf tag to use as a routing key
## ie, if this tag exists, its value will be used as the routing key
routing_tag = "host"
## Delivery Mode controls if a published message is persistent
## Valid options are "transient" and "persistent". default: "transient"
delivery_mode = "transient"

## InfluxDB retention policy
# retention_policy = "default"
Expand Down Expand Up @@ -111,6 +118,18 @@ func (a *AMQP) SetSerializer(serializer serializers.Serializer) {
}

func (q *AMQP) Connect() error {
switch q.DeliveryMode {
case "transient":
q.deliveryMode = amqp.Transient
break
case "persistent":
q.deliveryMode = amqp.Persistent
break
default:
q.deliveryMode = amqp.Transient
break
}

headers := amqp.Table{
"database": q.Database,
"retention_policy": q.RetentionPolicy,
Expand Down Expand Up @@ -245,9 +264,10 @@ func (q *AMQP) Write(metrics []telegraf.Metric) error {
false, // mandatory
false, // immediate
amqp.Publishing{
Headers: c.headers,
ContentType: "text/plain",
Body: buf,
Headers: c.headers,
ContentType: "text/plain",
Body: buf,
DeliveryMode: q.deliveryMode,
})
if err != nil {
return fmt.Errorf("Failed to send AMQP message: %s", err)
Expand Down