Skip to content

Commit

Permalink
NOISSUE - Refactor MQTT subscriber (#1561)
Browse files Browse the repository at this point in the history
* correct suscriber interface validator + refactore token error handling

Signed-off-by: tzzed <zerouali.t@gmail.com>

* apply review suggestion

Signed-off-by: tzzed <zerouali.t@gmail.com>

Co-authored-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com>
  • Loading branch information
tzzed and dborovcanin committed Feb 9, 2022
1 parent 1f8a221 commit f8ce94e
Show file tree
Hide file tree
Showing 3 changed files with 17 additions and 19 deletions.
11 changes: 5 additions & 6 deletions pkg/messaging/mqtt/publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,14 @@ import (
"time"

mqtt "github.com/eclipse/paho.mqtt.golang"

"github.com/mainflux/mainflux/pkg/messaging"
)

var _ messaging.Publisher = (*publisher)(nil)

var errPublishTimeout = errors.New("failed to publish due to timeout reached")

var _ messaging.Publisher = (*publisher)(nil)

type publisher struct {
client mqtt.Client
timeout time.Duration
Expand All @@ -40,11 +41,9 @@ func (pub publisher) Publish(topic string, msg messaging.Message) error {
return token.Error()
}
ok := token.WaitTimeout(pub.timeout)
if ok && token.Error() != nil {
return token.Error()
}
if !ok {
return errPublishTimeout
}
return nil

return token.Error()
}
7 changes: 4 additions & 3 deletions pkg/messaging/mqtt/pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,13 @@ func newClient(address string, timeout time.Duration) (mqtt.Client, error) {
}

ok := token.WaitTimeout(timeout)
if ok && token.Error() != nil {
return nil, token.Error()
}
if !ok {
return nil, errConnect
}

if token.Error() != nil {
return nil, token.Error()
}

return client, nil
}
18 changes: 8 additions & 10 deletions pkg/messaging/mqtt/subscriber.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,18 +9,19 @@ import (

mqtt "github.com/eclipse/paho.mqtt.golang"
"github.com/gogo/protobuf/proto"

log "github.com/mainflux/mainflux/logger"
"github.com/mainflux/mainflux/pkg/errors"
"github.com/mainflux/mainflux/pkg/messaging"
)

var _ messaging.Publisher = (*publisher)(nil)

var (
errSubscribeTimeout = errors.New("failed to subscribe due to timeout reached")
errUnsubscribeTimeout = errors.New("failed to unsubscribe due to timeout reached")
)

var _ messaging.Subscriber = (*subscriber)(nil)

type subscriber struct {
client mqtt.Client
timeout time.Duration
Expand Down Expand Up @@ -48,28 +49,25 @@ func (sub subscriber) Subscribe(topic string, handler messaging.MessageHandler)
return token.Error()
}
ok := token.WaitTimeout(sub.timeout)
if ok && token.Error() != nil {
return token.Error()
}
if !ok {
return errSubscribeTimeout
}
return nil

return token.Error()
}

func (sub subscriber) Unsubscribe(topic string) error {
token := sub.client.Unsubscribe(topic)
if token.Error() != nil {
return token.Error()
}

ok := token.WaitTimeout(sub.timeout)
if ok && token.Error() != nil {
return token.Error()
}
if !ok {
return errUnsubscribeTimeout
}
return nil

return token.Error()
}

func (sub subscriber) mqttHandler(h messaging.MessageHandler) mqtt.MessageHandler {
Expand Down

0 comments on commit f8ce94e

Please sign in to comment.