Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

NOISSUE - Refactor messaging #1141

Merged
merged 28 commits into from
Apr 28, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
301c4f7
Refactor messaging
dborovcanin Apr 23, 2020
20e619d
Rename SubscribeHandler to MessageHandler
dborovcanin Apr 23, 2020
511f620
Remove `Auth` event logs
dborovcanin Apr 26, 2020
f64bce7
Update message pubsub APi
dborovcanin Apr 26, 2020
ecd5f73
Fix topics handling
dborovcanin Apr 26, 2020
62af5ac
Update CoAP adapter
dborovcanin Apr 26, 2020
4d806f4
Update Twins service
dborovcanin Apr 26, 2020
b599cc0
Update LoRa adapter
dborovcanin Apr 26, 2020
e4cd0c0
Update OPC UA adapter
dborovcanin Apr 26, 2020
f469a83
Remove broker package
dborovcanin Apr 26, 2020
bd5bec3
Update makefile
dborovcanin Apr 26, 2020
244a27b
Add comment explanation
dborovcanin Apr 26, 2020
a55fff5
Fix MQTT adapter
dborovcanin Apr 26, 2020
ccd15f3
Fix typo
dborovcanin Apr 27, 2020
a88c9cb
Move NATS pub/sub implementation to pubsub pkg
dborovcanin Apr 27, 2020
45b1407
Remove an empty line in main methods
dborovcanin Apr 27, 2020
49a0b66
Move messaging-related code to messaging package
dborovcanin Apr 27, 2020
1c50f8c
Fix Twins mocks
dborovcanin Apr 27, 2020
bd296d1
Change Occurred back to Created
dborovcanin Apr 27, 2020
b19d0f3
Fix tranformer test
dborovcanin Apr 27, 2020
f54047f
Fix message proto commands
dborovcanin Apr 27, 2020
7f1e76a
Replace string literal with constant
dborovcanin Apr 27, 2020
09165b7
Remove alias from main method
dborovcanin Apr 27, 2020
408c267
Change messaging pubsub alias
dborovcanin Apr 27, 2020
d22e451
Rename occured to created
dborovcanin Apr 27, 2020
ae76b43
Handle NATS connection in the NATS PubSub
dborovcanin Apr 27, 2020
0ca8c60
Rename n to pub/pubSub
dborovcanin Apr 27, 2020
c73fb47
Fix typos
dborovcanin Apr 28, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ test:

proto:
protoc --gofast_out=plugins=grpc:. *.proto
protoc --gogo_out=plugins=grpc:. broker/*.proto
protoc --gogo_out=plugins=grpc:. messaging/*.proto

$(SERVICES):
$(call compile_service,$(@))
Expand All @@ -83,7 +83,6 @@ $(DOCKERS_DEV):
$(call make_docker_dev,$(@))

dockers: $(DOCKERS)

dockers_dev: $(DOCKERS_DEV)

define docker_push
Expand Down
100 changes: 0 additions & 100 deletions broker/nats.go

This file was deleted.

10 changes: 5 additions & 5 deletions cmd/cassandra-writer/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@ import (
kitprometheus "github.com/go-kit/kit/metrics/prometheus"
"github.com/gocql/gocql"
"github.com/mainflux/mainflux"
"github.com/mainflux/mainflux/broker"
"github.com/mainflux/mainflux/logger"
"github.com/mainflux/mainflux/messaging/nats"
"github.com/mainflux/mainflux/transformers/senml"
"github.com/mainflux/mainflux/writers"
"github.com/mainflux/mainflux/writers/api"
Expand Down Expand Up @@ -69,19 +69,19 @@ func main() {
log.Fatalf(err.Error())
}

b, err := broker.New(cfg.natsURL)
pubSub, err := nats.NewPubSub(cfg.natsURL, "", logger)
if err != nil {
logger.Error(err.Error())
logger.Error(fmt.Sprintf("Failed to connect to NATS: %s", err))
os.Exit(1)
}
defer b.Close()
defer pubSub.Close()

session := connectToCassandra(cfg.dbCfg, logger)
defer session.Close()

repo := newService(session, logger)
st := senml.New(cfg.contentType)
if err := writers.Start(b, repo, st, svcName, cfg.subjectsCfgPath, logger); err != nil {
if err := writers.Start(pubSub, repo, st, svcName, cfg.subjectsCfgPath, logger); err != nil {
logger.Error(fmt.Sprintf("Failed to create Cassandra writer: %s", err))
}

Expand Down
10 changes: 5 additions & 5 deletions cmd/coap/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,10 @@ import (
gocoap "github.com/dustin/go-coap"
kitprometheus "github.com/go-kit/kit/metrics/prometheus"
"github.com/mainflux/mainflux"
"github.com/mainflux/mainflux/broker"
"github.com/mainflux/mainflux/coap"
"github.com/mainflux/mainflux/coap/api"
logger "github.com/mainflux/mainflux/logger"
"github.com/mainflux/mainflux/messaging/nats"
thingsapi "github.com/mainflux/mainflux/things/api/auth/grpc"
opentracing "github.com/opentracing/opentracing-go"
stdprometheus "github.com/prometheus/client_golang/prometheus"
Expand Down Expand Up @@ -81,14 +81,14 @@ func main() {
cc := thingsapi.NewClient(conn, thingsTracer, cfg.thingsAuthTimeout)
respChan := make(chan string, 10000)

b, err := broker.New(cfg.natsURL)
pubSub, err := nats.NewPubSub(cfg.natsURL, "", logger)
if err != nil {
logger.Error(err.Error())
logger.Error(fmt.Sprintf("Failed to connect to NATS: %s", err))
os.Exit(1)
}
defer b.Close()
defer pubSub.Close()

svc := coap.New(b, logger, cc, respChan)
svc := coap.New(pubSub, logger, cc, respChan)

svc = api.LoggingMiddleware(svc, logger)

Expand Down
15 changes: 7 additions & 8 deletions cmd/http/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,12 @@ import (

kitprometheus "github.com/go-kit/kit/metrics/prometheus"
"github.com/mainflux/mainflux"
"github.com/mainflux/mainflux/broker"
adapter "github.com/mainflux/mainflux/http"
"github.com/mainflux/mainflux/http/api"
"github.com/mainflux/mainflux/logger"
"github.com/mainflux/mainflux/messaging/nats"
thingsapi "github.com/mainflux/mainflux/things/api/auth/grpc"
opentracing "github.com/opentracing/opentracing-go"
"github.com/opentracing/opentracing-go"
stdprometheus "github.com/prometheus/client_golang/prometheus"
jconfig "github.com/uber/jaeger-client-go/config"
"google.golang.org/grpc"
Expand Down Expand Up @@ -62,7 +62,6 @@ type config struct {
}

func main() {

cfg := loadConfig()

logger, err := logger.New(os.Stdout, cfg.logLevel)
Expand All @@ -79,15 +78,15 @@ func main() {
thingsTracer, thingsCloser := initJaeger("things", cfg.jaegerURL, logger)
defer thingsCloser.Close()

b, err := broker.New(cfg.natsURL)
pub, err := nats.NewPublisher(cfg.natsURL)
if err != nil {
logger.Error(err.Error())
logger.Error(fmt.Sprintf("Failed to connect to NATS: %s", err))
os.Exit(1)
}
defer b.Close()
defer pub.Close()

cc := thingsapi.NewClient(conn, thingsTracer, cfg.thingsAuthTimeout)
svc := adapter.New(b, cc)
tc := thingsapi.NewClient(conn, thingsTracer, cfg.thingsAuthTimeout)
svc := adapter.New(pub, tc)

svc = api.LoggingMiddleware(svc, logger)
svc = api.MetricsMiddleware(
Expand Down
11 changes: 6 additions & 5 deletions cmd/influxdb-writer/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@ import (
kitprometheus "github.com/go-kit/kit/metrics/prometheus"
influxdata "github.com/influxdata/influxdb/client/v2"
"github.com/mainflux/mainflux"
"github.com/mainflux/mainflux/broker"
"github.com/mainflux/mainflux/logger"
"github.com/mainflux/mainflux/messaging/nats"
"github.com/mainflux/mainflux/transformers/senml"
"github.com/mainflux/mainflux/writers"
"github.com/mainflux/mainflux/writers/api"
Expand Down Expand Up @@ -70,12 +70,12 @@ func main() {
log.Fatalf(err.Error())
}

b, err := broker.New(cfg.natsURL)
pubSub, err := nats.NewPubSub(cfg.natsURL, "", logger)
if err != nil {
logger.Error(err.Error())
logger.Error(fmt.Sprintf("Failed to connect to NATS: %s", err))
os.Exit(1)
}
defer b.Close()
defer pubSub.Close()

client, err := influxdata.NewHTTPClient(clientCfg)
if err != nil {
Expand All @@ -90,7 +90,8 @@ func main() {
repo = api.LoggingMiddleware(repo, logger)
repo = api.MetricsMiddleware(repo, counter, latency)
st := senml.New(cfg.contentType)
if err := writers.Start(b, repo, st, svcName, cfg.subjectsCfgPath, logger); err != nil {

if err := writers.Start(pubSub, repo, st, svcName, cfg.subjectsCfgPath, logger); err != nil {
logger.Error(fmt.Sprintf("Failed to start InfluxDB writer: %s", err))
os.Exit(1)
}
Expand Down
10 changes: 5 additions & 5 deletions cmd/lora/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,11 @@ import (
mqttPaho "github.com/eclipse/paho.mqtt.golang"
r "github.com/go-redis/redis"
"github.com/mainflux/mainflux"
"github.com/mainflux/mainflux/broker"
"github.com/mainflux/mainflux/logger"
"github.com/mainflux/mainflux/lora"
"github.com/mainflux/mainflux/lora/api"
"github.com/mainflux/mainflux/lora/mqtt"
"github.com/mainflux/mainflux/messaging/nats"

kitprometheus "github.com/go-kit/kit/metrics/prometheus"
"github.com/mainflux/mainflux/lora/redis"
Expand Down Expand Up @@ -85,19 +85,19 @@ func main() {
esConn := connectToRedis(cfg.esURL, cfg.esPass, cfg.esDB, logger)
defer esConn.Close()

b, err := broker.New(cfg.natsURL)
pub, err := nats.NewPublisher(cfg.natsURL)
if err != nil {
logger.Error(err.Error())
logger.Error(fmt.Sprintf("Failed to connect to NATS: %s", err))
os.Exit(1)
}
defer b.Close()
defer pub.Close()

thingRM := newRouteMapRepositoy(rmConn, thingsRMPrefix, logger)
chanRM := newRouteMapRepositoy(rmConn, channelsRMPrefix, logger)

mqttConn := connectToMQTTBroker(cfg.loraMsgURL, logger)

svc := lora.New(b, thingRM, chanRM)
svc := lora.New(pub, thingRM, chanRM)
svc = api.LoggingMiddleware(svc, logger)
svc = api.MetricsMiddleware(
svc,
Expand Down
11 changes: 6 additions & 5 deletions cmd/mongodb-writer/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@ import (

kitprometheus "github.com/go-kit/kit/metrics/prometheus"
"github.com/mainflux/mainflux"
"github.com/mainflux/mainflux/broker"
"github.com/mainflux/mainflux/logger"
"github.com/mainflux/mainflux/messaging/nats"
"github.com/mainflux/mainflux/transformers/senml"
"github.com/mainflux/mainflux/writers"
"github.com/mainflux/mainflux/writers/api"
Expand Down Expand Up @@ -66,12 +66,12 @@ func main() {
log.Fatal(err)
}

b, err := broker.New(cfg.natsURL)
pubSub, err := nats.NewPubSub(cfg.natsURL, "", logger)
if err != nil {
logger.Error(err.Error())
logger.Error(fmt.Sprintf("Failed to connect to NATS: %s", err))
os.Exit(1)
}
defer b.Close()
defer pubSub.Close()

addr := fmt.Sprintf("mongodb://%s:%s", cfg.dbHost, cfg.dbPort)
client, err := mongo.Connect(context.Background(), options.Client().ApplyURI(addr))
Expand All @@ -87,7 +87,8 @@ func main() {
repo = api.LoggingMiddleware(repo, logger)
repo = api.MetricsMiddleware(repo, counter, latency)
st := senml.New(cfg.contentType)
if err := writers.Start(b, repo, st, svcName, cfg.subjectsCfgPath, logger); err != nil {

if err := writers.Start(pubSub, repo, st, svcName, cfg.subjectsCfgPath, logger); err != nil {
logger.Error(fmt.Sprintf("Failed to start MongoDB writer: %s", err))
os.Exit(1)
}
Expand Down
11 changes: 6 additions & 5 deletions cmd/mqtt/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,9 @@ import (

"github.com/go-redis/redis"
"github.com/mainflux/mainflux"
"github.com/mainflux/mainflux/broker"
"github.com/mainflux/mainflux/logger"
"github.com/mainflux/mainflux/messaging"
"github.com/mainflux/mainflux/messaging/nats"
mqtt "github.com/mainflux/mainflux/mqtt"
mr "github.com/mainflux/mainflux/mqtt/redis"
thingsapi "github.com/mainflux/mainflux/things/api/auth/grpc"
Expand Down Expand Up @@ -129,17 +130,17 @@ func main() {

cc := thingsapi.NewClient(conn, thingsTracer, cfg.thingsAuthTimeout)

b, err := broker.New(cfg.natsURL)
pub, err := nats.NewPublisher(cfg.natsURL)
if err != nil {
logger.Error(err.Error())
logger.Error(fmt.Sprintf("Failed to connect to NATS: %s", err))
os.Exit(1)
}
defer b.Close()
defer pub.Close()

es := mr.NewEventStore(rc, cfg.instance)

// Event handler for MQTT hooks
evt := mqtt.New(b, cc, es, logger, tracer)
evt := mqtt.New([]messaging.Publisher{pub}, cc, es, logger, tracer)

errs := make(chan error, 2)

Expand Down
10 changes: 5 additions & 5 deletions cmd/opcua/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@ import (

r "github.com/go-redis/redis"
"github.com/mainflux/mainflux"
"github.com/mainflux/mainflux/broker"
"github.com/mainflux/mainflux/logger"
"github.com/mainflux/mainflux/messaging/nats"
"github.com/mainflux/mainflux/opcua"
"github.com/mainflux/mainflux/opcua/api"
"github.com/mainflux/mainflux/opcua/db"
Expand Down Expand Up @@ -97,15 +97,15 @@ func main() {
esConn := connectToRedis(cfg.esURL, cfg.esPass, cfg.esDB, logger)
defer esConn.Close()

b, err := broker.New(cfg.natsURL)
pubSub, err := nats.NewPubSub(cfg.natsURL, "", logger)
if err != nil {
logger.Error(err.Error())
logger.Error(fmt.Sprintf("Failed to connect to NATS: %s", err))
os.Exit(1)
}
defer b.Close()
defer pubSub.Close()

ctx := context.Background()
sub := gopcua.NewSubscriber(ctx, b, thingRM, chanRM, connRM, logger)
sub := gopcua.NewSubscriber(ctx, pubSub, thingRM, chanRM, connRM, logger)
browser := gopcua.NewBrowser(ctx, logger)

svc := opcua.New(sub, browser, thingRM, chanRM, connRM, cfg.opcuaConfig, logger)
Expand Down
Loading