Skip to content

Commit

Permalink
mqtt_consumer: option to set persistent session and client ID
Browse files Browse the repository at this point in the history
  • Loading branch information
sparrc authored and geodimm committed Mar 10, 2016
1 parent 7e248b4 commit 2c9ddbc
Show file tree
Hide file tree
Showing 3 changed files with 70 additions and 1 deletion.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
- [#754](https://github.com/influxdata/telegraf/pull/754): docker plugin: adding `docker info` metrics to output. Thanks @titilambert!
- [#788](https://github.com/influxdata/telegraf/pull/788): -input-list and -output-list command-line options. Thanks @ebookbug!
- [#778](https://github.com/influxdata/telegraf/pull/778): Adding a TCP input listener.
- [#797](https://github.com/influxdata/telegraf/issues/797): Provide option for persistent MQTT consumer client sessions.

### Bugfixes
- [#748](https://github.com/influxdata/telegraf/issues/748): Fix sensor plugin split on ":"
Expand Down
22 changes: 21 additions & 1 deletion plugins/inputs/mqtt_consumer/mqtt_consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@ type MQTTConsumer struct {
// Legacy metric buffer support
MetricBuffer int

PersistentSession bool
ClientID string `toml:"client_id"`

// Path to CA file
SSLCA string `toml:"ssl_ca"`
// Path to host cert file
Expand Down Expand Up @@ -57,6 +60,13 @@ var sampleConfig = `
"sensors/#",
]
# if true, messages that can't be delivered while the subscriber is offline
# will be delivered when it comes back (such as on service restart).
# NOTE: if true, client_id MUST be set
persistent_session = false
# If empty, a random client ID will be generated.
client_id = ""
## username and password to connect MQTT server.
# username = "telegraf"
# password = "metricsmetricsmetricsmetrics"
Expand Down Expand Up @@ -91,6 +101,11 @@ func (m *MQTTConsumer) Start(acc telegraf.Accumulator) error {
m.Lock()
defer m.Unlock()

if m.PersistentSession && m.ClientID == "" {
return fmt.Errorf("ERROR MQTT Consumer: When using persistent_session" +
" = true, you MUST also set client_id")
}

m.acc = acc
if m.QoS > 2 || m.QoS < 0 {
return fmt.Errorf("MQTT Consumer, invalid QoS value: %d", m.QoS)
Expand Down Expand Up @@ -166,7 +181,11 @@ func (m *MQTTConsumer) Gather(acc telegraf.Accumulator) error {
func (m *MQTTConsumer) createOpts() (*mqtt.ClientOptions, error) {
opts := mqtt.NewClientOptions()

opts.SetClientID("Telegraf-Consumer-" + internal.RandomString(5))
if m.ClientID == "" {
opts.SetClientID("Telegraf-Consumer-" + internal.RandomString(5))
} else {
opts.SetClientID(m.ClientID)
}

tlsCfg, err := internal.GetTLSConfig(
m.SSLCert, m.SSLKey, m.SSLCA, m.InsecureSkipVerify)
Expand Down Expand Up @@ -199,6 +218,7 @@ func (m *MQTTConsumer) createOpts() (*mqtt.ClientOptions, error) {
}
opts.SetAutoReconnect(true)
opts.SetKeepAlive(time.Second * 60)
opts.SetCleanSession(!m.PersistentSession)
return opts, nil
}

Expand Down
48 changes: 48 additions & 0 deletions plugins/inputs/mqtt_consumer/mqtt_consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import (
"github.com/influxdata/telegraf/plugins/parsers"
"github.com/influxdata/telegraf/testutil"

"github.com/stretchr/testify/assert"

"git.eclipse.org/gitroot/paho/org.eclipse.paho.mqtt.golang.git"
)

Expand All @@ -28,6 +30,52 @@ func newTestMQTTConsumer() (*MQTTConsumer, chan mqtt.Message) {
return n, in
}

// Test that default client has random ID
func TestRandomClientID(t *testing.T) {
m1 := &MQTTConsumer{
Servers: []string{"localhost:1883"}}
opts, err := m1.createOpts()
assert.NoError(t, err)

m2 := &MQTTConsumer{
Servers: []string{"localhost:1883"}}
opts2, err2 := m2.createOpts()
assert.NoError(t, err2)

assert.NotEqual(t, opts.ClientID, opts2.ClientID)
}

// Test that default client has random ID
func TestClientID(t *testing.T) {
m1 := &MQTTConsumer{
Servers: []string{"localhost:1883"},
ClientID: "telegraf-test",
}
opts, err := m1.createOpts()
assert.NoError(t, err)

m2 := &MQTTConsumer{
Servers: []string{"localhost:1883"},
ClientID: "telegraf-test",
}
opts2, err2 := m2.createOpts()
assert.NoError(t, err2)

assert.Equal(t, "telegraf-test", opts2.ClientID)
assert.Equal(t, "telegraf-test", opts.ClientID)
}

// Test that Start() fails if client ID is not set but persistent is
func TestPersistentClientIDFail(t *testing.T) {
m1 := &MQTTConsumer{
Servers: []string{"localhost:1883"},
PersistentSession: true,
}
acc := testutil.Accumulator{}
err := m1.Start(&acc)
assert.Error(t, err)
}

// Test that the parser parses NATS messages into metrics
func TestRunParser(t *testing.T) {
n, in := newTestMQTTConsumer()
Expand Down

0 comments on commit 2c9ddbc

Please sign in to comment.