Skip to content

Commit

Permalink
feat(outputs.mqtt): Add client trace logging, resolve MQTT5 reconnect…
Browse files Browse the repository at this point in the history
… login (#15429)
  • Loading branch information
powersj authored Jun 6, 2024
1 parent 079c9d2 commit f0c7258
Show file tree
Hide file tree
Showing 7 changed files with 49 additions and 27 deletions.
1 change: 1 addition & 0 deletions plugins/common/mqtt/mqtt.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ type MqttConfig struct {
KeepAlive int64 `toml:"keep_alive"`
PersistentSession bool `toml:"persistent_session"`
PublishPropertiesV5 *PublishProperties `toml:"v5"`
ClientTrace bool `toml:"client_trace"`

tls.ClientConfig

Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package mqtt_consumer
package mqtt

import (
"github.com/influxdata/telegraf"
Expand Down
9 changes: 9 additions & 0 deletions plugins/common/mqtt/mqtt_v3.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
mqttv3 "github.com/eclipse/paho.mqtt.golang" // Library that supports v3.1.1

"github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/logger"
)

type mqttv311Client struct {
Expand Down Expand Up @@ -77,6 +78,14 @@ func NewMQTTv311Client(cfg *MqttConfig) (*mqttv311Client, error) {
opts.AddBroker(broker)
}

if cfg.ClientTrace {
log := &mqttLogger{logger.NewLogger("paho", "", "")}
mqttv3.ERROR = log
mqttv3.CRITICAL = log
mqttv3.WARN = log
mqttv3.DEBUG = log
}

return &mqttv311Client{
client: mqttv3.NewClient(opts),
timeout: time.Duration(cfg.Timeout),
Expand Down
43 changes: 26 additions & 17 deletions plugins/common/mqtt/mqtt_v5.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,17 +12,19 @@ import (

"github.com/influxdata/telegraf/config"
"github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/logger"
)

type mqttv5Client struct {
client *mqttv5auto.ConnectionManager
options mqttv5auto.ClientConfig
username config.Secret
password config.Secret
timeout time.Duration
qos int
retain bool
properties *mqttv5.PublishProperties
client *mqttv5auto.ConnectionManager
options mqttv5auto.ClientConfig
username config.Secret
password config.Secret
timeout time.Duration
qos int
retain bool
clientTrace bool
properties *mqttv5.PublishProperties
}

func NewMQTTv5Client(cfg *MqttConfig) (*mqttv5Client, error) {
Expand Down Expand Up @@ -94,13 +96,14 @@ func NewMQTTv5Client(cfg *MqttConfig) (*mqttv5Client, error) {
}

return &mqttv5Client{
options: opts,
timeout: time.Duration(cfg.Timeout),
username: cfg.Username,
password: cfg.Password,
qos: cfg.QoS,
retain: cfg.Retain,
properties: properties,
options: opts,
timeout: time.Duration(cfg.Timeout),
username: cfg.Username,
password: cfg.Password,
qos: cfg.QoS,
retain: cfg.Retain,
properties: properties,
clientTrace: cfg.ClientTrace,
}, nil
}

Expand All @@ -115,8 +118,14 @@ func (m *mqttv5Client) Connect() (bool, error) {
return false, fmt.Errorf("getting password failed: %w", err)
}
defer pass.Destroy()
m.options.ConnectUsername = user.TemporaryString()
m.options.ConnectPassword = pass.Bytes()
m.options.ConnectUsername = user.String()
m.options.ConnectPassword = []byte(pass.String())

if m.clientTrace {
log := mqttLogger{logger.NewLogger("paho", "", "")}
m.options.Debug = log
m.options.Errors = log
}

client, err := mqttv5auto.NewConnection(context.Background(), m.options)
if err != nil {
Expand Down
9 changes: 0 additions & 9 deletions plugins/inputs/mqtt_consumer/mqtt_consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,6 @@ type MQTTConsumer struct {
Password config.Secret `toml:"password"`
QoS int `toml:"qos"`
ConnectionTimeout config.Duration `toml:"connection_timeout"`
ClientTrace bool `toml:"client_trace"`
MaxUndeliveredMessages int `toml:"max_undelivered_messages"`
PersistentSession bool `toml:"persistent_session"`
ClientID string `toml:"client_id"`
Expand Down Expand Up @@ -105,14 +104,6 @@ func (m *MQTTConsumer) SetParser(parser telegraf.Parser) {
m.parser = parser
}
func (m *MQTTConsumer) Init() error {
if m.ClientTrace {
log := &mqttLogger{m.Log}
mqtt.ERROR = log
mqtt.CRITICAL = log
mqtt.WARN = log
mqtt.DEBUG = log
}

m.state = Disconnected
if m.PersistentSession && m.ClientID == "" {
return errors.New("persistent_session requires client_id")
Expand Down
6 changes: 6 additions & 0 deletions plugins/outputs/mqtt/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,12 @@ to use them.
## actually reads it
# retain = false

## Client trace messages
## When set to true, and debug mode enabled in the agent settings, the MQTT
## client's messages are included in telegraf logs. These messages are very
## noisey, but essential for debugging issues.
# client_trace = false

## Layout of the topics published.
## The following choices are available:
## non-batch -- send individual messages, one for each metric
Expand Down
6 changes: 6 additions & 0 deletions plugins/outputs/mqtt/sample.conf
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,12 @@
## actually reads it
# retain = false

## Client trace messages
## When set to true, and debug mode enabled in the agent settings, the MQTT
## client's messages are included in telegraf logs. These messages are very
## noisey, but essential for debugging issues.
# client_trace = false

## Layout of the topics published.
## The following choices are available:
## non-batch -- send individual messages, one for each metric
Expand Down

0 comments on commit f0c7258

Please sign in to comment.