Skip to content

Commit

Permalink
MF-1630 - Replace old subscriptions with a new one instead of throwin…
Browse files Browse the repository at this point in the history
…g an error (#1633)

* updated pubsub.subscribe() to delete and renew already subscribed objects

Signed-off-by: aryan <aryangodara03@gmail.com>

* fixing sync.mutex error

Signed-off-by: aryan <aryangodara03@gmail.com>

* made changes to pubsub_test.go to fix failing CI

Signed-off-by: aryan <aryangodara03@gmail.com>

* made some changes to pubsub.go

Signed-off-by: aryan <aryangodara03@gmail.com>

* made some changes to pubsub.go added flag var

Signed-off-by: aryan <aryangodara03@gmail.com>

* little code cleanup

Signed-off-by: aryan <aryangodara03@gmail.com>

* minor change, to make code more intuitive

Signed-off-by: aryan <aryangodara03@gmail.com>

* added comment, removed the ws_client file

Signed-off-by: aryan <aryangodara03@gmail.com>

* removed ErrAlreadySubscribed

Signed-off-by: aryan <aryangodara03@gmail.com>

* fixed some failing tests, will check again after semaphore result

Signed-off-by: aryan <aryangodara03@gmail.com>

* made similar changes for MQTT and rabbitMQ

Signed-off-by: aryan <aryangodara03@gmail.com>

* removed extra comment

Signed-off-by: aryan <aryangodara03@gmail.com>

* removed extra comment

Signed-off-by: aryan <aryangodara03@gmail.com>

* removed extra comment

Signed-off-by: aryan <aryangodara03@gmail.com>

* to fix failing CI

Signed-off-by: aryan <aryangodara03@gmail.com>

* checking code after adding conditions

Signed-off-by: aryan <aryangodara03@gmail.com>

* added ps.mu.Lock(), and changed len(s)

Signed-off-by: aryan <aryangodara03@gmail.com>

* added tests for failing unsubscribe

Signed-off-by: aryan <aryangodara03@gmail.com>

* shifted defer lock

Signed-off-by: aryan <aryangodara03@gmail.com>

* fixed mqtt/pubsub

Signed-off-by: aryan <aryangodara03@gmail.com>

* fix mqtt/pubsub typo

Signed-off-by: aryan <aryangodara03@gmail.com>

* add comment to improve readability

Signed-off-by: aryan <aryangodara03@gmail.com>

* add comment to better improve readability

Signed-off-by: aryan <aryangodara03@gmail.com>

* add comments for docs

Signed-off-by: aryan <aryangodara03@gmail.com>

* improve doc comment

Signed-off-by: aryan <aryangodara03@gmail.com>

* change to mqtt/pubsub

Signed-off-by: aryan <aryangodara03@gmail.com>

* remove unnecessary mutex lock

Signed-off-by: aryan <aryangodara03@gmail.com>

* change to pubsub

Signed-off-by: aryan <aryangodara03@gmail.com>

* add comments to improve readability

Signed-off-by: aryan <aryangodara03@gmail.com>

* fix typo in comment

Signed-off-by: aryan <aryangodara03@gmail.com>

Signed-off-by: aryan <aryangodara03@gmail.com>
  • Loading branch information
AryanGodara authored Aug 12, 2022
1 parent 721ee54 commit d9c4704
Show file tree
Hide file tree
Showing 5 changed files with 168 additions and 22 deletions.
21 changes: 19 additions & 2 deletions pkg/messaging/mqtt/pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ var (
errSubscribeTimeout = errors.New("failed to subscribe due to timeout reached")
errUnsubscribeTimeout = errors.New("failed to unsubscribe due to timeout reached")
errUnsubscribeDeleteTopic = errors.New("failed to unsubscribe due to deletion of topic")
errAlreadySubscribed = errors.New("already subscribed to topic")
errNotSubscribed = errors.New("not subscribed")
errEmptyTopic = errors.New("empty topic")
errEmptyID = errors.New("empty ID")
Expand All @@ -47,6 +46,7 @@ type pubsub struct {
subscriptions map[string]subscription
}

// NewPubSub returns MQTT message publisher/subscriber.
func NewPubSub(url, queue string, timeout time.Duration, logger log.Logger) (messaging.PubSub, error) {
client, err := newClient(url, "mqtt-publisher", timeout)
if err != nil {
Expand Down Expand Up @@ -80,7 +80,23 @@ func (ps pubsub) Subscribe(id, topic string, handler messaging.MessageHandler) e
case true:
// Check topic
if ok = s.contains(topic); ok {
return errAlreadySubscribed
// Unlocking, so that Unsubscribe() can access ps.subscriptions
ps.mu.Unlock()
err := ps.Unsubscribe(id, topic)
ps.mu.Lock() // Lock so that deferred unlock handle it
if err != nil {
return err
}
if len(ps.subscriptions) == 0 {
client, err := newClient(ps.address, id, ps.timeout)
if err != nil {
return err
}
s = subscription{
client: client,
topics: []string{topic},
}
}
}
s.topics = append(s.topics, topic)
default:
Expand All @@ -93,6 +109,7 @@ func (ps pubsub) Subscribe(id, topic string, handler messaging.MessageHandler) e
topics: []string{topic},
}
}

token := s.client.Subscribe(topic, qos, ps.mqttHandler(handler))
if token.Error() != nil {
return token.Error()
Expand Down
24 changes: 18 additions & 6 deletions pkg/messaging/nats/pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ var (
ErrNotSubscribed = errors.New("not subscribed")
ErrEmptyTopic = errors.New("empty topic")
ErrEmptyID = errors.New("empty id")
ErrFailed = errors.New("failed")
)

var _ messaging.PubSub = (*pubsub)(nil)
Expand Down Expand Up @@ -69,20 +70,30 @@ func (ps *pubsub) Subscribe(id, topic string, handler messaging.MessageHandler)
if topic == "" {
return ErrEmptyTopic
}

ps.mu.Lock()
defer ps.mu.Unlock()
// Check topic
s, ok := ps.subscriptions[topic]
switch ok {
case true:
// Check topic ID
if ok {
// Check client ID
if _, ok := s[id]; ok {
return ErrAlreadySubscribed
// Unlocking, so that Unsubscribe() can access ps.subscriptions
ps.mu.Unlock()
if err := ps.Unsubscribe(id, topic); err != nil {
return err
}

ps.mu.Lock()
// value of s can be changed while ps.mu is unlocked
s = ps.subscriptions[topic]
}
default:
}
defer ps.mu.Unlock()
if s == nil {
s = make(map[string]subscription)
ps.subscriptions[topic] = s
}

nh := ps.natsHandler(handler)

if ps.queue != "" {
Expand All @@ -104,6 +115,7 @@ func (ps *pubsub) Subscribe(id, topic string, handler messaging.MessageHandler)
Subscription: sub,
cancel: handler.Cancel,
}

return nil
}

Expand Down
62 changes: 58 additions & 4 deletions pkg/messaging/nats/pubsub_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,124 +88,173 @@ func TestPubsub(t *testing.T) {
clientID string
errorMessage error
pubsub bool //true for subscribe and false for unsubscribe
handler messaging.MessageHandler
}{
{
desc: "Subscribe to a topic with an ID",
topic: fmt.Sprintf("%s.%s", chansPrefix, topic),
clientID: "clientid1",
errorMessage: nil,
pubsub: true,
handler: handler{false},
},
{
desc: "Subscribe to the same topic with a different ID",
topic: fmt.Sprintf("%s.%s", chansPrefix, topic),
clientID: "clientid2",
errorMessage: nil,
pubsub: true,
handler: handler{false},
},
{
desc: "Subscribe to an already subscribed topic with an ID",
topic: fmt.Sprintf("%s.%s", chansPrefix, topic),
clientID: "clientid1",
errorMessage: nats.ErrAlreadySubscribed,
errorMessage: nil,
pubsub: true,
handler: handler{false},
},
{
desc: "Unsubscribe from a topic with an ID",
topic: fmt.Sprintf("%s.%s", chansPrefix, topic),
clientID: "clientid1",
errorMessage: nil,
pubsub: false,
handler: handler{false},
},
{
desc: "Unsubscribe from a non-existent topic with an ID",
topic: "h",
clientID: "clientid1",
errorMessage: nats.ErrNotSubscribed,
pubsub: false,
handler: handler{false},
},
{
desc: "Unsubscribe from the same topic with a different ID",
topic: fmt.Sprintf("%s.%s", chansPrefix, topic),
clientID: "clientidd2",
errorMessage: nats.ErrNotSubscribed,
pubsub: false,
handler: handler{false},
},
{
desc: "Unsubscribe from the same topic with a different ID not subscribed",
topic: fmt.Sprintf("%s.%s", chansPrefix, topic),
clientID: "clientidd3",
errorMessage: nats.ErrNotSubscribed,
pubsub: false,
handler: handler{false},
},
{
desc: "Unsubscribe from an already unsubscribed topic with an ID",
topic: fmt.Sprintf("%s.%s", chansPrefix, topic),
clientID: "clientid1",
errorMessage: nats.ErrNotSubscribed,
pubsub: false,
handler: handler{false},
},
{
desc: "Subscribe to a topic with a subtopic with an ID",
topic: fmt.Sprintf("%s.%s.%s", chansPrefix, topic, subtopic),
clientID: "clientidd1",
errorMessage: nil,
pubsub: true,
handler: handler{false},
},
{
desc: "Subscribe to an already subscribed topic with a subtopic with an ID",
topic: fmt.Sprintf("%s.%s.%s", chansPrefix, topic, subtopic),
clientID: "clientidd1",
errorMessage: nats.ErrAlreadySubscribed,
errorMessage: nil,
pubsub: true,
handler: handler{false},
},
{
desc: "Unsubscribe from a topic with a subtopic with an ID",
topic: fmt.Sprintf("%s.%s.%s", chansPrefix, topic, subtopic),
clientID: "clientidd1",
errorMessage: nil,
pubsub: false,
handler: handler{false},
},
{
desc: "Unsubscribe from an already unsubscribed topic with a subtopic with an ID",
topic: fmt.Sprintf("%s.%s.%s", chansPrefix, topic, subtopic),
clientID: "clientid1",
errorMessage: nats.ErrNotSubscribed,
pubsub: false,
handler: handler{false},
},
{
desc: "Subscribe to an empty topic with an ID",
topic: "",
clientID: "clientid1",
errorMessage: nats.ErrEmptyTopic,
pubsub: true,
handler: handler{false},
},
{
desc: "Unsubscribe from an empty topic with an ID",
topic: "",
clientID: "clientid1",
errorMessage: nats.ErrEmptyTopic,
pubsub: false,
handler: handler{false},
},
{
desc: "Subscribe to a topic with empty id",
topic: fmt.Sprintf("%s.%s", chansPrefix, topic),
clientID: "",
errorMessage: nats.ErrEmptyID,
pubsub: true,
handler: handler{false},
},
{
desc: "Unsubscribe from a topic with empty id",
topic: fmt.Sprintf("%s.%s", chansPrefix, topic),
clientID: "",
errorMessage: nats.ErrEmptyID,
pubsub: false,
handler: handler{false},
},
{
desc: "Subscribe to another topic with an ID",
topic: fmt.Sprintf("%s.%s", chansPrefix, topic+"1"),
clientID: "clientid3",
errorMessage: nil,
pubsub: true,
handler: handler{true},
},
{
desc: "Subscribe to another already subscribed topic with an ID with Unsubscribe failing",
topic: fmt.Sprintf("%s.%s", chansPrefix, topic+"1"),
clientID: "clientid3",
errorMessage: nats.ErrFailed,
pubsub: true,
handler: handler{true},
},
{
desc: "Subscribe to a new topic with an ID",
topic: fmt.Sprintf("%s.%s", chansPrefix, topic+"2"),
clientID: "clientid4",
errorMessage: nil,
pubsub: true,
handler: handler{true},
},
{
desc: "Unsubscribe from a topic with an ID with failing handler",
topic: fmt.Sprintf("%s.%s", chansPrefix, topic+"2"),
clientID: "clientid4",
errorMessage: nats.ErrFailed,
pubsub: false,
handler: handler{true},
},
}

for _, pc := range subcases {
if pc.pubsub == true {
err := pubsub.Subscribe(pc.clientID, pc.topic, handler{})
err := pubsub.Subscribe(pc.clientID, pc.topic, pc.handler)
if pc.errorMessage == nil {
require.Nil(t, err, fmt.Sprintf("%s got unexpected error: %s", pc.desc, err))
} else {
Expand All @@ -222,13 +271,18 @@ func TestPubsub(t *testing.T) {
}
}

type handler struct{}
type handler struct {
fail bool
}

func (h handler) Handle(msg messaging.Message) error {
msgChan <- msg
return nil
}

func (h handler) Cancel() error {
if h.fail {
return nats.ErrFailed
}
return nil
}
21 changes: 15 additions & 6 deletions pkg/messaging/rabbitmq/pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ var (
ErrNotSubscribed = errors.New("not subscribed")
ErrEmptyTopic = errors.New("empty topic")
ErrEmptyID = errors.New("empty id")
ErrFailed = errors.New("failed")
)

var _ messaging.PubSub = (*pubsub)(nil)
Expand Down Expand Up @@ -72,16 +73,24 @@ func (ps *pubsub) Subscribe(id, topic string, handler messaging.MessageHandler)
return ErrEmptyTopic
}
ps.mu.Lock()
defer ps.mu.Unlock()
// Check topic
s, ok := ps.subscriptions[topic]
switch ok {
case true:
// Check topic ID
if ok {
// Check client ID
if _, ok := s[id]; ok {
return ErrAlreadySubscribed
// Unlocking, so that Unsubscribe() can access ps.subscriptions
ps.mu.Unlock()
if err := ps.Unsubscribe(id, topic); err != nil {
return err
}

ps.mu.Lock()
// value of s can be changed while ps.mu is unlocked
s = ps.subscriptions[topic]
}
default:
}
defer ps.mu.Unlock()
if s == nil {
s = make(map[string]subscription)
ps.subscriptions[topic] = s
}
Expand Down
Loading

0 comments on commit d9c4704

Please sign in to comment.