Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(outputs.mqtt): Add client trace logging, resolve MQTT5 reconnect login #15429

Merged
merged 6 commits into from
Jun 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions plugins/common/mqtt/mqtt.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ type MqttConfig struct {
KeepAlive int64 `toml:"keep_alive"`
PersistentSession bool `toml:"persistent_session"`
PublishPropertiesV5 *PublishProperties `toml:"v5"`
ClientTrace bool `toml:"client_trace"`

tls.ClientConfig

Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package mqtt_consumer
package mqtt

import (
"github.com/influxdata/telegraf"
Expand Down
9 changes: 9 additions & 0 deletions plugins/common/mqtt/mqtt_v3.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
mqttv3 "github.com/eclipse/paho.mqtt.golang" // Library that supports v3.1.1

"github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/logger"
)

type mqttv311Client struct {
Expand Down Expand Up @@ -77,6 +78,14 @@ func NewMQTTv311Client(cfg *MqttConfig) (*mqttv311Client, error) {
opts.AddBroker(broker)
}

if cfg.ClientTrace {
log := &mqttLogger{logger.NewLogger("paho", "", "")}
mqttv3.ERROR = log
mqttv3.CRITICAL = log
mqttv3.WARN = log
mqttv3.DEBUG = log
}

return &mqttv311Client{
client: mqttv3.NewClient(opts),
timeout: time.Duration(cfg.Timeout),
Expand Down
43 changes: 26 additions & 17 deletions plugins/common/mqtt/mqtt_v5.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,17 +12,19 @@ import (

"github.com/influxdata/telegraf/config"
"github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/logger"
)

type mqttv5Client struct {
client *mqttv5auto.ConnectionManager
options mqttv5auto.ClientConfig
username config.Secret
password config.Secret
timeout time.Duration
qos int
retain bool
properties *mqttv5.PublishProperties
client *mqttv5auto.ConnectionManager
options mqttv5auto.ClientConfig
username config.Secret
password config.Secret
timeout time.Duration
qos int
retain bool
clientTrace bool
properties *mqttv5.PublishProperties
}

func NewMQTTv5Client(cfg *MqttConfig) (*mqttv5Client, error) {
Expand Down Expand Up @@ -94,13 +96,14 @@ func NewMQTTv5Client(cfg *MqttConfig) (*mqttv5Client, error) {
}

return &mqttv5Client{
options: opts,
timeout: time.Duration(cfg.Timeout),
username: cfg.Username,
password: cfg.Password,
qos: cfg.QoS,
retain: cfg.Retain,
properties: properties,
options: opts,
timeout: time.Duration(cfg.Timeout),
username: cfg.Username,
password: cfg.Password,
qos: cfg.QoS,
retain: cfg.Retain,
properties: properties,
clientTrace: cfg.ClientTrace,
}, nil
}

Expand All @@ -115,8 +118,14 @@ func (m *mqttv5Client) Connect() (bool, error) {
return false, fmt.Errorf("getting password failed: %w", err)
}
defer pass.Destroy()
m.options.ConnectUsername = user.TemporaryString()
m.options.ConnectPassword = pass.Bytes()
m.options.ConnectUsername = user.String()
m.options.ConnectPassword = []byte(pass.String())

if m.clientTrace {
log := mqttLogger{logger.NewLogger("paho", "", "")}
m.options.Debug = log
m.options.Errors = log
}

client, err := mqttv5auto.NewConnection(context.Background(), m.options)
if err != nil {
Expand Down
9 changes: 0 additions & 9 deletions plugins/inputs/mqtt_consumer/mqtt_consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,6 @@ type MQTTConsumer struct {
Password config.Secret `toml:"password"`
QoS int `toml:"qos"`
ConnectionTimeout config.Duration `toml:"connection_timeout"`
ClientTrace bool `toml:"client_trace"`
MaxUndeliveredMessages int `toml:"max_undelivered_messages"`
PersistentSession bool `toml:"persistent_session"`
ClientID string `toml:"client_id"`
Expand Down Expand Up @@ -105,14 +104,6 @@ func (m *MQTTConsumer) SetParser(parser telegraf.Parser) {
m.parser = parser
}
func (m *MQTTConsumer) Init() error {
if m.ClientTrace {
log := &mqttLogger{m.Log}
mqtt.ERROR = log
mqtt.CRITICAL = log
mqtt.WARN = log
mqtt.DEBUG = log
}

m.state = Disconnected
if m.PersistentSession && m.ClientID == "" {
return errors.New("persistent_session requires client_id")
Expand Down
6 changes: 6 additions & 0 deletions plugins/outputs/mqtt/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,12 @@ to use them.
## actually reads it
# retain = false

## Client trace messages
## When set to true, and debug mode enabled in the agent settings, the MQTT
## client's messages are included in telegraf logs. These messages are very
## noisey, but essential for debugging issues.
# client_trace = false

## Layout of the topics published.
## The following choices are available:
## non-batch -- send individual messages, one for each metric
Expand Down
6 changes: 6 additions & 0 deletions plugins/outputs/mqtt/sample.conf
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,12 @@
## actually reads it
# retain = false

## Client trace messages
## When set to true, and debug mode enabled in the agent settings, the MQTT
## client's messages are included in telegraf logs. These messages are very
## noisey, but essential for debugging issues.
# client_trace = false

## Layout of the topics published.
## The following choices are available:
## non-batch -- send individual messages, one for each metric
Expand Down
Loading