Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

MF-1630 - Replace old subscriptions with a new one instead of throwing an error #1633

Merged
merged 34 commits into from
Aug 12, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
a7da62c
updated pubsub.subscribe() to delete and renew already subscribed obj…
AryanGodara Jul 26, 2022
47e1026
fixing sync.mutex error
AryanGodara Jul 27, 2022
6d3aa7d
made changes to pubsub_test.go to fix failing CI
AryanGodara Jul 27, 2022
7bcca16
made some changes to pubsub.go
AryanGodara Jul 27, 2022
6eb49e8
made some changes to pubsub.go added flag var
AryanGodara Jul 27, 2022
5497d23
little code cleanup
AryanGodara Jul 28, 2022
76c0183
minor change, to make code more intuitive
AryanGodara Jul 28, 2022
6219f1d
added comment, removed the ws_client file
AryanGodara Jul 29, 2022
5b332c7
removed ErrAlreadySubscribed
AryanGodara Aug 2, 2022
d22fc22
fixed some failing tests, will check again after semaphore result
AryanGodara Aug 4, 2022
e9de8c1
made similar changes for MQTT and rabbitMQ
AryanGodara Aug 5, 2022
48875da
Merge branch 'master' of https://github.com/AryanGodara/mainflux into…
AryanGodara Aug 5, 2022
8ff2828
removed extra comment
AryanGodara Aug 5, 2022
a5e6434
removed extra comment
AryanGodara Aug 5, 2022
835922d
removed extra comment
AryanGodara Aug 6, 2022
a976de7
to fix failing CI
AryanGodara Aug 8, 2022
d21b917
checking code after adding conditions
AryanGodara Aug 9, 2022
2513f8c
Merge branch 'master' of https://github.com/AryanGodara/mainflux into…
AryanGodara Aug 11, 2022
cdb8d5d
added ps.mu.Lock(), and changed len(s)
AryanGodara Aug 11, 2022
a97cbd3
added tests for failing unsubscribe
AryanGodara Aug 11, 2022
b608f41
shifted defer lock
AryanGodara Aug 11, 2022
cf71814
Merge branch 'master' of https://github.com/AryanGodara/mainflux into…
AryanGodara Aug 11, 2022
7d53a14
fixed mqtt/pubsub
AryanGodara Aug 11, 2022
6fecbe3
fix mqtt/pubsub typo
AryanGodara Aug 11, 2022
58031ad
add comment to improve readability
AryanGodara Aug 11, 2022
351b3bc
add comment to better improve readability
AryanGodara Aug 11, 2022
20ca42a
add comments for docs
AryanGodara Aug 11, 2022
c13426b
improve doc comment
AryanGodara Aug 11, 2022
9a63589
Merge branch 'master' of https://github.com/AryanGodara/mainflux into…
AryanGodara Aug 12, 2022
c72e5fb
change to mqtt/pubsub
AryanGodara Aug 12, 2022
6076dbd
remove unnecessary mutex lock
AryanGodara Aug 12, 2022
c3e8065
change to pubsub
AryanGodara Aug 12, 2022
d484937
add comments to improve readability
AryanGodara Aug 12, 2022
ff6c6ba
fix typo in comment
AryanGodara Aug 12, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 19 additions & 2 deletions pkg/messaging/mqtt/pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ var (
errSubscribeTimeout = errors.New("failed to subscribe due to timeout reached")
errUnsubscribeTimeout = errors.New("failed to unsubscribe due to timeout reached")
errUnsubscribeDeleteTopic = errors.New("failed to unsubscribe due to deletion of topic")
errAlreadySubscribed = errors.New("already subscribed to topic")
errNotSubscribed = errors.New("not subscribed")
errEmptyTopic = errors.New("empty topic")
errEmptyID = errors.New("empty ID")
Expand All @@ -47,6 +46,7 @@ type pubsub struct {
subscriptions map[string]subscription
}

// NewPubSub returns MQTT message publisher/subscriber.
func NewPubSub(url, queue string, timeout time.Duration, logger log.Logger) (messaging.PubSub, error) {
client, err := newClient(url, "mqtt-publisher", timeout)
if err != nil {
Expand Down Expand Up @@ -80,7 +80,23 @@ func (ps pubsub) Subscribe(id, topic string, handler messaging.MessageHandler) e
case true:
// Check topic
if ok = s.contains(topic); ok {
return errAlreadySubscribed
// Unlocking, so that Unsubscribe() can access ps.subscriptions
ps.mu.Unlock()
err := ps.Unsubscribe(id, topic)
ps.mu.Lock() // Lock so that deferred unlock handle it
if err != nil {
return err
}
if len(ps.subscriptions) == 0 {
client, err := newClient(ps.address, id, ps.timeout)
if err != nil {
return err
}
s = subscription{
client: client,
topics: []string{topic},
}
}
}
s.topics = append(s.topics, topic)
default:
Expand All @@ -93,6 +109,7 @@ func (ps pubsub) Subscribe(id, topic string, handler messaging.MessageHandler) e
topics: []string{topic},
}
}

token := s.client.Subscribe(topic, qos, ps.mqttHandler(handler))
if token.Error() != nil {
return token.Error()
Expand Down
24 changes: 18 additions & 6 deletions pkg/messaging/nats/pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ var (
ErrNotSubscribed = errors.New("not subscribed")
ErrEmptyTopic = errors.New("empty topic")
ErrEmptyID = errors.New("empty id")
ErrFailed = errors.New("failed")
)

var _ messaging.PubSub = (*pubsub)(nil)
Expand Down Expand Up @@ -69,20 +70,30 @@ func (ps *pubsub) Subscribe(id, topic string, handler messaging.MessageHandler)
if topic == "" {
return ErrEmptyTopic
}

ps.mu.Lock()
defer ps.mu.Unlock()
// Check topic
s, ok := ps.subscriptions[topic]
switch ok {
case true:
// Check topic ID
if ok {
// Check client ID
if _, ok := s[id]; ok {
return ErrAlreadySubscribed
// Unlocking, so that Unsubscribe() can access ps.subscriptions
ps.mu.Unlock()
if err := ps.Unsubscribe(id, topic); err != nil {
return err
}

ps.mu.Lock()
AryanGodara marked this conversation as resolved.
Show resolved Hide resolved
// value of s can be changed while ps.mu is unlocked
s = ps.subscriptions[topic]
}
default:
}
defer ps.mu.Unlock()
if s == nil {
AryanGodara marked this conversation as resolved.
Show resolved Hide resolved
s = make(map[string]subscription)
ps.subscriptions[topic] = s
}

nh := ps.natsHandler(handler)

if ps.queue != "" {
Expand All @@ -104,6 +115,7 @@ func (ps *pubsub) Subscribe(id, topic string, handler messaging.MessageHandler)
Subscription: sub,
cancel: handler.Cancel,
}

return nil
}

Expand Down
62 changes: 58 additions & 4 deletions pkg/messaging/nats/pubsub_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,124 +88,173 @@ func TestPubsub(t *testing.T) {
clientID string
errorMessage error
pubsub bool //true for subscribe and false for unsubscribe
handler messaging.MessageHandler
}{
{
desc: "Subscribe to a topic with an ID",
topic: fmt.Sprintf("%s.%s", chansPrefix, topic),
clientID: "clientid1",
errorMessage: nil,
pubsub: true,
handler: handler{false},
},
{
desc: "Subscribe to the same topic with a different ID",
topic: fmt.Sprintf("%s.%s", chansPrefix, topic),
clientID: "clientid2",
errorMessage: nil,
pubsub: true,
handler: handler{false},
},
{
desc: "Subscribe to an already subscribed topic with an ID",
topic: fmt.Sprintf("%s.%s", chansPrefix, topic),
clientID: "clientid1",
errorMessage: nats.ErrAlreadySubscribed,
errorMessage: nil,
pubsub: true,
handler: handler{false},
},
{
desc: "Unsubscribe from a topic with an ID",
topic: fmt.Sprintf("%s.%s", chansPrefix, topic),
clientID: "clientid1",
errorMessage: nil,
pubsub: false,
handler: handler{false},
},
{
desc: "Unsubscribe from a non-existent topic with an ID",
topic: "h",
clientID: "clientid1",
errorMessage: nats.ErrNotSubscribed,
pubsub: false,
handler: handler{false},
},
{
desc: "Unsubscribe from the same topic with a different ID",
topic: fmt.Sprintf("%s.%s", chansPrefix, topic),
clientID: "clientidd2",
errorMessage: nats.ErrNotSubscribed,
pubsub: false,
handler: handler{false},
},
{
desc: "Unsubscribe from the same topic with a different ID not subscribed",
topic: fmt.Sprintf("%s.%s", chansPrefix, topic),
clientID: "clientidd3",
errorMessage: nats.ErrNotSubscribed,
pubsub: false,
handler: handler{false},
},
{
desc: "Unsubscribe from an already unsubscribed topic with an ID",
topic: fmt.Sprintf("%s.%s", chansPrefix, topic),
clientID: "clientid1",
errorMessage: nats.ErrNotSubscribed,
pubsub: false,
handler: handler{false},
},
{
desc: "Subscribe to a topic with a subtopic with an ID",
topic: fmt.Sprintf("%s.%s.%s", chansPrefix, topic, subtopic),
clientID: "clientidd1",
errorMessage: nil,
pubsub: true,
handler: handler{false},
},
{
desc: "Subscribe to an already subscribed topic with a subtopic with an ID",
topic: fmt.Sprintf("%s.%s.%s", chansPrefix, topic, subtopic),
clientID: "clientidd1",
errorMessage: nats.ErrAlreadySubscribed,
errorMessage: nil,
pubsub: true,
handler: handler{false},
},
{
desc: "Unsubscribe from a topic with a subtopic with an ID",
topic: fmt.Sprintf("%s.%s.%s", chansPrefix, topic, subtopic),
clientID: "clientidd1",
errorMessage: nil,
pubsub: false,
handler: handler{false},
},
{
desc: "Unsubscribe from an already unsubscribed topic with a subtopic with an ID",
topic: fmt.Sprintf("%s.%s.%s", chansPrefix, topic, subtopic),
clientID: "clientid1",
errorMessage: nats.ErrNotSubscribed,
pubsub: false,
handler: handler{false},
},
{
desc: "Subscribe to an empty topic with an ID",
topic: "",
clientID: "clientid1",
errorMessage: nats.ErrEmptyTopic,
pubsub: true,
handler: handler{false},
},
{
desc: "Unsubscribe from an empty topic with an ID",
topic: "",
clientID: "clientid1",
errorMessage: nats.ErrEmptyTopic,
pubsub: false,
handler: handler{false},
},
{
desc: "Subscribe to a topic with empty id",
topic: fmt.Sprintf("%s.%s", chansPrefix, topic),
clientID: "",
errorMessage: nats.ErrEmptyID,
pubsub: true,
handler: handler{false},
},
{
desc: "Unsubscribe from a topic with empty id",
topic: fmt.Sprintf("%s.%s", chansPrefix, topic),
clientID: "",
errorMessage: nats.ErrEmptyID,
pubsub: false,
handler: handler{false},
},
{
desc: "Subscribe to another topic with an ID",
topic: fmt.Sprintf("%s.%s", chansPrefix, topic+"1"),
clientID: "clientid3",
errorMessage: nil,
pubsub: true,
handler: handler{true},
},
{
desc: "Subscribe to another already subscribed topic with an ID with Unsubscribe failing",
topic: fmt.Sprintf("%s.%s", chansPrefix, topic+"1"),
clientID: "clientid3",
errorMessage: nats.ErrFailed,
pubsub: true,
handler: handler{true},
},
{
desc: "Subscribe to a new topic with an ID",
topic: fmt.Sprintf("%s.%s", chansPrefix, topic+"2"),
clientID: "clientid4",
errorMessage: nil,
pubsub: true,
handler: handler{true},
},
{
desc: "Unsubscribe from a topic with an ID with failing handler",
topic: fmt.Sprintf("%s.%s", chansPrefix, topic+"2"),
clientID: "clientid4",
errorMessage: nats.ErrFailed,
pubsub: false,
handler: handler{true},
},
}

for _, pc := range subcases {
if pc.pubsub == true {
err := pubsub.Subscribe(pc.clientID, pc.topic, handler{})
err := pubsub.Subscribe(pc.clientID, pc.topic, pc.handler)
if pc.errorMessage == nil {
require.Nil(t, err, fmt.Sprintf("%s got unexpected error: %s", pc.desc, err))
} else {
Expand All @@ -222,13 +271,18 @@ func TestPubsub(t *testing.T) {
}
}

type handler struct{}
type handler struct {
fail bool
}

func (h handler) Handle(msg messaging.Message) error {
msgChan <- msg
return nil
}

func (h handler) Cancel() error {
if h.fail {
return nats.ErrFailed
}
return nil
}
21 changes: 15 additions & 6 deletions pkg/messaging/rabbitmq/pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ var (
ErrNotSubscribed = errors.New("not subscribed")
ErrEmptyTopic = errors.New("empty topic")
ErrEmptyID = errors.New("empty id")
ErrFailed = errors.New("failed")
)

var _ messaging.PubSub = (*pubsub)(nil)
Expand Down Expand Up @@ -72,16 +73,24 @@ func (ps *pubsub) Subscribe(id, topic string, handler messaging.MessageHandler)
return ErrEmptyTopic
}
ps.mu.Lock()
defer ps.mu.Unlock()
// Check topic
s, ok := ps.subscriptions[topic]
switch ok {
case true:
// Check topic ID
if ok {
// Check client ID
if _, ok := s[id]; ok {
return ErrAlreadySubscribed
// Unlocking, so that Unsubscribe() can access ps.subscriptions
ps.mu.Unlock()
if err := ps.Unsubscribe(id, topic); err != nil {
return err
}

ps.mu.Lock()
// value of s can be changed while ps.mu is unlocked
s = ps.subscriptions[topic]
}
default:
}
defer ps.mu.Unlock()
if s == nil {
s = make(map[string]subscription)
ps.subscriptions[topic] = s
}
Expand Down
Loading