From 32fa9d5e55cbe14a0ec6ddc68476b705f1422618 Mon Sep 17 00:00:00 2001 From: Song Gao Date: Wed, 25 Dec 2024 18:38:31 +0800 Subject: [PATCH] fix(io): mqtt subscription concurrent map (#3475) Signed-off-by: Song Gao --- internal/io/mqtt/client/client.go | 26 ++++++++++++++++---------- 1 file changed, 16 insertions(+), 10 deletions(-) diff --git a/internal/io/mqtt/client/client.go b/internal/io/mqtt/client/client.go index 84c1f5489a..84ab19d455 100644 --- a/internal/io/mqtt/client/client.go +++ b/internal/io/mqtt/client/client.go @@ -17,6 +17,7 @@ package client import ( "crypto/tls" "fmt" + "sync" "sync/atomic" "time" @@ -41,7 +42,7 @@ type Connection struct { scHandler api.StatusChangeHandler conf *ConnectionConfig // key is the topic. Each topic will have only one connector - subscriptions map[string]*subscriptionInfo + subscriptions sync.Map } type ConnectionConfig struct { @@ -61,7 +62,7 @@ type subscriptionInfo struct { func CreateConnection(_ api.StreamContext) modules.Connection { return &Connection{ - subscriptions: make(map[string]*subscriptionInfo), + subscriptions: sync.Map{}, } } @@ -129,12 +130,17 @@ func (conn *Connection) onConnect(_ pahoMqtt.Client) { conn.logger.Warnf("sc handler has not set yet") } conn.logger.Infof("The connection to mqtt broker is established") - for topic, info := range conn.subscriptions { - err := conn.Subscribe(topic, info.Qos, info.Handler) - if err != nil { // should never happen. If happens because of connection, it will retry later - conn.logger.Errorf("Failed to subscribe topic %s: %v", topic, err) + conn.subscriptions.Range(func(key, value any) bool { + topic, ok1 := key.(string) + info, ok2 := value.(*subscriptionInfo) + if ok1 && ok2 { + err := conn.Subscribe(topic, info.Qos, info.Handler) + if err != nil { // should never happen. If happens because of connection, it will retry later + conn.logger.Errorf("Failed to subscribe topic %s: %v", topic, err) + } } - } + return true + }) } func (conn *Connection) onConnectLost(_ pahoMqtt.Client, err error) { @@ -159,7 +165,7 @@ func (conn *Connection) DetachSub(ctx api.StreamContext, props map[string]any) { if err != nil { return } - delete(conn.subscriptions, topic) + conn.subscriptions.Delete(topic) conn.Client.Unsubscribe(topic) } @@ -190,10 +196,10 @@ func (conn *Connection) Publish(topic string, qos byte, retained bool, payload a } func (conn *Connection) Subscribe(topic string, qos byte, callback pahoMqtt.MessageHandler) error { - conn.subscriptions[topic] = &subscriptionInfo{ + conn.subscriptions.Store(topic, &subscriptionInfo{ Qos: qos, Handler: callback, - } + }) token := conn.Client.Subscribe(topic, qos, callback) return handleToken(token) }