Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

MF-1630 - Replace old subscriptions with a new one instead of throwing an error #1633

Merged
merged 34 commits into from
Aug 12, 2022
Merged
Show file tree
Hide file tree
Changes from 16 commits
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
a7da62c
updated pubsub.subscribe() to delete and renew already subscribed obj…
AryanGodara Jul 26, 2022
47e1026
fixing sync.mutex error
AryanGodara Jul 27, 2022
6d3aa7d
made changes to pubsub_test.go to fix failing CI
AryanGodara Jul 27, 2022
7bcca16
made some changes to pubsub.go
AryanGodara Jul 27, 2022
6eb49e8
made some changes to pubsub.go added flag var
AryanGodara Jul 27, 2022
5497d23
little code cleanup
AryanGodara Jul 28, 2022
76c0183
minor change, to make code more intuitive
AryanGodara Jul 28, 2022
6219f1d
added comment, removed the ws_client file
AryanGodara Jul 29, 2022
5b332c7
removed ErrAlreadySubscribed
AryanGodara Aug 2, 2022
d22fc22
fixed some failing tests, will check again after semaphore result
AryanGodara Aug 4, 2022
e9de8c1
made similar changes for MQTT and rabbitMQ
AryanGodara Aug 5, 2022
48875da
Merge branch 'master' of https://github.com/AryanGodara/mainflux into…
AryanGodara Aug 5, 2022
8ff2828
removed extra comment
AryanGodara Aug 5, 2022
a5e6434
removed extra comment
AryanGodara Aug 5, 2022
835922d
removed extra comment
AryanGodara Aug 6, 2022
a976de7
to fix failing CI
AryanGodara Aug 8, 2022
d21b917
checking code after adding conditions
AryanGodara Aug 9, 2022
2513f8c
Merge branch 'master' of https://github.com/AryanGodara/mainflux into…
AryanGodara Aug 11, 2022
cdb8d5d
added ps.mu.Lock(), and changed len(s)
AryanGodara Aug 11, 2022
a97cbd3
added tests for failing unsubscribe
AryanGodara Aug 11, 2022
b608f41
shifted defer lock
AryanGodara Aug 11, 2022
cf71814
Merge branch 'master' of https://github.com/AryanGodara/mainflux into…
AryanGodara Aug 11, 2022
7d53a14
fixed mqtt/pubsub
AryanGodara Aug 11, 2022
6fecbe3
fix mqtt/pubsub typo
AryanGodara Aug 11, 2022
58031ad
add comment to improve readability
AryanGodara Aug 11, 2022
351b3bc
add comment to better improve readability
AryanGodara Aug 11, 2022
20ca42a
add comments for docs
AryanGodara Aug 11, 2022
c13426b
improve doc comment
AryanGodara Aug 11, 2022
9a63589
Merge branch 'master' of https://github.com/AryanGodara/mainflux into…
AryanGodara Aug 12, 2022
c72e5fb
change to mqtt/pubsub
AryanGodara Aug 12, 2022
6076dbd
remove unnecessary mutex lock
AryanGodara Aug 12, 2022
c3e8065
change to pubsub
AryanGodara Aug 12, 2022
d484937
add comments to improve readability
AryanGodara Aug 12, 2022
ff6c6ba
fix typo in comment
AryanGodara Aug 12, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 14 additions & 2 deletions pkg/messaging/mqtt/pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ var (
errSubscribeTimeout = errors.New("failed to subscribe due to timeout reached")
errUnsubscribeTimeout = errors.New("failed to unsubscribe due to timeout reached")
errUnsubscribeDeleteTopic = errors.New("failed to unsubscribe due to deletion of topic")
errAlreadySubscribed = errors.New("already subscribed to topic")
errNotSubscribed = errors.New("not subscribed")
errEmptyTopic = errors.New("empty topic")
errEmptyID = errors.New("empty ID")
Expand Down Expand Up @@ -80,7 +79,20 @@ func (ps pubsub) Subscribe(id, topic string, handler messaging.MessageHandler) e
case true:
// Check topic
if ok = s.contains(topic); ok {
return errAlreadySubscribed
// Unlocking, so that Unsubscribe() can access ps.subscriptions
ps.mu.Unlock()
if err := ps.Unsubscribe(id, topic); err != nil {
return err
}
ps.mu.Lock()
AryanGodara marked this conversation as resolved.
Show resolved Hide resolved
client, err := newClient(ps.address, id, ps.timeout)
if err != nil {
return err
}
s = subscription{
client: client,
topics: []string{topic},
}
}
s.topics = append(s.topics, topic)
default:
Expand Down
11 changes: 10 additions & 1 deletion pkg/messaging/nats/pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ func (ps *pubsub) Subscribe(id, topic string, handler messaging.MessageHandler)
if topic == "" {
return ErrEmptyTopic
}

ps.mu.Lock()
defer ps.mu.Unlock()
// Check topic
Expand All @@ -77,7 +78,14 @@ func (ps *pubsub) Subscribe(id, topic string, handler messaging.MessageHandler)
case true:
// Check topic ID
if _, ok := s[id]; ok {
return ErrAlreadySubscribed
// Unlocking, so that Unsubscribe() can access ps.subscriptions
ps.mu.Unlock()
if err := ps.Unsubscribe(id, topic); err != nil {
return err
}
ps.mu.Lock()
AryanGodara marked this conversation as resolved.
Show resolved Hide resolved
s = make(map[string]subscription)
AryanGodara marked this conversation as resolved.
Show resolved Hide resolved
ps.subscriptions[topic] = s
}
default:
s = make(map[string]subscription)
Expand All @@ -104,6 +112,7 @@ func (ps *pubsub) Subscribe(id, topic string, handler messaging.MessageHandler)
Subscription: sub,
cancel: handler.Cancel,
}

return nil
}

Expand Down
4 changes: 2 additions & 2 deletions pkg/messaging/nats/pubsub_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ func TestPubsub(t *testing.T) {
desc: "Subscribe to an already subscribed topic with an ID",
topic: fmt.Sprintf("%s.%s", chansPrefix, topic),
clientID: "clientid1",
errorMessage: nats.ErrAlreadySubscribed,
errorMessage: nil,
pubsub: true,
},
{
Expand Down Expand Up @@ -156,7 +156,7 @@ func TestPubsub(t *testing.T) {
desc: "Subscribe to an already subscribed topic with a subtopic with an ID",
topic: fmt.Sprintf("%s.%s.%s", chansPrefix, topic, subtopic),
clientID: "clientidd1",
errorMessage: nats.ErrAlreadySubscribed,
errorMessage: nil,
pubsub: true,
},
{
Expand Down
10 changes: 9 additions & 1 deletion pkg/messaging/rabbitmq/pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,15 @@ func (ps *pubsub) Subscribe(id, topic string, handler messaging.MessageHandler)
case true:
// Check topic ID
if _, ok := s[id]; ok {
return ErrAlreadySubscribed
// Unlocking, so Unsubscribe() can access ps.subscriptions
ps.mu.Unlock()
if err := ps.Unsubscribe(id, topic); err != nil {
return err
}
ps.mu.Lock()

s = make(map[string]subscription)
ps.subscriptions[topic] = s
}
default:
s = make(map[string]subscription)
Expand Down
4 changes: 2 additions & 2 deletions pkg/messaging/rabbitmq/pubsub_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ func TestPubsub(t *testing.T) {
desc: "Subscribe to an already subscribed topic with an ID",
topic: fmt.Sprintf("%s.%s", chansPrefix, topic),
clientID: "clientid1",
errorMessage: rabbitmq.ErrAlreadySubscribed,
errorMessage: nil,
pubsub: true,
},
{
Expand Down Expand Up @@ -153,7 +153,7 @@ func TestPubsub(t *testing.T) {
desc: "Subscribe to an already subscribed topic with a subtopic with an ID",
topic: fmt.Sprintf("%s.%s.%s", chansPrefix, topic, subtopic),
clientID: "clientidd1",
errorMessage: rabbitmq.ErrAlreadySubscribed,
errorMessage: nil,
pubsub: true,
},
{
Expand Down