-
Notifications
You must be signed in to change notification settings - Fork 0
/
mqtt.go
92 lines (76 loc) · 2.35 KB
/
mqtt.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
package main
import (
"os"
"strings"
"time"
MQTT "github.com/eclipse/paho.mqtt.golang"
"github.com/rs/zerolog/log"
)
// MQTT timing and topic layout. Every topic is rooted at APPNAME.
const (
	// TIMEOUT is the wait limit applied to the broker connection attempt.
	TIMEOUT = 10 * time.Second

	// SUBSCRIBE is the wildcard topic filter covering everything below APPNAME.
	SUBSCRIBE = APPNAME + "/#"

	// STATUS_TOPIC carries the retained "Online"/"Offline" status payloads.
	STATUS_TOPIC = APPNAME + "/status"

	// RESPONSE_TOPIC is the default topic for rclone responses; it is used
	// when the configuration does not name a response topic of its own.
	RESPONSE_TOPIC = APPNAME + "/response"
)
// mqttClient is the package-level MQTT client. It is assigned in
// startMqttClient and used by sendToMtt and onConnectHandler.
var mqttClient MQTT.Client
func sendToMtt(topic string, message string, retain bool) {
mqttClient.Publish(topic, byte(config.Mqtt.Qos), retain, message)
}
// receive is the MQTT message handler registered for the SUBSCRIBE filter.
//
// Messages on the status topic and on the response topic are ignored, since
// they are this program's own publications. For every other message, the
// part of the topic after APPNAME is passed to sendToRclone as the command
// and the trimmed payload as its JSON argument ("{}" when the payload is
// empty). The result is published, non-retained, on the response topic.
//
// The response topic is config.Rclone.ResponseTopic when set, otherwise
// RESPONSE_TOPIC.
func receive(client MQTT.Client, msg MQTT.Message) {
	topic := msg.Topic()

	responseTopic := RESPONSE_TOPIC
	if config.Rclone.ResponseTopic != "" {
		responseTopic = config.Rclone.ResponseTopic
	}

	// Ignore our own status/response publications so that a response is
	// never interpreted as a new command.
	if topic == STATUS_TOPIC || topic == responseTopic {
		return
	}

	message := string(msg.Payload())
	log.Trace().Msgf("MQTT Topic: %s", topic)
	log.Trace().Msgf("MQTT Message: %s", message)
	log.Trace().Msgf("MQTT Response Topic: %s", responseTopic)

	// TrimPrefix cannot panic, unlike slicing by len(APPNAME), should a
	// topic ever arrive that is shorter than the application prefix.
	command := strings.TrimPrefix(topic, APPNAME)

	payload := strings.TrimSpace(message)
	if payload == "" {
		payload = "{}"
	}

	response, err := sendToRclone(command, payload)
	if err != nil {
		// A zerolog event is only written once Msg/Msgf/Send is called; the
		// previous log.Fatal().Err(err) without a terminator emitted nothing
		// and silently dropped the error. Log at error level and keep
		// serving subsequent messages.
		log.Error().Err(err).Msgf("rclone command %q failed", command)
		return
	}

	sendToMtt(responseTopic, response, false)
}
// GetClientId returns the MQTT client identifier: APPNAME and the machine's
// hostname joined by an underscore.
func GetClientId() string {
	// The lookup error is deliberately ignored: on failure the hostname is
	// empty and the identifier degrades to APPNAME followed by "_".
	host, _ := os.Hostname()
	return strings.Join([]string{APPNAME, host}, "_")
}
// startMqttClient builds the MQTT client from the configuration, connects to
// the broker and publishes the retained "Online" status.
//
// Credentials are applied only when both username and password are
// configured. A retained "Offline" will is registered on STATUS_TOPIC.
// Subscription happens in onConnectHandler.
//
// The process is terminated (log.Fatal) when the connection attempt fails or
// does not complete within TIMEOUT. A failure to publish the "Online" status
// is logged but is not fatal.
func startMqttClient() {
	opts := MQTT.NewClientOptions().AddBroker(config.Mqtt.Url)
	if config.Mqtt.Username != "" && config.Mqtt.Password != "" {
		opts.SetUsername(config.Mqtt.Username)
		opts.SetPassword(config.Mqtt.Password)
	}
	opts.SetClientID(GetClientId())
	opts.SetCleanSession(true)
	opts.SetBinaryWill(STATUS_TOPIC, []byte("Offline"), 0, true)
	opts.SetAutoReconnect(true)
	opts.SetConnectionLostHandler(connLostHandler)
	opts.SetOnConnectHandler(onConnectHandler)

	mqttClient = MQTT.NewClient(opts)

	token := mqttClient.Connect()
	// WaitTimeout returns false when the timeout elapses first. The previous
	// combined condition treated that case as success and carried on with an
	// unconnected client.
	if !token.WaitTimeout(TIMEOUT) {
		log.Fatal().Msgf("MQTT connection timed out after %s", TIMEOUT)
	}
	if err := token.Error(); err != nil {
		log.Fatal().Err(err).Msg("MQTT connection")
	}

	token = mqttClient.Publish(STATUS_TOPIC, 2, true, "Online")
	// Bound the wait so startup cannot hang forever, and surface a failed
	// publish instead of discarding it.
	if !token.WaitTimeout(TIMEOUT) {
		log.Warn().Msgf("Publishing status to %s timed out after %s", STATUS_TOPIC, TIMEOUT)
		return
	}
	if err := token.Error(); err != nil {
		log.Error().Err(err).Msgf("Could not publish status to %s", STATUS_TOPIC)
	}
}
// connLostHandler is registered as the client's connection-lost callback.
// It logs the error at fatal level, which in zerolog terminates the process
// once the message has been written.
func connLostHandler(c MQTT.Client, err error) {
	event := log.Fatal().Err(err)
	event.Msg("MQTT connection lost")
}
// onConnectHandler is registered as the client's on-connect callback. It
// subscribes the package-level client to SUBSCRIBE at QoS 0 with receive as
// the message handler, and logs at fatal level if the subscription fails.
func onConnectHandler(c MQTT.Client) {
	log.Debug().Msg("MQTT Client connected")

	subscription := mqttClient.Subscribe(SUBSCRIBE, 0, receive)
	if !subscription.Wait() {
		return
	}
	if subscription.Error() != nil {
		log.Fatal().Err(subscription.Error()).Msgf("Could not subscribe to %s", SUBSCRIBE)
	}
}