diff --git a/CHANGELOG.md b/CHANGELOG.md index b2bfd9a79cab4..d58777b986da1 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -114,6 +114,7 @@ be deprecated eventually. - [#2450](https://github.com/influxdata/telegraf/issues/2450): Network statistics not collected when system has alias interfaces - [#1911](https://github.com/influxdata/telegraf/issues/1911): Sysstat plugin needs LANG=C or similar locale - [#2528](https://github.com/influxdata/telegraf/issues/2528): File output closes standard streams on reload. +- [#2603](https://github.com/influxdata/telegraf/issues/2603): AMQP output disconnect blocks all outputs ## v1.2.1 [2017-02-01] diff --git a/plugins/outputs/amqp/README.md b/plugins/outputs/amqp/README.md index 208ae934cb35b..ab5900bcf1359 100644 --- a/plugins/outputs/amqp/README.md +++ b/plugins/outputs/amqp/README.md @@ -35,6 +35,10 @@ For an introduction to AMQP see: ## InfluxDB database # database = "telegraf" + ## Write timeout, formatted as a string. If not provided, will default + ## to 5s. 0s means no timeout (not recommended). + # timeout = "5s" + ## Optional SSL Config # ssl_ca = "/etc/telegraf/ca.pem" # ssl_cert = "/etc/telegraf/cert.pem" diff --git a/plugins/outputs/amqp/amqp.go b/plugins/outputs/amqp/amqp.go index 7b4c7d4c9eb8c..63c6db6fdc2b0 100644 --- a/plugins/outputs/amqp/amqp.go +++ b/plugins/outputs/amqp/amqp.go @@ -3,6 +3,7 @@ package amqp import ( "fmt" "log" + "net" "strings" "sync" "time" @@ -15,6 +16,12 @@ import ( "github.com/streadway/amqp" ) +type client struct { + conn *amqp.Connection + channel *amqp.Channel + headers amqp.Table +} + type AMQP struct { // AMQP brokers to send metrics to URL string @@ -30,6 +37,8 @@ type AMQP struct { RetentionPolicy string // InfluxDB precision (DEPRECATED) Precision string + // Connection timeout + Timeout internal.Duration // Path to CA file SSLCA string `toml:"ssl_ca"` @@ -40,10 +49,8 @@ type AMQP struct { // Use SSL but skip chain & host verification InsecureSkipVerify bool - conn *amqp.Connection - channel *amqp.Channel sync.Mutex - headers amqp.Table + c *client serializer serializers.Serializer } @@ -81,6 +88,10 @@ var sampleConfig = ` ## InfluxDB database # database = "telegraf" + ## Write timeout, formatted as a string. If not provided, will default + ## to 5s. 0s means no timeout (not recommended). + # timeout = "5s" + ## Optional SSL Config # ssl_ca = "/etc/telegraf/ca.pem" # ssl_cert = "/etc/telegraf/cert.pem" @@ -100,10 +111,7 @@ func (a *AMQP) SetSerializer(serializer serializers.Serializer) { } func (q *AMQP) Connect() error { - q.Lock() - defer q.Unlock() - - q.headers = amqp.Table{ + headers := amqp.Table{ "database": q.Database, "retention_policy": q.RetentionPolicy, } @@ -126,13 +134,15 @@ func (q *AMQP) Connect() error { amqpConf := amqp.Config{ TLSClientConfig: tls, SASL: sasl, // if nil, it will be PLAIN + Dial: func(network, addr string) (net.Conn, error) { + return net.DialTimeout(network, addr, q.Timeout.Duration) + }, } connection, err = amqp.DialConfig(q.URL, amqpConf) if err != nil { return err } - q.conn = connection channel, err := connection.Channel() if err != nil { @@ -151,25 +161,38 @@ func (q *AMQP) Connect() error { if err != nil { return fmt.Errorf("Failed to declare an exchange: %s", err) } - q.channel = channel + + q.setClient(&client{ + conn: connection, + channel: channel, + headers: headers, + }) + go func() { err := <-connection.NotifyClose(make(chan *amqp.Error)) if err == nil { return } + + q.setClient(nil) + log.Printf("I! Closing: %s", err) log.Printf("I! Trying to reconnect") for err := q.Connect(); err != nil; err = q.Connect() { log.Println("E! ", err.Error()) time.Sleep(10 * time.Second) } - }() return nil } func (q *AMQP) Close() error { - err := q.conn.Close() + c := q.getClient() + if c == nil { + return nil + } + + err := c.conn.Close() if err != nil && err != amqp.ErrClosed { log.Printf("E! Error closing AMQP connection: %s", err) return err @@ -186,11 +209,15 @@ func (q *AMQP) Description() string { } func (q *AMQP) Write(metrics []telegraf.Metric) error { - q.Lock() - defer q.Unlock() if len(metrics) == 0 { return nil } + + c := q.getClient() + if c == nil { + return fmt.Errorf("connection is not open") + } + outbuf := make(map[string][]byte) for _, metric := range metrics { @@ -210,13 +237,15 @@ func (q *AMQP) Write(metrics []telegraf.Metric) error { } for key, buf := range outbuf { - err := q.channel.Publish( + // Note that since the channel is not in confirm mode, the absence of + // an error does not indicate successful delivery. + err := c.channel.Publish( q.Exchange, // exchange key, // routing key false, // mandatory false, // immediate amqp.Publishing{ - Headers: q.headers, + Headers: c.headers, ContentType: "text/plain", Body: buf, }) @@ -227,12 +256,25 @@ func (q *AMQP) Write(metrics []telegraf.Metric) error { return nil } +func (q *AMQP) getClient() *client { + q.Lock() + defer q.Unlock() + return q.c +} + +func (q *AMQP) setClient(c *client) { + q.Lock() + q.c = c + q.Unlock() +} + func init() { outputs.Add("amqp", func() telegraf.Output { return &AMQP{ AuthMethod: DefaultAuthMethod, Database: DefaultDatabase, RetentionPolicy: DefaultRetentionPolicy, + Timeout: internal.Duration{Duration: time.Second * 5}, } }) }