Skip to content

Commit

Permalink
NOISSUE - Refactor messaging (#1141)
Browse files Browse the repository at this point in the history
* Refactor messaging

Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com>

* Rename SubscribeHandler to MessageHandler

Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com>

* Remove `Auth` event logs

Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com>

* Update message pubsub APi

Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com>

* Fix topics handling

Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com>

* Update CoAP adapter

Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com>

* Update Twins service

Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com>

* Update LoRa adapter

Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com>

* Update OPC UA adapter

Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com>

* Remove broker package

Package `broker` is conceptually renamed to package `nats`.

Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com>

* Update makefile

Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com>

* Add comment explanation

Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com>

* Fix MQTT adapter

Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com>

* Fix typo

Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com>

* Move NATS pub/sub implementation to pubsub pkg

Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com>

* Remove an empty line in main methods

Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com>

* Move messaging-related code to messaging package

Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com>

* Fix Twins mocks

Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com>

* Change Occurred back to Created

Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com>

* Fix tranformer test

Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com>

* Fix message proto commands

Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com>

* Replace string literal with constant

Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com>

* Remove alias from main method

Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com>

* Change messaging pubsub alias

Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com>

* Rename occured to created

Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com>

* Handle NATS connection in the NATS PubSub

Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com>

* Rename n to pub/pubSub

Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com>

* Fix typos

Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com>
  • Loading branch information
dborovcanin authored Apr 28, 2020
1 parent 72067bf commit 3b0e85e
Show file tree
Hide file tree
Showing 40 changed files with 512 additions and 423 deletions.
3 changes: 1 addition & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ test:

proto:
protoc --gofast_out=plugins=grpc:. *.proto
protoc --gogo_out=plugins=grpc:. broker/*.proto
protoc --gogo_out=plugins=grpc:. messaging/*.proto

$(SERVICES):
$(call compile_service,$(@))
Expand All @@ -83,7 +83,6 @@ $(DOCKERS_DEV):
$(call make_docker_dev,$(@))

dockers: $(DOCKERS)

dockers_dev: $(DOCKERS_DEV)

define docker_push
Expand Down
100 changes: 0 additions & 100 deletions broker/nats.go

This file was deleted.

10 changes: 5 additions & 5 deletions cmd/cassandra-writer/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@ import (
kitprometheus "github.com/go-kit/kit/metrics/prometheus"
"github.com/gocql/gocql"
"github.com/mainflux/mainflux"
"github.com/mainflux/mainflux/broker"
"github.com/mainflux/mainflux/logger"
"github.com/mainflux/mainflux/messaging/nats"
"github.com/mainflux/mainflux/transformers/senml"
"github.com/mainflux/mainflux/writers"
"github.com/mainflux/mainflux/writers/api"
Expand Down Expand Up @@ -69,19 +69,19 @@ func main() {
log.Fatalf(err.Error())
}

b, err := broker.New(cfg.natsURL)
pubSub, err := nats.NewPubSub(cfg.natsURL, "", logger)
if err != nil {
logger.Error(err.Error())
logger.Error(fmt.Sprintf("Failed to connect to NATS: %s", err))
os.Exit(1)
}
defer b.Close()
defer pubSub.Close()

session := connectToCassandra(cfg.dbCfg, logger)
defer session.Close()

repo := newService(session, logger)
st := senml.New(cfg.contentType)
if err := writers.Start(b, repo, st, svcName, cfg.subjectsCfgPath, logger); err != nil {
if err := writers.Start(pubSub, repo, st, svcName, cfg.subjectsCfgPath, logger); err != nil {
logger.Error(fmt.Sprintf("Failed to create Cassandra writer: %s", err))
}

Expand Down
10 changes: 5 additions & 5 deletions cmd/coap/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,10 @@ import (
gocoap "github.com/dustin/go-coap"
kitprometheus "github.com/go-kit/kit/metrics/prometheus"
"github.com/mainflux/mainflux"
"github.com/mainflux/mainflux/broker"
"github.com/mainflux/mainflux/coap"
"github.com/mainflux/mainflux/coap/api"
logger "github.com/mainflux/mainflux/logger"
"github.com/mainflux/mainflux/messaging/nats"
thingsapi "github.com/mainflux/mainflux/things/api/auth/grpc"
opentracing "github.com/opentracing/opentracing-go"
stdprometheus "github.com/prometheus/client_golang/prometheus"
Expand Down Expand Up @@ -81,14 +81,14 @@ func main() {
cc := thingsapi.NewClient(conn, thingsTracer, cfg.thingsAuthTimeout)
respChan := make(chan string, 10000)

b, err := broker.New(cfg.natsURL)
pubSub, err := nats.NewPubSub(cfg.natsURL, "", logger)
if err != nil {
logger.Error(err.Error())
logger.Error(fmt.Sprintf("Failed to connect to NATS: %s", err))
os.Exit(1)
}
defer b.Close()
defer pubSub.Close()

svc := coap.New(b, logger, cc, respChan)
svc := coap.New(pubSub, logger, cc, respChan)

svc = api.LoggingMiddleware(svc, logger)

Expand Down
15 changes: 7 additions & 8 deletions cmd/http/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,12 @@ import (

kitprometheus "github.com/go-kit/kit/metrics/prometheus"
"github.com/mainflux/mainflux"
"github.com/mainflux/mainflux/broker"
adapter "github.com/mainflux/mainflux/http"
"github.com/mainflux/mainflux/http/api"
"github.com/mainflux/mainflux/logger"
"github.com/mainflux/mainflux/messaging/nats"
thingsapi "github.com/mainflux/mainflux/things/api/auth/grpc"
opentracing "github.com/opentracing/opentracing-go"
"github.com/opentracing/opentracing-go"
stdprometheus "github.com/prometheus/client_golang/prometheus"
jconfig "github.com/uber/jaeger-client-go/config"
"google.golang.org/grpc"
Expand Down Expand Up @@ -62,7 +62,6 @@ type config struct {
}

func main() {

cfg := loadConfig()

logger, err := logger.New(os.Stdout, cfg.logLevel)
Expand All @@ -79,15 +78,15 @@ func main() {
thingsTracer, thingsCloser := initJaeger("things", cfg.jaegerURL, logger)
defer thingsCloser.Close()

b, err := broker.New(cfg.natsURL)
pub, err := nats.NewPublisher(cfg.natsURL)
if err != nil {
logger.Error(err.Error())
logger.Error(fmt.Sprintf("Failed to connect to NATS: %s", err))
os.Exit(1)
}
defer b.Close()
defer pub.Close()

cc := thingsapi.NewClient(conn, thingsTracer, cfg.thingsAuthTimeout)
svc := adapter.New(b, cc)
tc := thingsapi.NewClient(conn, thingsTracer, cfg.thingsAuthTimeout)
svc := adapter.New(pub, tc)

svc = api.LoggingMiddleware(svc, logger)
svc = api.MetricsMiddleware(
Expand Down
11 changes: 6 additions & 5 deletions cmd/influxdb-writer/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@ import (
kitprometheus "github.com/go-kit/kit/metrics/prometheus"
influxdata "github.com/influxdata/influxdb/client/v2"
"github.com/mainflux/mainflux"
"github.com/mainflux/mainflux/broker"
"github.com/mainflux/mainflux/logger"
"github.com/mainflux/mainflux/messaging/nats"
"github.com/mainflux/mainflux/transformers/senml"
"github.com/mainflux/mainflux/writers"
"github.com/mainflux/mainflux/writers/api"
Expand Down Expand Up @@ -70,12 +70,12 @@ func main() {
log.Fatalf(err.Error())
}

b, err := broker.New(cfg.natsURL)
pubSub, err := nats.NewPubSub(cfg.natsURL, "", logger)
if err != nil {
logger.Error(err.Error())
logger.Error(fmt.Sprintf("Failed to connect to NATS: %s", err))
os.Exit(1)
}
defer b.Close()
defer pubSub.Close()

client, err := influxdata.NewHTTPClient(clientCfg)
if err != nil {
Expand All @@ -90,7 +90,8 @@ func main() {
repo = api.LoggingMiddleware(repo, logger)
repo = api.MetricsMiddleware(repo, counter, latency)
st := senml.New(cfg.contentType)
if err := writers.Start(b, repo, st, svcName, cfg.subjectsCfgPath, logger); err != nil {

if err := writers.Start(pubSub, repo, st, svcName, cfg.subjectsCfgPath, logger); err != nil {
logger.Error(fmt.Sprintf("Failed to start InfluxDB writer: %s", err))
os.Exit(1)
}
Expand Down
10 changes: 5 additions & 5 deletions cmd/lora/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,11 @@ import (
mqttPaho "github.com/eclipse/paho.mqtt.golang"
r "github.com/go-redis/redis"
"github.com/mainflux/mainflux"
"github.com/mainflux/mainflux/broker"
"github.com/mainflux/mainflux/logger"
"github.com/mainflux/mainflux/lora"
"github.com/mainflux/mainflux/lora/api"
"github.com/mainflux/mainflux/lora/mqtt"
"github.com/mainflux/mainflux/messaging/nats"

kitprometheus "github.com/go-kit/kit/metrics/prometheus"
"github.com/mainflux/mainflux/lora/redis"
Expand Down Expand Up @@ -85,19 +85,19 @@ func main() {
esConn := connectToRedis(cfg.esURL, cfg.esPass, cfg.esDB, logger)
defer esConn.Close()

b, err := broker.New(cfg.natsURL)
pub, err := nats.NewPublisher(cfg.natsURL)
if err != nil {
logger.Error(err.Error())
logger.Error(fmt.Sprintf("Failed to connect to NATS: %s", err))
os.Exit(1)
}
defer b.Close()
defer pub.Close()

thingRM := newRouteMapRepositoy(rmConn, thingsRMPrefix, logger)
chanRM := newRouteMapRepositoy(rmConn, channelsRMPrefix, logger)

mqttConn := connectToMQTTBroker(cfg.loraMsgURL, logger)

svc := lora.New(b, thingRM, chanRM)
svc := lora.New(pub, thingRM, chanRM)
svc = api.LoggingMiddleware(svc, logger)
svc = api.MetricsMiddleware(
svc,
Expand Down
11 changes: 6 additions & 5 deletions cmd/mongodb-writer/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@ import (

kitprometheus "github.com/go-kit/kit/metrics/prometheus"
"github.com/mainflux/mainflux"
"github.com/mainflux/mainflux/broker"
"github.com/mainflux/mainflux/logger"
"github.com/mainflux/mainflux/messaging/nats"
"github.com/mainflux/mainflux/transformers/senml"
"github.com/mainflux/mainflux/writers"
"github.com/mainflux/mainflux/writers/api"
Expand Down Expand Up @@ -66,12 +66,12 @@ func main() {
log.Fatal(err)
}

b, err := broker.New(cfg.natsURL)
pubSub, err := nats.NewPubSub(cfg.natsURL, "", logger)
if err != nil {
logger.Error(err.Error())
logger.Error(fmt.Sprintf("Failed to connect to NATS: %s", err))
os.Exit(1)
}
defer b.Close()
defer pubSub.Close()

addr := fmt.Sprintf("mongodb://%s:%s", cfg.dbHost, cfg.dbPort)
client, err := mongo.Connect(context.Background(), options.Client().ApplyURI(addr))
Expand All @@ -87,7 +87,8 @@ func main() {
repo = api.LoggingMiddleware(repo, logger)
repo = api.MetricsMiddleware(repo, counter, latency)
st := senml.New(cfg.contentType)
if err := writers.Start(b, repo, st, svcName, cfg.subjectsCfgPath, logger); err != nil {

if err := writers.Start(pubSub, repo, st, svcName, cfg.subjectsCfgPath, logger); err != nil {
logger.Error(fmt.Sprintf("Failed to start MongoDB writer: %s", err))
os.Exit(1)
}
Expand Down
11 changes: 6 additions & 5 deletions cmd/mqtt/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,9 @@ import (

"github.com/go-redis/redis"
"github.com/mainflux/mainflux"
"github.com/mainflux/mainflux/broker"
"github.com/mainflux/mainflux/logger"
"github.com/mainflux/mainflux/messaging"
"github.com/mainflux/mainflux/messaging/nats"
mqtt "github.com/mainflux/mainflux/mqtt"
mr "github.com/mainflux/mainflux/mqtt/redis"
thingsapi "github.com/mainflux/mainflux/things/api/auth/grpc"
Expand Down Expand Up @@ -129,17 +130,17 @@ func main() {

cc := thingsapi.NewClient(conn, thingsTracer, cfg.thingsAuthTimeout)

b, err := broker.New(cfg.natsURL)
pub, err := nats.NewPublisher(cfg.natsURL)
if err != nil {
logger.Error(err.Error())
logger.Error(fmt.Sprintf("Failed to connect to NATS: %s", err))
os.Exit(1)
}
defer b.Close()
defer pub.Close()

es := mr.NewEventStore(rc, cfg.instance)

// Event handler for MQTT hooks
evt := mqtt.New(b, cc, es, logger, tracer)
evt := mqtt.New([]messaging.Publisher{pub}, cc, es, logger, tracer)

errs := make(chan error, 2)

Expand Down
10 changes: 5 additions & 5 deletions cmd/opcua/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@ import (

r "github.com/go-redis/redis"
"github.com/mainflux/mainflux"
"github.com/mainflux/mainflux/broker"
"github.com/mainflux/mainflux/logger"
"github.com/mainflux/mainflux/messaging/nats"
"github.com/mainflux/mainflux/opcua"
"github.com/mainflux/mainflux/opcua/api"
"github.com/mainflux/mainflux/opcua/db"
Expand Down Expand Up @@ -97,15 +97,15 @@ func main() {
esConn := connectToRedis(cfg.esURL, cfg.esPass, cfg.esDB, logger)
defer esConn.Close()

b, err := broker.New(cfg.natsURL)
pubSub, err := nats.NewPubSub(cfg.natsURL, "", logger)
if err != nil {
logger.Error(err.Error())
logger.Error(fmt.Sprintf("Failed to connect to NATS: %s", err))
os.Exit(1)
}
defer b.Close()
defer pubSub.Close()

ctx := context.Background()
sub := gopcua.NewSubscriber(ctx, b, thingRM, chanRM, connRM, logger)
sub := gopcua.NewSubscriber(ctx, pubSub, thingRM, chanRM, connRM, logger)
browser := gopcua.NewBrowser(ctx, logger)

svc := opcua.New(sub, browser, thingRM, chanRM, connRM, cfg.opcuaConfig, logger)
Expand Down
Loading

0 comments on commit 3b0e85e

Please sign in to comment.