Skip to content

Commit

Permalink
MF-1681 - Failed to reconnect to NATS (absmach#1686)
Browse files Browse the repository at this point in the history
* Make NATS reconnect never give up

Signed-off-by: dusanb94 <dusan.borovcanin@mainflux.com>

* Extract maxReconnects to a constant

Signed-off-by: dusanb94 <dusan.borovcanin@mainflux.com>

Signed-off-by: dusanb94 <dusan.borovcanin@mainflux.com>
Signed-off-by: rodneyosodo <blackd0t@protonmail.com>
  • Loading branch information
dborovcanin authored and rodneyosodo committed Feb 2, 2023
1 parent 1395308 commit 6a3b1b4
Show file tree
Hide file tree
Showing 4 changed files with 19 additions and 13 deletions.
2 changes: 1 addition & 1 deletion docker/brokers/nats.yml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
services:
broker:
image: nats:2.2.4-alpine
command: "-c /etc/nats/nats.conf"
command: "-c /etc/nats/nats.conf -DV"
volumes:
- ./../nats/:/etc/nats
ports:
Expand Down
7 changes: 6 additions & 1 deletion pkg/messaging/nats/publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,11 @@ import (
broker "github.com/nats-io/nats.go"
)

// A maximum number of reconnect attempts before NATS connection closes permanently.
// Value -1 represents an unlimited number of reconnect retries, i.e. the client
// will never give up on retrying to re-establish connection to NATS server.
const maxReconnects = -1

var _ messaging.Publisher = (*publisher)(nil)

type publisher struct {
Expand All @@ -22,7 +27,7 @@ type publisher struct {

// NewPublisher returns NATS message Publisher.
func NewPublisher(url string) (messaging.Publisher, error) {
conn, err := broker.Connect(url)
conn, err := broker.Connect(url, broker.MaxReconnects(maxReconnects))
if err != nil {
return nil, err
}
Expand Down
11 changes: 5 additions & 6 deletions pkg/messaging/nats/pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,11 @@ import (

const chansPrefix = "channels"

// Publisher and Subscriber errors.
var (
ErrAlreadySubscribed = errors.New("already subscribed to topic")
ErrNotSubscribed = errors.New("not subscribed")
ErrEmptyTopic = errors.New("empty topic")
ErrEmptyID = errors.New("empty id")
ErrFailed = errors.New("failed")
ErrNotSubscribed = errors.New("not subscribed")
ErrEmptyTopic = errors.New("empty topic")
ErrEmptyID = errors.New("empty id")
)

var _ messaging.PubSub = (*pubsub)(nil)
Expand All @@ -48,7 +47,7 @@ type pubsub struct {
// here: https://docs.nats.io/developing-with-nats/receiving/queues.
// If the queue is empty, Subscribe will be used.
func NewPubSub(url, queue string, logger log.Logger) (messaging.PubSub, error) {
conn, err := broker.Connect(url)
conn, err := broker.Connect(url, broker.MaxReconnects(maxReconnects))
if err != nil {
return nil, err
}
Expand Down
12 changes: 7 additions & 5 deletions pkg/messaging/nats/pubsub_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
package nats_test

import (
"errors"
"fmt"
"testing"

Expand All @@ -22,8 +23,9 @@ const (
)

var (
msgChan = make(chan messaging.Message)
data = []byte("payload")
msgChan = make(chan messaging.Message)
data = []byte("payload")
errFailed = errors.New("failed")
)

func TestPublisher(t *testing.T) {
Expand Down Expand Up @@ -230,7 +232,7 @@ func TestPubsub(t *testing.T) {
desc: "Subscribe to another already subscribed topic with an ID with Unsubscribe failing",
topic: fmt.Sprintf("%s.%s", chansPrefix, topic+"1"),
clientID: "clientid3",
errorMessage: nats.ErrFailed,
errorMessage: errFailed,
pubsub: true,
handler: handler{true},
},
Expand All @@ -246,7 +248,7 @@ func TestPubsub(t *testing.T) {
desc: "Unsubscribe from a topic with an ID with failing handler",
topic: fmt.Sprintf("%s.%s", chansPrefix, topic+"2"),
clientID: "clientid4",
errorMessage: nats.ErrFailed,
errorMessage: errFailed,
pubsub: false,
handler: handler{true},
},
Expand Down Expand Up @@ -282,7 +284,7 @@ func (h handler) Handle(msg messaging.Message) error {

func (h handler) Cancel() error {
if h.fail {
return nats.ErrFailed
return errFailed
}
return nil
}

0 comments on commit 6a3b1b4

Please sign in to comment.