Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

NOISSUE - Create broker package for NATS #1080

Merged
merged 27 commits into from
Apr 1, 2020
Merged
Show file tree
Hide file tree
Changes from 14 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 25 additions & 0 deletions broker/broker.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
// Copyright (c) Mainflux
// SPDX-License-Identifier: Apache-2.0

package broker

import (
"context"

"github.com/nats-io/nats.go"
)

// SubjectAllChannels allows to subscribe to all subjects of all channels
const SubjectAllChannels = "channel.>"
manuio marked this conversation as resolved.
Show resolved Hide resolved

// Nats specifies a NATS message API.
type Nats interface {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You think its better to have a separate interface for each broker that we use like nats kafka etc...? Not one Interface lets say Messenger that needs to be satisfied by broker imply for example nats.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agree.

Copy link
Contributor Author

@manuio manuio Mar 31, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My first idea was to have a single interface. But it's not possible unless you create a new service for this because NATS interface will never match with Kafka one (it's doable for publish but not for subscribe). For now if we want to use this as package it's better to have different interfaces .

// Publish publishes message to the msessage broker.
Publish(context.Context, string, Message) error
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you pls put named params, it's easier to understand what are params. Now I see string string and dunno what it means.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think that's what we should do for all the interfaces, starting from here.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not against but in most of our services we are not doing this. Let me open an issue to fix it everywhere


// Subscribe subscribes to the message broker for a given channel ID and subtopic.
Subscribe(string, string, func(msg *nats.Msg)) (*nats.Subscription, error)

// Close closes NATS connection.
Close()
}
5 changes: 3 additions & 2 deletions message.pb.go → broker/message.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

File renamed without changes.
72 changes: 72 additions & 0 deletions broker/nats/pubsub.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
// Copyright (c) Mainflux
// SPDX-License-Identifier: Apache-2.0

// Package nats contains NATS message publisher implementation.
package nats

import (
"context"
"fmt"

"github.com/gogo/protobuf/proto"
"github.com/mainflux/mainflux/broker"
"github.com/mainflux/mainflux/errors"
"github.com/nats-io/nats.go"
)

const prefix = "channel"

var errNatsConn = errors.New("Failed to connect to NATS")

var _ broker.Nats = (*pubsub)(nil)

type pubsub struct {
conn *nats.Conn
}

// New returns NATS message publisher.
func New(url string) (broker.Nats, error) {
manuio marked this conversation as resolved.
Show resolved Hide resolved
nc, err := nats.Connect(url)
if err != nil {
return nil, errors.Wrap(errNatsConn, err)
}

return &pubsub{
conn: nc,
}, nil
}

func (ps pubsub) Publish(_ context.Context, _ string, msg broker.Message) error {
data, err := proto.Marshal(&msg)
if err != nil {
return err
}

subject := fmt.Sprintf("%s.%s", prefix, msg.Channel)
if msg.Subtopic != "" {
subject = fmt.Sprintf("%s.%s", subject, msg.Subtopic)
}
return ps.conn.Publish(subject, data)
}

func fmtSubject(chanID, subtopic string) string {
subject := fmt.Sprintf("%s.%s", prefix, chanID)
if subtopic != "" {
subject = fmt.Sprintf("%s.%s", subject, subtopic)
}
return subject
}

func (ps pubsub) Subscribe(chanID, subtopic string, f func(msg *nats.Msg)) (*nats.Subscription, error) {
subject := fmtSubject(chanID, subtopic)
sub, err := ps.conn.Subscribe(subject, f)
if err != nil {
return nil, err
}

return sub, nil
}

func (ps pubsub) Close() {
ps.conn.Close()
}
2 changes: 1 addition & 1 deletion cmd/cassandra-writer/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import (
"github.com/mainflux/mainflux/writers"
"github.com/mainflux/mainflux/writers/api"
"github.com/mainflux/mainflux/writers/cassandra"
nats "github.com/nats-io/nats.go"
"github.com/nats-io/nats.go"
stdprometheus "github.com/prometheus/client_golang/prometheus"
)

Expand Down
25 changes: 13 additions & 12 deletions cmd/coap/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,9 @@ import (
gocoap "github.com/dustin/go-coap"
kitprometheus "github.com/go-kit/kit/metrics/prometheus"
"github.com/mainflux/mainflux"
broker "github.com/mainflux/mainflux/broker/nats"
"github.com/mainflux/mainflux/coap"
"github.com/mainflux/mainflux/coap/api"
"github.com/mainflux/mainflux/coap/nats"
logger "github.com/mainflux/mainflux/logger"
thingsapi "github.com/mainflux/mainflux/things/api/auth/grpc"
opentracing "github.com/opentracing/opentracing-go"
Expand All @@ -29,12 +29,12 @@ import (
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"

broker "github.com/nats-io/nats.go"
"github.com/nats-io/nats.go"
)

const (
defPort = "5683"
defNatsURL = broker.DefaultURL
defNatsURL = nats.DefaultURL
defThingsURL = "localhost:8181"
defLogLevel = "error"
defClientTLS = "false"
Expand Down Expand Up @@ -74,13 +74,6 @@ func main() {
log.Fatalf(err.Error())
}

nc, err := broker.Connect(cfg.natsURL)
if err != nil {
logger.Error(fmt.Sprintf("Failed to connect to NATS: %s", err))
os.Exit(1)
}
defer nc.Close()

conn := connectToThings(cfg, logger)
defer conn.Close()

Expand All @@ -89,8 +82,16 @@ func main() {

cc := thingsapi.NewClient(conn, thingsTracer, cfg.thingsTimeout)
respChan := make(chan string, 10000)
pubsub := nats.New(nc)
svc := coap.New(pubsub, cc, respChan)

pubsub, err := broker.New(cfg.natsURL)
if err != nil {
logger.Error(err.Error())
os.Exit(1)
}
defer pubsub.Close()

svc := coap.New(pubsub, logger, cc, respChan)

svc = api.LoggingMiddleware(svc, logger)

svc = api.MetricsMiddleware(
Expand Down
23 changes: 11 additions & 12 deletions cmd/http/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,12 @@ import (

kitprometheus "github.com/go-kit/kit/metrics/prometheus"
"github.com/mainflux/mainflux"
broker "github.com/mainflux/mainflux/broker/nats"
adapter "github.com/mainflux/mainflux/http"
"github.com/mainflux/mainflux/http/api"
"github.com/mainflux/mainflux/http/nats"
"github.com/mainflux/mainflux/logger"
thingsapi "github.com/mainflux/mainflux/things/api/auth/grpc"
broker "github.com/nats-io/nats.go"
"github.com/nats-io/nats.go"
manuio marked this conversation as resolved.
Show resolved Hide resolved
opentracing "github.com/opentracing/opentracing-go"
stdprometheus "github.com/prometheus/client_golang/prometheus"
jconfig "github.com/uber/jaeger-client-go/config"
Expand All @@ -36,7 +36,7 @@ const (
defCACerts = ""
defPort = "8180"
defLogLevel = "error"
defNatsURL = broker.DefaultURL
defNatsURL = nats.DefaultURL
defThingsURL = "localhost:8181"
defJaegerURL = ""
defThingsTimeout = "1" // in seconds
Expand Down Expand Up @@ -71,13 +71,6 @@ func main() {
log.Fatalf(err.Error())
}

nc, err := broker.Connect(cfg.natsURL)
if err != nil {
logger.Error(fmt.Sprintf("Failed to connect to NATS: %s", err))
os.Exit(1)
}
defer nc.Close()

conn := connectToThings(cfg, logger)
defer conn.Close()

Expand All @@ -87,10 +80,16 @@ func main() {
thingsTracer, thingsCloser := initJaeger("things", cfg.jaegerURL, logger)
defer thingsCloser.Close()

pubsub, err := broker.New(cfg.natsURL)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe it would be better to say nats.New(cfg.natsURL) you are making new nats conn from nats package (e.g isolated impl of nats broker) or tomorrow kafka.New(cfg.kafka) from broker/kafka package. Just an idea
Also I would prefer messenger or msg or whatever that sounds better then pubsub.Publish() for example

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree for pubsub, I was never convinced. I simply replaced it by b, err := broker.New(cfg.natsURL).
I also propose that we think about to use an interface for all brokers or different ones per broker.
In my last commit I replaced broker/nats/pubsub.go by broker/nats.go. There is no nats package anymore. Eventually I could replace broker.New by broker.NewNats

if err != nil {
logger.Error(err.Error())
os.Exit(1)
}
defer pubsub.Close()

cc := thingsapi.NewClient(conn, thingsTracer, cfg.thingsTimeout)
pub := nats.NewMessagePublisher(nc)
svc := adapter.New(pubsub, cc)

svc := adapter.New(pub, cc)
svc = api.LoggingMiddleware(svc, logger)
svc = api.MetricsMiddleware(
svc,
Expand Down
27 changes: 9 additions & 18 deletions cmd/lora/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,15 @@ import (
mqttPaho "github.com/eclipse/paho.mqtt.golang"
r "github.com/go-redis/redis"
"github.com/mainflux/mainflux"
broker "github.com/mainflux/mainflux/broker/nats"
"github.com/mainflux/mainflux/logger"
"github.com/mainflux/mainflux/lora"
"github.com/mainflux/mainflux/lora/api"
"github.com/mainflux/mainflux/lora/mqtt"
pub "github.com/mainflux/mainflux/lora/nats"

kitprometheus "github.com/go-kit/kit/metrics/prometheus"
"github.com/mainflux/mainflux/lora/redis"
nats "github.com/nats-io/nats.go"
"github.com/nats-io/nats.go"
stdprometheus "github.com/prometheus/client_golang/prometheus"
)

Expand Down Expand Up @@ -80,23 +80,25 @@ func main() {
log.Fatalf(err.Error())
}

natsConn := connectToNATS(cfg.natsURL, logger)
defer natsConn.Close()

rmConn := connectToRedis(cfg.routeMapURL, cfg.routeMapPass, cfg.routeMapDB, logger)
defer rmConn.Close()

esConn := connectToRedis(cfg.esURL, cfg.esPass, cfg.esDB, logger)
defer esConn.Close()

publisher := pub.NewMessagePublisher(natsConn)
pubsub, err := broker.New(cfg.natsURL)
if err != nil {
logger.Error(err.Error())
os.Exit(1)
}
defer pubsub.Close()

thingRM := newRouteMapRepositoy(rmConn, thingsRMPrefix, logger)
chanRM := newRouteMapRepositoy(rmConn, channelsRMPrefix, logger)

mqttConn := connectToMQTTBroker(cfg.loraMsgURL, logger)

svc := lora.New(publisher, thingRM, chanRM)
svc := lora.New(pubsub, thingRM, chanRM)
svc = api.LoggingMiddleware(svc, logger)
svc = api.MetricsMiddleware(
svc,
Expand Down Expand Up @@ -147,17 +149,6 @@ func loadConfig() config {
}
}

func connectToNATS(url string, logger logger.Logger) *nats.Conn {
conn, err := nats.Connect(url)
if err != nil {
logger.Error(fmt.Sprintf("Failed to connect to NATS: %s", err))
os.Exit(1)
}

logger.Info("Connected to NATS")
return conn
}

func connectToMQTTBroker(loraURL string, logger logger.Logger) mqttPaho.Client {
opts := mqttPaho.NewClientOptions()
opts.AddBroker(loraURL)
Expand Down
2 changes: 1 addition & 1 deletion cmd/mongodb-writer/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import (
"github.com/mainflux/mainflux/writers"
"github.com/mainflux/mainflux/writers/api"
"github.com/mainflux/mainflux/writers/mongodb"
nats "github.com/nats-io/nats.go"
"github.com/nats-io/nats.go"
stdprometheus "github.com/prometheus/client_golang/prometheus"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
Expand Down
28 changes: 13 additions & 15 deletions cmd/mqtt/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,14 @@ import (

"github.com/go-redis/redis"
"github.com/mainflux/mainflux"
broker "github.com/mainflux/mainflux/broker/nats"
"github.com/mainflux/mainflux/logger"
mqtt "github.com/mainflux/mainflux/mqtt"
"github.com/mainflux/mainflux/mqtt/nats"
mr "github.com/mainflux/mainflux/mqtt/redis"
thingsapi "github.com/mainflux/mainflux/things/api/auth/grpc"
mp "github.com/mainflux/mproxy/pkg/mqtt"
ws "github.com/mainflux/mproxy/pkg/websocket"
broker "github.com/nats-io/nats.go"
"github.com/nats-io/nats.go"
opentracing "github.com/opentracing/opentracing-go"
jconfig "github.com/uber/jaeger-client-go/config"
"google.golang.org/grpc"
Expand Down Expand Up @@ -60,7 +60,7 @@ const (
envThingsURL = "MF_THINGS_URL"
envThingsTimeout = "MF_MQTT_ADAPTER_THINGS_TIMEOUT"
// Nats
defNatsURL = broker.DefaultURL
defNatsURL = nats.DefaultURL
envNatsURL = "MF_NATS_URL"
// Jaeger
defJaegerURL = ""
Expand Down Expand Up @@ -114,13 +114,6 @@ func main() {
log.Fatalf(err.Error())
}

nc, err := broker.Connect(cfg.natsURL)
if err != nil {
logger.Error(fmt.Sprintf("Failed to connect to NATS: %s", err))
os.Exit(1)
}
defer nc.Close()

conn := connectToThings(cfg, logger)
defer conn.Close()

Expand All @@ -130,17 +123,22 @@ func main() {
thingsTracer, thingsCloser := initJaeger("things", cfg.jaegerURL, logger)
defer thingsCloser.Close()

cc := thingsapi.NewClient(conn, thingsTracer, cfg.thingsTimeout)
pub := nats.NewMessagePublisher(nc)

rc := connectToRedis(cfg.esURL, cfg.esPass, cfg.esDB, logger)
defer rc.Close()

cc := thingsapi.NewClient(conn, thingsTracer, cfg.thingsTimeout)

pubsub, err := broker.New(cfg.natsURL)
if err != nil {
logger.Error(err.Error())
os.Exit(1)
}
defer pubsub.Close()

es := mr.NewEventStore(rc, cfg.instance)
pubs := []mainflux.MessagePublisher{pub}

// Event handler for MQTT hooks
evt := mqtt.New(cc, pubs, es, logger, tracer)
evt := mqtt.New(cc, pubsub, es, logger, tracer)

errs := make(chan error, 2)

Expand Down
Loading