Skip to content

Commit

Permalink
MF-780 - Use Normalizer as a lib (#915)
Browse files Browse the repository at this point in the history
* Use Normalizer as a lib

To normalize messages on the consumer side, Normalizer is moved
to the internal pkgs. Writers being message consumers are modified to
do message normalization instead of subscribing to normalized messages
subject.

Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com>

* Fix logging middleware for readers and writers

Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com>

* Remove normalizer interface

Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com>

* Use Normalizer in writers

As we agreed on #919, we'll use normalizer as an interface and provide
the default SenML implementation. Because of that, Normalizer is removed
from `internal` and we'll use the project structure proposed in the
aforementioned issue.

Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com>

* Fix tests

Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com>

* Remove unused batch settings from influxDB reader

Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com>

* Update docs

Move Normalizer service to `addons`.

Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com>

* Rename channels input topic

Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com>

* Update Noramlizer docs

Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com>

* Remove commented code

Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com>

* Update readers logging

Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com>

* Update addons docker-compose files

Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com>

* Update topcis explanations

Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com>
  • Loading branch information
dborovcanin authored and drasko committed Oct 31, 2019
1 parent 380af87 commit 8be2516
Show file tree
Hide file tree
Showing 53 changed files with 341 additions and 403 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
# SPDX-License-Identifier: Apache-2.0

BUILD_DIR = build
SERVICES = users things http normalizer ws coap lora influxdb-writer influxdb-reader mongodb-writer mongodb-reader cassandra-writer cassandra-reader postgres-writer postgres-reader cli bootstrap opcua
SERVICES = users things http ws coap lora normalizer influxdb-writer influxdb-reader mongodb-writer mongodb-reader cassandra-writer cassandra-reader postgres-writer postgres-reader cli bootstrap opcua
DOCKERS = $(addprefix docker_,$(SERVICES))
DOCKERS_DEV = $(addprefix docker_dev_,$(SERVICES))
CGO_ENABLED ?= 0
Expand Down
4 changes: 3 additions & 1 deletion cmd/cassandra-writer/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"github.com/gocql/gocql"
"github.com/mainflux/mainflux"
"github.com/mainflux/mainflux/logger"
"github.com/mainflux/mainflux/normalizer"
"github.com/mainflux/mainflux/writers"
"github.com/mainflux/mainflux/writers/api"
"github.com/mainflux/mainflux/writers/cassandra"
Expand Down Expand Up @@ -74,7 +75,8 @@ func main() {
defer session.Close()

repo := newService(session, logger)
if err := writers.Start(nc, repo, svcName, cfg.channels, logger); err != nil {
norm := normalizer.New()
if err := writers.Start(nc, repo, norm, svcName, cfg.channels, logger); err != nil {
logger.Error(fmt.Sprintf("Failed to create Cassandra writer: %s", err))
}

Expand Down
107 changes: 41 additions & 66 deletions cmd/influxdb-writer/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,14 @@ import (
"net/http"
"os"
"os/signal"
"strconv"
"syscall"
"time"

"github.com/BurntSushi/toml"
kitprometheus "github.com/go-kit/kit/metrics/prometheus"
influxdata "github.com/influxdata/influxdb/client/v2"
"github.com/mainflux/mainflux"
"github.com/mainflux/mainflux/logger"
"github.com/mainflux/mainflux/normalizer"
"github.com/mainflux/mainflux/writers"
"github.com/mainflux/mainflux/writers/api"
"github.com/mainflux/mainflux/writers/influxdb"
Expand All @@ -29,43 +28,37 @@ import (
const (
svcName = "influxdb-writer"

defNatsURL = nats.DefaultURL
defLogLevel = "error"
defPort = "8180"
defBatchSize = "5000"
defBatchTimeout = "5"
defDBName = "mainflux"
defDBHost = "localhost"
defDBPort = "8086"
defDBUser = "mainflux"
defDBPass = "mainflux"
defChanCfgPath = "/config/channels.toml"

envNatsURL = "MF_NATS_URL"
envLogLevel = "MF_INFLUX_WRITER_LOG_LEVEL"
envPort = "MF_INFLUX_WRITER_PORT"
envBatchSize = "MF_INFLUX_WRITER_BATCH_SIZE"
envBatchTimeout = "MF_INFLUX_WRITER_BATCH_TIMEOUT"
envDBName = "MF_INFLUX_WRITER_DB_NAME"
envDBHost = "MF_INFLUX_WRITER_DB_HOST"
envDBPort = "MF_INFLUX_WRITER_DB_PORT"
envDBUser = "MF_INFLUX_WRITER_DB_USER"
envDBPass = "MF_INFLUX_WRITER_DB_PASS"
envChanCfgPath = "MF_INFLUX_WRITER_CHANNELS_CONFIG"
defNatsURL = nats.DefaultURL
defLogLevel = "error"
defPort = "8180"
defDBName = "mainflux"
defDBHost = "localhost"
defDBPort = "8086"
defDBUser = "mainflux"
defDBPass = "mainflux"
defChanCfgPath = "/config/channels.toml"

envNatsURL = "MF_NATS_URL"
envLogLevel = "MF_INFLUX_WRITER_LOG_LEVEL"
envPort = "MF_INFLUX_WRITER_PORT"
envDBName = "MF_INFLUX_WRITER_DB_NAME"
envDBHost = "MF_INFLUX_WRITER_DB_HOST"
envDBPort = "MF_INFLUX_WRITER_DB_PORT"
envDBUser = "MF_INFLUX_WRITER_DB_USER"
envDBPass = "MF_INFLUX_WRITER_DB_PASS"
envChanCfgPath = "MF_INFLUX_WRITER_CHANNELS_CONFIG"
)

type config struct {
natsURL string
logLevel string
port string
batchSize string
batchTimeout string
dbName string
dbHost string
dbPort string
dbUser string
dbPass string
channels map[string]bool
natsURL string
logLevel string
port string
dbName string
dbHost string
dbPort string
dbUser string
dbPass string
channels map[string]bool
}

func main() {
Expand All @@ -90,29 +83,13 @@ func main() {
}
defer client.Close()

batchTimeout, err := strconv.Atoi(cfg.batchTimeout)
if err != nil {
logger.Error(fmt.Sprintf("Invalid value for batch timeout: %s", err))
os.Exit(1)
}

batchSize, err := strconv.Atoi(cfg.batchSize)
if err != nil {
logger.Error(fmt.Sprintf("Invalid value of batch size: %s", err))
os.Exit(1)
}

timeout := time.Duration(batchTimeout) * time.Second
repo, err := influxdb.New(client, cfg.dbName, batchSize, timeout)
if err != nil {
logger.Error(fmt.Sprintf("Failed to create InfluxDB writer: %s", err))
os.Exit(1)
}
repo := influxdb.New(client, cfg.dbName)

counter, latency := makeMetrics()
repo = api.LoggingMiddleware(repo, logger)
repo = api.MetricsMiddleware(repo, counter, latency)
if err := writers.Start(nc, repo, svcName, cfg.channels, logger); err != nil {
norm := normalizer.New()
if err := writers.Start(nc, repo, norm, svcName, cfg.channels, logger); err != nil {
logger.Error(fmt.Sprintf("Failed to start InfluxDB writer: %s", err))
os.Exit(1)
}
Expand All @@ -133,17 +110,15 @@ func main() {
func loadConfigs() (config, influxdata.HTTPConfig) {
chanCfgPath := mainflux.Env(envChanCfgPath, defChanCfgPath)
cfg := config{
natsURL: mainflux.Env(envNatsURL, defNatsURL),
logLevel: mainflux.Env(envLogLevel, defLogLevel),
port: mainflux.Env(envPort, defPort),
batchSize: mainflux.Env(envBatchSize, defBatchSize),
batchTimeout: mainflux.Env(envBatchTimeout, defBatchTimeout),
dbName: mainflux.Env(envDBName, defDBName),
dbHost: mainflux.Env(envDBHost, defDBHost),
dbPort: mainflux.Env(envDBPort, defDBPort),
dbUser: mainflux.Env(envDBUser, defDBUser),
dbPass: mainflux.Env(envDBPass, defDBPass),
channels: loadChansConfig(chanCfgPath),
natsURL: mainflux.Env(envNatsURL, defNatsURL),
logLevel: mainflux.Env(envLogLevel, defLogLevel),
port: mainflux.Env(envPort, defPort),
dbName: mainflux.Env(envDBName, defDBName),
dbHost: mainflux.Env(envDBHost, defDBHost),
dbPort: mainflux.Env(envDBPort, defDBPort),
dbUser: mainflux.Env(envDBUser, defDBUser),
dbPass: mainflux.Env(envDBPass, defDBPass),
channels: loadChansConfig(chanCfgPath),
}

clientCfg := influxdata.HTTPConfig{
Expand Down
4 changes: 3 additions & 1 deletion cmd/mongodb-writer/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
kitprometheus "github.com/go-kit/kit/metrics/prometheus"
"github.com/mainflux/mainflux"
"github.com/mainflux/mainflux/logger"
"github.com/mainflux/mainflux/normalizer"
"github.com/mainflux/mainflux/writers"
"github.com/mainflux/mainflux/writers/api"
"github.com/mainflux/mainflux/writers/mongodb"
Expand Down Expand Up @@ -84,7 +85,8 @@ func main() {
counter, latency := makeMetrics()
repo = api.LoggingMiddleware(repo, logger)
repo = api.MetricsMiddleware(repo, counter, latency)
if err := writers.Start(nc, repo, svcName, cfg.channels, logger); err != nil {
norm := normalizer.New()
if err := writers.Start(nc, repo, norm, svcName, cfg.channels, logger); err != nil {
logger.Error(fmt.Sprintf("Failed to start MongoDB writer: %s", err))
os.Exit(1)
}
Expand Down
4 changes: 3 additions & 1 deletion cmd/postgres-writer/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/jmoiron/sqlx"
"github.com/mainflux/mainflux"
"github.com/mainflux/mainflux/logger"
"github.com/mainflux/mainflux/normalizer"
"github.com/mainflux/mainflux/writers"
"github.com/mainflux/mainflux/writers/api"
"github.com/mainflux/mainflux/writers/postgres"
Expand Down Expand Up @@ -80,7 +81,8 @@ func main() {
defer db.Close()

repo := newService(db, logger)
if err = writers.Start(nc, repo, svcName, cfg.channels, logger); err != nil {
norm := normalizer.New()
if err = writers.Start(nc, repo, norm, svcName, cfg.channels, logger); err != nil {
logger.Error(fmt.Sprintf("Failed to create Postgres writer: %s", err))
}

Expand Down
8 changes: 8 additions & 0 deletions docker/addons/bootstrap/docker-compose.yml
Original file line number Diff line number Diff line change
@@ -1,3 +1,11 @@
# Copyright (c) Mainflux
# SPDX-License-Identifier: Apache-2.0

# This docker-compose file contains optional bootstrap services. Since it's optional, this file is
# dependent of docker-compose file from <project_root>/docker. In order to run this services, execute command:
# docker-compose -f docker/docker-compose.yml -f docker/addons/bootstrap/docker-compose.yml up
# from project root.

version: "3.7"

networks:
Expand Down
10 changes: 5 additions & 5 deletions docker/addons/cassandra-reader/docker-compose.yml
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
###
# This docker-compose file contains optional Cassandra and cassandra-reader. Since these are optional, this file is
# dependent of docker-compose file from <project_root>/docker. In order to run
# these optional service, execute command:
# Copyright (c) Mainflux
# SPDX-License-Identifier: Apache-2.0

# This docker-compose file contains optional cassandra-reader. Since it's optional, this file is
# dependent of docker-compose file from <project_root>/docker. In order to run this service, execute command:
# docker-compose -f docker/docker-compose.yml -f docker/addons/cassandra-reader/docker-compose.yml up
# from project root.
###

version: "3.7"

Expand Down
8 changes: 4 additions & 4 deletions docker/addons/cassandra-writer/docker-compose.yml
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
###
# Copyright (c) Mainflux
# SPDX-License-Identifier: Apache-2.0

# This docker-compose file contains optional Cassandra and cassandra-writer. Since these are optional, this file is
# dependent of docker-compose file from <project_root>/docker. In order to run
# these optional service, execute command:
# dependent of docker-compose file from <project_root>/docker. In order to run these services, execute command:
# docker-compose -f docker/docker-compose.yml -f docker/addons/cassandra-writer/docker-compose.yml up
# from project root.
###

version: "3.7"

Expand Down
5 changes: 4 additions & 1 deletion docker/addons/influxdb-reader/docker-compose.yml
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
# Copyright (c) Mainflux
# SPDX-License-Identifier: Apache-2.0

###
# This docker-compose file contains optional InfluxDB-reader service for the Mainflux
# platform. Since this service is optional, this file is dependent on the docker-compose.yml
# file from <project_root>/docker/. In order to run InfluxDB-reader service, core services,
# file from <project_root>/docker/. In order to run this service, core services,
# as well as the network from the core composition, should be already running.
###

Expand Down
6 changes: 3 additions & 3 deletions docker/addons/influxdb-writer/docker-compose.yml
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
###
# Copyright (c) Mainflux
# SPDX-License-Identifier: Apache-2.0

# This docker-compose file contains optional InfluxDB, InfluxDB-writer and Grafana services
# for the Mainflux platform. Since this services are optional, this file is dependent on the
# docker-compose.yml file from <project_root>/docker/. In order to run these services,
# core services, as well as the network from the core composition, should be already running.
###

version: "3.7"

Expand All @@ -16,7 +17,6 @@ volumes:
mainflux-grafana-volume:

services:

influxdb:
image: influxdb:1.6.4-alpine
container_name: mainflux-influxdb
Expand Down
5 changes: 3 additions & 2 deletions docker/addons/lora-adapter/docker-compose.yml
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
###
# Copyright (c) Mainflux
# SPDX-License-Identifier: Apache-2.0

# This docker-compose file contains optional lora-adapter and lora-redis services
# for the Mainflux platform. Since this services are optional, this file is dependent on the
# docker-compose.yml file from <project_root>/docker/. In order to run these services,
# core services, as well as the network from the core composition, should be already running.
###

version: "3.7"

Expand Down
7 changes: 4 additions & 3 deletions docker/addons/mongodb-reader/docker-compose.yml
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
###
# Copyright (c) Mainflux
# SPDX-License-Identifier: Apache-2.0

# This docker-compose file contains optional MongoDB-reader service
# for Mainflux platform. Since these are optional, this file is dependent of docker-compose file
# from <project_root>/docker. In order to run these optional service, execute command:
# from <project_root>/docker. In order to run this service, execute command:
# docker-compose -f docker/docker-compose.yml -f docker/addons/mongodb-reader/docker-compose.yml up
# from project root. MongoDB service is defined in docker/addons/mongodb-writer/docker-compose.yml.
###

version: "3.7"

Expand Down
7 changes: 4 additions & 3 deletions docker/addons/mongodb-writer/docker-compose.yml
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
###
# Copyright (c) Mainflux
# SPDX-License-Identifier: Apache-2.0

# This docker-compose file contains optional MongoDB and MongoDB-writer services
# for Mainflux platform. Since these are optional, this file is dependent of docker-compose file
# from <project_root>/docker. In order to run these optional service, execute command:
# from <project_root>/docker. In order to run these services, execute command:
# docker-compose -f docker/docker-compose.yml -f docker/addons/mongodb-writer/docker-compose.yml up
# from project root. MongoDB default port (27017) is exposed, so you can use various tools for database
# inspection and data visualization.
###

version: "3.7"

Expand Down
29 changes: 29 additions & 0 deletions docker/addons/normalizer/docker-compose.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
# Copyright (c) Mainflux
# SPDX-License-Identifier: Apache-2.0

# This docker-compose file contains optional lora-adapter and lora-redis services
# for the Mainflux platform. Since this service is optional, this file is dependent on the
# docker-compose.yml file from <project_root>/docker/. In order to run this service,
# core services, as well as the network from the core composition, should be already running.

version: "3.7"

networks:
docker_mainflux-base-net:
external: true

services:
normalizer:
image: mainflux/normalizer:latest
container_name: mainflux-normalizer
restart: on-failure
depends_on:
- nats
environment:
MF_NORMALIZER_LOG_LEVEL: ${MF_NORMALIZER_LOG_LEVEL}
MF_NATS_URL: ${MF_NATS_URL}
MF_NORMALIZER_PORT: ${MF_NORMALIZER_PORT}
ports:
- ${MF_NORMALIZER_PORT}:${MF_NORMALIZER_PORT}
networks:
- mainflux-base-net
5 changes: 3 additions & 2 deletions docker/addons/opcua-adapter/docker-compose.yml
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
###
# Copyright (c) Mainflux
# SPDX-License-Identifier: Apache-2.0

# This docker-compose file contains optional opcua-adapter and opcua-redis services
# for the Mainflux platform. Since this services are optional, this file is dependent on the
# docker-compose.yml file from <project_root>/docker/. In order to run these services,
# core services, as well as the network from the core composition, should be already running.
###

version: "3.7"

Expand Down
7 changes: 4 additions & 3 deletions docker/addons/postgres-reader/docker-compose.yml
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
###
# Copyright (c) Mainflux
# SPDX-License-Identifier: Apache-2.0

# This docker-compose file contains optional Postgres-reader service for Mainflux platform.
# Since this service is optional, this file is dependent of docker-compose.yml file
# from <project_root>/docker. In order to run these optional service, execute command:
# from <project_root>/docker. In order to run this service, execute command:
# docker-compose -f docker/docker-compose.yml -f docker/addons/postgres-reader/docker-compose.yml up
# from project root.
###

version: "3.7"

Expand Down
Loading

0 comments on commit 8be2516

Please sign in to comment.