diff --git a/plugins/inputs/nats_consumer/README.md b/plugins/inputs/nats_consumer/README.md index 8a89d90c5fe82..9c3bfb2d77c6a 100644 --- a/plugins/inputs/nats_consumer/README.md +++ b/plugins/inputs/nats_consumer/README.md @@ -12,13 +12,22 @@ instances of telegraf can read from a NATS cluster in parallel. [[inputs.nats_consumer]] ## urls of NATS servers servers = ["nats://localhost:4222"] - ## Use Transport Layer Security - secure = false ## subject(s) to consume subjects = ["telegraf"] ## name a queue group queue_group = "telegraf_consumers" + ## Optional credentials + # username = "" + # password = "" + + ## Optional TLS Config + # tls_ca = "/etc/telegraf/ca.pem" + # tls_cert = "/etc/telegraf/cert.pem" + # tls_key = "/etc/telegraf/key.pem" + ## Use TLS but skip chain & host verification + # insecure_skip_verify = false + ## Sets the limits for pending msgs and bytes for each subscription ## These shouldn't need to be adjusted except in very high throughput scenarios # pending_message_limit = 65536 diff --git a/plugins/inputs/nats_consumer/nats_consumer.go b/plugins/inputs/nats_consumer/nats_consumer.go index 4411d8c3ec89c..7ee05bc176977 100644 --- a/plugins/inputs/nats_consumer/nats_consumer.go +++ b/plugins/inputs/nats_consumer/nats_consumer.go @@ -7,6 +7,7 @@ import ( "sync" "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/internal/tls" "github.com/influxdata/telegraf/plugins/inputs" "github.com/influxdata/telegraf/plugins/parsers" nats "github.com/nats-io/go-nats" @@ -34,7 +35,11 @@ type natsConsumer struct { QueueGroup string `toml:"queue_group"` Subjects []string `toml:"subjects"` Servers []string `toml:"servers"` - Secure bool `toml:"secure"` + Username string `toml:"username"` + Password string `toml:"password"` + tls.ClientConfig + // Legacy; Should be deprecated + Secure bool `toml:"secure"` // Client pending limits: PendingMessageLimit int `toml:"pending_message_limit"` @@ -61,13 +66,24 @@ type natsConsumer struct { var sampleConfig = ` ## urls of NATS servers servers = ["nats://localhost:4222"] - ## Use Transport Layer Security + ## Deprecated: Use Transport Layer Security secure = false ## subject(s) to consume subjects = ["telegraf"] ## name a queue group queue_group = "telegraf_consumers" + ## Optional credentials + # username = "" + # password = "" + + ## Optional TLS Config + # tls_ca = "/etc/telegraf/ca.pem" + # tls_cert = "/etc/telegraf/cert.pem" + # tls_key = "/etc/telegraf/key.pem" + ## Use TLS but skip chain & host verification + # insecure_skip_verify = false + ## Sets the limits for pending msgs and bytes for each subscription ## These shouldn't need to be adjusted except in very high throughput scenarios # pending_message_limit = 65536 @@ -125,7 +141,25 @@ func (n *natsConsumer) Start(acc telegraf.Accumulator) error { // override servers if any were specified opts.Servers = n.Servers - opts.Secure = n.Secure + // override authentication, if any was specified + if n.Username != "" { + opts.User = n.Username + opts.Password = n.Password + } + + // override TLS, if it was specified + tlsConfig, err := n.ClientConfig.TLSConfig() + if err != nil { + return err + } + if tlsConfig != nil { + // set NATS connection TLS options + opts.Secure = true + opts.TLSConfig = tlsConfig + } else { + // should be deprecated; use TLS + opts.Secure = n.Secure + } if n.conn == nil || n.conn.IsClosed() { n.conn, connectErr = opts.Connect()