Skip to content

Commit

Permalink
feat(io): support mqtt5
Browse files Browse the repository at this point in the history
Refactor to extract mqtt connection interface
Implement the interface for v4 and v5 separately
Fix subscription lock

Signed-off-by: Jiyong Huang <huangjy@emqx.io>
  • Loading branch information
ngjaying committed Nov 20, 2024
1 parent 410f52d commit c969d2c
Show file tree
Hide file tree
Showing 13 changed files with 824 additions and 337 deletions.
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ require (
github.com/denisenkom/go-mssqldb v0.12.3
github.com/dolthub/go-mysql-server v0.18.1
github.com/dop251/goja v0.0.0-20240828124009-016eb7256539
github.com/eclipse/paho.golang v0.21.0
github.com/eclipse/paho.mqtt.golang v1.5.0
github.com/edgexfoundry/go-mod-core-contracts/v3 v3.1.0
github.com/edgexfoundry/go-mod-messaging/v3 v3.1.0
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -573,6 +573,8 @@ github.com/dustin/go-humanize v1.0.1 h1:GzkhY7T5VNhEkwH0PVJgjz+fX1rhBrR7pRT3mDkp
github.com/dustin/go-humanize v1.0.1/go.mod h1:Mu1zIs6XwVuF/gI1OepvI0qD18qycQx+mFykh5fBlto=
github.com/dvsekhvalnov/jose2go v1.6.0 h1:Y9gnSnP4qEI0+/uQkHvFXeD2PLPJeXEL+ySMEA2EjTY=
github.com/dvsekhvalnov/jose2go v1.6.0/go.mod h1:QsHjhyTlD/lAVqn/NSbVZmSCGeDehTB/mPZadG+mhXU=
github.com/eclipse/paho.golang v0.21.0 h1:cxxEReu+iFbA5RrHfRGxJOh8tXZKDywuehneoeBeyn8=
github.com/eclipse/paho.golang v0.21.0/go.mod h1:GHF6vy7SvDbDHBguaUpfuBkEB5G6j0zKxMG4gbh6QRQ=
github.com/eclipse/paho.mqtt.golang v1.5.0 h1:EH+bUVJNgttidWFkLLVKaQPGmkTUfQQqjOsyvMGvD6o=
github.com/eclipse/paho.mqtt.golang v1.5.0/go.mod h1:du/2qNQVqJf/Sqs4MEL77kR8QTqANF7XU7Fk0aOTAgk=
github.com/edgexfoundry/go-mod-core-contracts/v3 v3.1.0 h1:KWSL0ZmFLJpscxs1lgSfQJAMLsCg1p4ZfVwxMVNiF5Y=
Expand Down
3 changes: 1 addition & 2 deletions internal/binder/io/builtin.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (
"github.com/lf-edge/ekuiper/v2/internal/io/http/httpserver"
"github.com/lf-edge/ekuiper/v2/internal/io/memory"
"github.com/lf-edge/ekuiper/v2/internal/io/mqtt"
mqttCon "github.com/lf-edge/ekuiper/v2/internal/io/mqtt/client"
"github.com/lf-edge/ekuiper/v2/internal/io/neuron"
"github.com/lf-edge/ekuiper/v2/internal/io/simulator"
"github.com/lf-edge/ekuiper/v2/internal/io/sink"
Expand Down Expand Up @@ -56,7 +55,7 @@ func init() {
modules.RegisterLookupSource("memory", memory.GetLookupSource)
modules.RegisterLookupSource("httppull", http.GetLookUpSource)

modules.RegisterConnection("mqtt", mqttCon.CreateConnection)
modules.RegisterConnection("mqtt", mqtt.CreateConnection)
modules.RegisterConnection("nng", nng.CreateConnection)
modules.RegisterConnection("httppush", httpserver.CreateConnection)
modules.RegisterConnection("websocket", httpserver.CreateWebsocketConnection)
Expand Down
242 changes: 17 additions & 225 deletions internal/io/mqtt/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,239 +15,31 @@
package client

import (
"crypto/tls"
"fmt"
"sync/atomic"
"time"

pahoMqtt "github.com/eclipse/paho.mqtt.golang"
"github.com/google/uuid"
"github.com/lf-edge/ekuiper/contract/v2/api"

"github.com/lf-edge/ekuiper/v2/pkg/cast"
"github.com/lf-edge/ekuiper/v2/pkg/cert"
"github.com/lf-edge/ekuiper/v2/pkg/connection"
"github.com/lf-edge/ekuiper/v2/pkg/errorx"
"github.com/lf-edge/ekuiper/v2/pkg/modules"
)

type Connection struct {
pahoMqtt.Client
selId string
id string
logger api.Logger
connected atomic.Bool
status atomic.Value
scHandler api.StatusChangeHandler
conf *ConnectionConfig
// key is the topic. Each topic will have only one connector
subscriptions map[string]*subscriptionInfo
}

type ConnectionConfig struct {
Server string `json:"server"`
PVersion string `json:"protocolVersion"`
ClientId string `json:"clientid"`
Uname string `json:"username"`
Password string `json:"password"`
pversion uint // 3 or 4
tls *tls.Config
// Client is the interface for mqtt client. There are two implementations v4 and v5
type Client interface {
Connect(ctx api.StreamContext) error
Subscribe(ctx api.StreamContext, topic string, qos byte, callback MessageHandler) error
Unsubscribe(ctx api.StreamContext, topic string) error
Disconnect(ctx api.StreamContext)
Publish(ctx api.StreamContext, topic string, qos byte, retained bool, payload []byte, properties map[string]string) error
ParseMsg(ctx api.StreamContext, msg any) ([]byte, map[string]any, map[string]string)
}

type subscriptionInfo struct {
type SubscriptionInfo struct {
Qos byte
Handler pahoMqtt.MessageHandler
}

func CreateConnection(_ api.StreamContext) modules.Connection {
return &Connection{
subscriptions: make(map[string]*subscriptionInfo),
}
}

func (conn *Connection) Provision(ctx api.StreamContext, conId string, props map[string]any) error {
c, err := ValidateConfig(props)
if err != nil {
return err
}
opts := pahoMqtt.NewClientOptions().AddBroker(c.Server).SetProtocolVersion(c.pversion).SetAutoReconnect(true).SetMaxReconnectInterval(connection.DefaultMaxInterval).SetClientID(c.ClientId).SetTLSConfig(c.tls)

if c.Uname != "" {
opts = opts.SetUsername(c.Uname)
}
if c.Password != "" {
opts = opts.SetPassword(c.Password)
}

conn.status.Store(modules.ConnectionStatus{Status: api.ConnectionConnecting})
opts.OnConnect = conn.onConnect
opts.OnConnectionLost = conn.onConnectLost
opts.OnReconnecting = conn.onReconnecting

cli := pahoMqtt.NewClient(opts)
conn.logger = ctx.GetLogger()
conn.selId = c.ClientId
conn.Client = cli
conn.conf = c
conn.id = conId
return nil
}

func (conn *Connection) GetId(ctx api.StreamContext) string {
return conn.id
}

func (conn *Connection) Dial(ctx api.StreamContext) error {
token := conn.Client.Connect()
err := handleToken(token)
if err != nil {
return errorx.NewIOErr(fmt.Sprintf("found error when connecting for %s: %s", conn.conf.Server, err))
}
// store connected status immediately to avoid publish error due to onConnect is called slower
conn.connected.Store(true)
ctx.GetLogger().Infof("new mqtt client created")
return nil
}

func (conn *Connection) Status(_ api.StreamContext) modules.ConnectionStatus {
return conn.status.Load().(modules.ConnectionStatus)
}

func (conn *Connection) SetStatusChangeHandler(ctx api.StreamContext, sch api.StatusChangeHandler) {
st := conn.status.Load().(modules.ConnectionStatus)
sch(st.Status, st.ErrMsg)
conn.scHandler = sch
conn.logger.Infof("trigger status change handler")
Handler MessageHandler
}

func (conn *Connection) onConnect(_ pahoMqtt.Client) {
conn.connected.Store(true)
conn.status.Store(modules.ConnectionStatus{Status: api.ConnectionConnected})
if conn.scHandler != nil {
conn.scHandler(api.ConnectionConnected, "")
} else {
conn.logger.Warnf("sc handler has not set yet")
}
conn.logger.Infof("The connection to mqtt broker is established")
for topic, info := range conn.subscriptions {
err := conn.Subscribe(topic, info.Qos, info.Handler)
if err != nil { // should never happen. If happens because of connection, it will retry later
conn.logger.Errorf("Failed to subscribe topic %s: %v", topic, err)
}
}
}

func (conn *Connection) onConnectLost(_ pahoMqtt.Client, err error) {
conn.connected.Store(false)
conn.status.Store(modules.ConnectionStatus{Status: api.ConnectionDisconnected, ErrMsg: err.Error()})
if conn.scHandler != nil {
conn.scHandler(api.ConnectionDisconnected, err.Error())
}
conn.logger.Infof("%v", err)
}

func (conn *Connection) onReconnecting(_ pahoMqtt.Client, _ *pahoMqtt.ClientOptions) {
conn.status.Store(modules.ConnectionStatus{Status: api.ConnectionConnecting})
if conn.scHandler != nil {
conn.scHandler(api.ConnectionConnecting, "")
}
conn.logger.Debugf("Reconnecting to mqtt broker")
}

func (conn *Connection) DetachSub(ctx api.StreamContext, props map[string]any) {
topic, err := getTopicFromProps(props)
if err != nil {
return
}
delete(conn.subscriptions, topic)
conn.Client.Unsubscribe(topic)
}

func (conn *Connection) Close(ctx api.StreamContext) error {
if conn == nil || conn.Client == nil {
return nil
}
conn.Client.Disconnect(1)
return nil
}

func (conn *Connection) Ping(ctx api.StreamContext) error {
if conn.connected.Load() {
return nil
}
return conn.Dial(ctx)
}

// MQTT features

func (conn *Connection) Publish(topic string, qos byte, retained bool, payload any) error {
// Need to return error immediately so that we can enable cache immediately
if conn == nil || !conn.connected.Load() {
return errorx.NewIOErr("mqtt client is not connected")
}
token := conn.Client.Publish(topic, qos, retained, payload)
return handleToken(token)
}

func (conn *Connection) Subscribe(topic string, qos byte, callback pahoMqtt.MessageHandler) error {
conn.subscriptions[topic] = &subscriptionInfo{
Qos: qos,
Handler: callback,
}
token := conn.Client.Subscribe(topic, qos, callback)
return handleToken(token)
}

func handleToken(token pahoMqtt.Token) error {
if !token.WaitTimeout(5 * time.Second) {
return errorx.NewIOErr("timeout")
} else if token.Error() != nil {
return errorx.NewIOErr(token.Error().Error())
}
return nil
}

func ValidateConfig(props map[string]any) (*ConnectionConfig, error) {
c := &ConnectionConfig{PVersion: "3.1.1"}
err := cast.MapToStruct(props, c)
if err != nil {
return nil, err
}

if c.Server == "" {
return nil, fmt.Errorf("missing server property")
}

if c.ClientId == "" {
c.ClientId = uuid.New().String()
}
// Default to MQTT 3.1.1 or NanoMQ cannot connect
switch c.PVersion {
case "3.1":
c.pversion = 3
case "3.1.1", "4":
c.pversion = 4
default:
return nil, fmt.Errorf("unsupported protocol version %s", c.PVersion)
}
tlsConfig, err := cert.GenTLSConfig(props, "mqtt")
if err != nil {
return nil, err
}
c.tls = tlsConfig
return c, nil
type CommonConfig struct {
Server string `json:"server"`
PVersion string `json:"protocolVersion"`
}

const (
dataSourceProp = "datasource"
type (
ConnectHandler func(ctx api.StreamContext)
ConnectErrorHandler func(ctx api.StreamContext, e error)
MessageHandler func(ctx api.StreamContext, msg any)
)

func getTopicFromProps(props map[string]any) (string, error) {
v, ok := props[dataSourceProp]
if ok {
return v.(string), nil
}
return "", fmt.Errorf("topic or datasource not defined")
}

var _ modules.StatefulDialer = &Connection{}
80 changes: 0 additions & 80 deletions internal/io/mqtt/client/client_test.go

This file was deleted.

Loading

0 comments on commit c969d2c

Please sign in to comment.