Skip to content

Commit

Permalink
fix(io): mqtt subscription concurrent map (#3475)
Browse files Browse the repository at this point in the history
Signed-off-by: Song Gao <disxiaofei@163.com>
  • Loading branch information
Yisaer authored Dec 25, 2024
1 parent da93e13 commit 32fa9d5
Showing 1 changed file with 16 additions and 10 deletions.
26 changes: 16 additions & 10 deletions internal/io/mqtt/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package client
import (
"crypto/tls"
"fmt"
"sync"
"sync/atomic"
"time"

Expand All @@ -41,7 +42,7 @@ type Connection struct {
scHandler api.StatusChangeHandler
conf *ConnectionConfig
// key is the topic. Each topic will have only one connector
subscriptions map[string]*subscriptionInfo
subscriptions sync.Map
}

type ConnectionConfig struct {
Expand All @@ -61,7 +62,7 @@ type subscriptionInfo struct {

func CreateConnection(_ api.StreamContext) modules.Connection {
return &Connection{
subscriptions: make(map[string]*subscriptionInfo),
subscriptions: sync.Map{},
}
}

Expand Down Expand Up @@ -129,12 +130,17 @@ func (conn *Connection) onConnect(_ pahoMqtt.Client) {
conn.logger.Warnf("sc handler has not set yet")
}
conn.logger.Infof("The connection to mqtt broker is established")
for topic, info := range conn.subscriptions {
err := conn.Subscribe(topic, info.Qos, info.Handler)
if err != nil { // should never happen. If happens because of connection, it will retry later
conn.logger.Errorf("Failed to subscribe topic %s: %v", topic, err)
conn.subscriptions.Range(func(key, value any) bool {
topic, ok1 := key.(string)
info, ok2 := value.(*subscriptionInfo)
if ok1 && ok2 {
err := conn.Subscribe(topic, info.Qos, info.Handler)
if err != nil { // should never happen. If happens because of connection, it will retry later
conn.logger.Errorf("Failed to subscribe topic %s: %v", topic, err)
}
}
}
return true
})
}

func (conn *Connection) onConnectLost(_ pahoMqtt.Client, err error) {
Expand All @@ -159,7 +165,7 @@ func (conn *Connection) DetachSub(ctx api.StreamContext, props map[string]any) {
if err != nil {
return
}
delete(conn.subscriptions, topic)
conn.subscriptions.Delete(topic)
conn.Client.Unsubscribe(topic)
}

Expand Down Expand Up @@ -190,10 +196,10 @@ func (conn *Connection) Publish(topic string, qos byte, retained bool, payload a
}

func (conn *Connection) Subscribe(topic string, qos byte, callback pahoMqtt.MessageHandler) error {
conn.subscriptions[topic] = &subscriptionInfo{
conn.subscriptions.Store(topic, &subscriptionInfo{
Qos: qos,
Handler: callback,
}
})
token := conn.Client.Subscribe(topic, qos, callback)
return handleToken(token)
}
Expand Down

0 comments on commit 32fa9d5

Please sign in to comment.