Skip to content

Commit

Permalink
Add TLS & credentials configuration for nats_consumer input plugin (i…
Browse files Browse the repository at this point in the history
  • Loading branch information
mmelnyk authored and idohalevi committed Sep 23, 2020
1 parent fe9c9ef commit ffaba18
Show file tree
Hide file tree
Showing 2 changed files with 48 additions and 5 deletions.
13 changes: 11 additions & 2 deletions plugins/inputs/nats_consumer/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,22 @@ instances of telegraf can read from a NATS cluster in parallel.
[[inputs.nats_consumer]]
## urls of NATS servers
servers = ["nats://localhost:4222"]
## Use Transport Layer Security
secure = false
## subject(s) to consume
subjects = ["telegraf"]
## name a queue group
queue_group = "telegraf_consumers"

## Optional credentials
# username = ""
# password = ""

## Optional TLS Config
# tls_ca = "/etc/telegraf/ca.pem"
# tls_cert = "/etc/telegraf/cert.pem"
# tls_key = "/etc/telegraf/key.pem"
## Use TLS but skip chain & host verification
# insecure_skip_verify = false

## Sets the limits for pending msgs and bytes for each subscription
## These shouldn't need to be adjusted except in very high throughput scenarios
# pending_message_limit = 65536
Expand Down
40 changes: 37 additions & 3 deletions plugins/inputs/nats_consumer/nats_consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"sync"

"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal/tls"
"github.com/influxdata/telegraf/plugins/inputs"
"github.com/influxdata/telegraf/plugins/parsers"
nats "github.com/nats-io/go-nats"
Expand Down Expand Up @@ -34,7 +35,11 @@ type natsConsumer struct {
QueueGroup string `toml:"queue_group"`
Subjects []string `toml:"subjects"`
Servers []string `toml:"servers"`
Secure bool `toml:"secure"`
Username string `toml:"username"`
Password string `toml:"password"`
tls.ClientConfig
// Legacy; Should be deprecated
Secure bool `toml:"secure"`

// Client pending limits:
PendingMessageLimit int `toml:"pending_message_limit"`
Expand All @@ -61,13 +66,24 @@ type natsConsumer struct {
var sampleConfig = `
## urls of NATS servers
servers = ["nats://localhost:4222"]
## Use Transport Layer Security
## Deprecated: Use Transport Layer Security
secure = false
## subject(s) to consume
subjects = ["telegraf"]
## name a queue group
queue_group = "telegraf_consumers"
## Optional credentials
# username = ""
# password = ""
## Optional TLS Config
# tls_ca = "/etc/telegraf/ca.pem"
# tls_cert = "/etc/telegraf/cert.pem"
# tls_key = "/etc/telegraf/key.pem"
## Use TLS but skip chain & host verification
# insecure_skip_verify = false
## Sets the limits for pending msgs and bytes for each subscription
## These shouldn't need to be adjusted except in very high throughput scenarios
# pending_message_limit = 65536
Expand Down Expand Up @@ -125,7 +141,25 @@ func (n *natsConsumer) Start(acc telegraf.Accumulator) error {
// override servers if any were specified
opts.Servers = n.Servers

opts.Secure = n.Secure
// override authentication, if any was specified
if n.Username != "" {
opts.User = n.Username
opts.Password = n.Password
}

// override TLS, if it was specified
tlsConfig, err := n.ClientConfig.TLSConfig()
if err != nil {
return err
}
if tlsConfig != nil {
// set NATS connection TLS options
opts.Secure = true
opts.TLSConfig = tlsConfig
} else {
// should be deprecated; use TLS
opts.Secure = n.Secure
}

if n.conn == nil || n.conn.IsClosed() {
n.conn, connectErr = opts.Connect()
Expand Down

0 comments on commit ffaba18

Please sign in to comment.