From 8be2516321fa21d16a7bff3bf0d9384906fe15e2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Du=C5=A1an=20Borov=C4=8Danin?= Date: Thu, 31 Oct 2019 14:04:47 +0100 Subject: [PATCH] MF-780 - Use Normalizer as a lib (#915) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * Use Normalizer as a lib To normalize messages on the consumer side, Normalizer is moved to the internal pkgs. Writers being message consumers are modified to do message normalization instead of subscribing to normalized messages subject. Signed-off-by: Dušan Borovčanin * Fix logging middleware for readers and writers Signed-off-by: Dušan Borovčanin * Remove normalizer interface Signed-off-by: Dušan Borovčanin * Use Normalizer in writers As we agreed on #919, we'll use normalizer as an interface and provide the default SenML implementation. Because of that, Normalizer is removed from `internal` and we'll use the project structure proposed in the aforementioned issue. Signed-off-by: Dušan Borovčanin * Fix tests Signed-off-by: Dušan Borovčanin * Remove unused batch settings from influxDB reader Signed-off-by: Dušan Borovčanin * Update docs Move Normalizer service to `addons`. 
Signed-off-by: Dušan Borovčanin * Rename channels input topic Signed-off-by: Dušan Borovčanin * Update Normalizer docs Signed-off-by: Dušan Borovčanin * Remove commented code Signed-off-by: Dušan Borovčanin * Update readers logging Signed-off-by: Dušan Borovčanin * Update addons docker-compose files Signed-off-by: Dušan Borovčanin * Update topics explanations Signed-off-by: Dušan Borovčanin --- Makefile | 2 +- cmd/cassandra-writer/main.go | 4 +- cmd/influxdb-writer/main.go | 107 +++++++----------- cmd/mongodb-writer/main.go | 4 +- cmd/postgres-writer/main.go | 4 +- docker/addons/bootstrap/docker-compose.yml | 8 ++ .../cassandra-reader/docker-compose.yml | 10 +- .../cassandra-writer/docker-compose.yml | 8 +- .../addons/influxdb-reader/docker-compose.yml | 5 +- .../addons/influxdb-writer/docker-compose.yml | 6 +- docker/addons/lora-adapter/docker-compose.yml | 5 +- .../addons/mongodb-reader/docker-compose.yml | 7 +- .../addons/mongodb-writer/docker-compose.yml | 7 +- docker/addons/normalizer/docker-compose.yml | 29 +++++ .../addons/opcua-adapter/docker-compose.yml | 5 +- .../addons/postgres-reader/docker-compose.yml | 7 +- .../addons/postgres-writer/docker-compose.yml | 7 +- docs/architecture.md | 7 +- docs/getting-started.md | 3 +- k8s/README.md | 14 +-- normalizer/README.md | 6 +- normalizer/api/logging.go | 2 +- normalizer/api/metrics.go | 2 +- normalizer/nats/pubsub.go | 2 +- normalizer/normalizer.go | 13 +-- normalizer/service.go | 8 +- readers/api/logging.go | 9 +- readers/cassandra/README.md | 2 +- readers/cassandra/messages_test.go | 6 +- readers/influxdb/README.md | 2 +- readers/influxdb/messages_test.go | 9 +- readers/mongodb/README.md | 2 +- readers/mongodb/messages_test.go | 4 +- readers/postgres/README.md | 1 + readers/postgres/messages_test.go | 5 +- topics.go | 5 + writers/README.md | 2 +- writers/api/logging.go | 6 +- writers/api/metrics.go | 4 +- writers/cassandra/README.md | 2 +- writers/cassandra/messages.go | 57 ++++++---- 
writers/cassandra/messages_test.go | 8 +- writers/influxdb/README.md | 8 +- writers/influxdb/messages.go | 94 +++------------ writers/influxdb/messages_test.go | 83 +++----------- writers/messages.go | 2 +- writers/mongodb/README.md | 1 + writers/mongodb/messages.go | 65 ++++++----- writers/mongodb/messages_test.go | 6 +- writers/postgres/README.md | 1 + writers/postgres/messages.go | 30 +++-- writers/postgres/messages_test.go | 9 +- writers/writer.go | 39 ++++--- 53 files changed, 341 insertions(+), 403 deletions(-) create mode 100644 docker/addons/normalizer/docker-compose.yml diff --git a/Makefile b/Makefile index 1a138ced49..bde37e8fb3 100644 --- a/Makefile +++ b/Makefile @@ -2,7 +2,7 @@ # SPDX-License-Identifier: Apache-2.0 BUILD_DIR = build -SERVICES = users things http normalizer ws coap lora influxdb-writer influxdb-reader mongodb-writer mongodb-reader cassandra-writer cassandra-reader postgres-writer postgres-reader cli bootstrap opcua +SERVICES = users things http ws coap lora normalizer influxdb-writer influxdb-reader mongodb-writer mongodb-reader cassandra-writer cassandra-reader postgres-writer postgres-reader cli bootstrap opcua DOCKERS = $(addprefix docker_,$(SERVICES)) DOCKERS_DEV = $(addprefix docker_dev_,$(SERVICES)) CGO_ENABLED ?= 0 diff --git a/cmd/cassandra-writer/main.go b/cmd/cassandra-writer/main.go index 81de29964d..fa9cce3fa5 100644 --- a/cmd/cassandra-writer/main.go +++ b/cmd/cassandra-writer/main.go @@ -19,6 +19,7 @@ import ( "github.com/gocql/gocql" "github.com/mainflux/mainflux" "github.com/mainflux/mainflux/logger" + "github.com/mainflux/mainflux/normalizer" "github.com/mainflux/mainflux/writers" "github.com/mainflux/mainflux/writers/api" "github.com/mainflux/mainflux/writers/cassandra" @@ -74,7 +75,8 @@ func main() { defer session.Close() repo := newService(session, logger) - if err := writers.Start(nc, repo, svcName, cfg.channels, logger); err != nil { + norm := normalizer.New() + if err := writers.Start(nc, repo, norm, svcName, 
cfg.channels, logger); err != nil { logger.Error(fmt.Sprintf("Failed to create Cassandra writer: %s", err)) } diff --git a/cmd/influxdb-writer/main.go b/cmd/influxdb-writer/main.go index 7c28a5bf7c..02f744d5d3 100644 --- a/cmd/influxdb-writer/main.go +++ b/cmd/influxdb-writer/main.go @@ -10,15 +10,14 @@ import ( "net/http" "os" "os/signal" - "strconv" "syscall" - "time" "github.com/BurntSushi/toml" kitprometheus "github.com/go-kit/kit/metrics/prometheus" influxdata "github.com/influxdata/influxdb/client/v2" "github.com/mainflux/mainflux" "github.com/mainflux/mainflux/logger" + "github.com/mainflux/mainflux/normalizer" "github.com/mainflux/mainflux/writers" "github.com/mainflux/mainflux/writers/api" "github.com/mainflux/mainflux/writers/influxdb" @@ -29,43 +28,37 @@ import ( const ( svcName = "influxdb-writer" - defNatsURL = nats.DefaultURL - defLogLevel = "error" - defPort = "8180" - defBatchSize = "5000" - defBatchTimeout = "5" - defDBName = "mainflux" - defDBHost = "localhost" - defDBPort = "8086" - defDBUser = "mainflux" - defDBPass = "mainflux" - defChanCfgPath = "/config/channels.toml" - - envNatsURL = "MF_NATS_URL" - envLogLevel = "MF_INFLUX_WRITER_LOG_LEVEL" - envPort = "MF_INFLUX_WRITER_PORT" - envBatchSize = "MF_INFLUX_WRITER_BATCH_SIZE" - envBatchTimeout = "MF_INFLUX_WRITER_BATCH_TIMEOUT" - envDBName = "MF_INFLUX_WRITER_DB_NAME" - envDBHost = "MF_INFLUX_WRITER_DB_HOST" - envDBPort = "MF_INFLUX_WRITER_DB_PORT" - envDBUser = "MF_INFLUX_WRITER_DB_USER" - envDBPass = "MF_INFLUX_WRITER_DB_PASS" - envChanCfgPath = "MF_INFLUX_WRITER_CHANNELS_CONFIG" + defNatsURL = nats.DefaultURL + defLogLevel = "error" + defPort = "8180" + defDBName = "mainflux" + defDBHost = "localhost" + defDBPort = "8086" + defDBUser = "mainflux" + defDBPass = "mainflux" + defChanCfgPath = "/config/channels.toml" + + envNatsURL = "MF_NATS_URL" + envLogLevel = "MF_INFLUX_WRITER_LOG_LEVEL" + envPort = "MF_INFLUX_WRITER_PORT" + envDBName = "MF_INFLUX_WRITER_DB_NAME" + envDBHost = 
"MF_INFLUX_WRITER_DB_HOST" + envDBPort = "MF_INFLUX_WRITER_DB_PORT" + envDBUser = "MF_INFLUX_WRITER_DB_USER" + envDBPass = "MF_INFLUX_WRITER_DB_PASS" + envChanCfgPath = "MF_INFLUX_WRITER_CHANNELS_CONFIG" ) type config struct { - natsURL string - logLevel string - port string - batchSize string - batchTimeout string - dbName string - dbHost string - dbPort string - dbUser string - dbPass string - channels map[string]bool + natsURL string + logLevel string + port string + dbName string + dbHost string + dbPort string + dbUser string + dbPass string + channels map[string]bool } func main() { @@ -90,29 +83,13 @@ func main() { } defer client.Close() - batchTimeout, err := strconv.Atoi(cfg.batchTimeout) - if err != nil { - logger.Error(fmt.Sprintf("Invalid value for batch timeout: %s", err)) - os.Exit(1) - } - - batchSize, err := strconv.Atoi(cfg.batchSize) - if err != nil { - logger.Error(fmt.Sprintf("Invalid value of batch size: %s", err)) - os.Exit(1) - } - - timeout := time.Duration(batchTimeout) * time.Second - repo, err := influxdb.New(client, cfg.dbName, batchSize, timeout) - if err != nil { - logger.Error(fmt.Sprintf("Failed to create InfluxDB writer: %s", err)) - os.Exit(1) - } + repo := influxdb.New(client, cfg.dbName) counter, latency := makeMetrics() repo = api.LoggingMiddleware(repo, logger) repo = api.MetricsMiddleware(repo, counter, latency) - if err := writers.Start(nc, repo, svcName, cfg.channels, logger); err != nil { + norm := normalizer.New() + if err := writers.Start(nc, repo, norm, svcName, cfg.channels, logger); err != nil { logger.Error(fmt.Sprintf("Failed to start InfluxDB writer: %s", err)) os.Exit(1) } @@ -133,17 +110,15 @@ func main() { func loadConfigs() (config, influxdata.HTTPConfig) { chanCfgPath := mainflux.Env(envChanCfgPath, defChanCfgPath) cfg := config{ - natsURL: mainflux.Env(envNatsURL, defNatsURL), - logLevel: mainflux.Env(envLogLevel, defLogLevel), - port: mainflux.Env(envPort, defPort), - batchSize: mainflux.Env(envBatchSize, 
defBatchSize), - batchTimeout: mainflux.Env(envBatchTimeout, defBatchTimeout), - dbName: mainflux.Env(envDBName, defDBName), - dbHost: mainflux.Env(envDBHost, defDBHost), - dbPort: mainflux.Env(envDBPort, defDBPort), - dbUser: mainflux.Env(envDBUser, defDBUser), - dbPass: mainflux.Env(envDBPass, defDBPass), - channels: loadChansConfig(chanCfgPath), + natsURL: mainflux.Env(envNatsURL, defNatsURL), + logLevel: mainflux.Env(envLogLevel, defLogLevel), + port: mainflux.Env(envPort, defPort), + dbName: mainflux.Env(envDBName, defDBName), + dbHost: mainflux.Env(envDBHost, defDBHost), + dbPort: mainflux.Env(envDBPort, defDBPort), + dbUser: mainflux.Env(envDBUser, defDBUser), + dbPass: mainflux.Env(envDBPass, defDBPass), + channels: loadChansConfig(chanCfgPath), } clientCfg := influxdata.HTTPConfig{ diff --git a/cmd/mongodb-writer/main.go b/cmd/mongodb-writer/main.go index 4fca1182a5..363e54ac6f 100644 --- a/cmd/mongodb-writer/main.go +++ b/cmd/mongodb-writer/main.go @@ -17,6 +17,7 @@ import ( kitprometheus "github.com/go-kit/kit/metrics/prometheus" "github.com/mainflux/mainflux" "github.com/mainflux/mainflux/logger" + "github.com/mainflux/mainflux/normalizer" "github.com/mainflux/mainflux/writers" "github.com/mainflux/mainflux/writers/api" "github.com/mainflux/mainflux/writers/mongodb" @@ -84,7 +85,8 @@ func main() { counter, latency := makeMetrics() repo = api.LoggingMiddleware(repo, logger) repo = api.MetricsMiddleware(repo, counter, latency) - if err := writers.Start(nc, repo, svcName, cfg.channels, logger); err != nil { + norm := normalizer.New() + if err := writers.Start(nc, repo, norm, svcName, cfg.channels, logger); err != nil { logger.Error(fmt.Sprintf("Failed to start MongoDB writer: %s", err)) os.Exit(1) } diff --git a/cmd/postgres-writer/main.go b/cmd/postgres-writer/main.go index a616bcea21..a0320f94fe 100644 --- a/cmd/postgres-writer/main.go +++ b/cmd/postgres-writer/main.go @@ -17,6 +17,7 @@ import ( "github.com/jmoiron/sqlx" "github.com/mainflux/mainflux" 
"github.com/mainflux/mainflux/logger" + "github.com/mainflux/mainflux/normalizer" "github.com/mainflux/mainflux/writers" "github.com/mainflux/mainflux/writers/api" "github.com/mainflux/mainflux/writers/postgres" @@ -80,7 +81,8 @@ func main() { defer db.Close() repo := newService(db, logger) - if err = writers.Start(nc, repo, svcName, cfg.channels, logger); err != nil { + norm := normalizer.New() + if err = writers.Start(nc, repo, norm, svcName, cfg.channels, logger); err != nil { logger.Error(fmt.Sprintf("Failed to create Postgres writer: %s", err)) } diff --git a/docker/addons/bootstrap/docker-compose.yml b/docker/addons/bootstrap/docker-compose.yml index 7c461b8543..287ef038b4 100644 --- a/docker/addons/bootstrap/docker-compose.yml +++ b/docker/addons/bootstrap/docker-compose.yml @@ -1,3 +1,11 @@ +# Copyright (c) Mainflux +# SPDX-License-Identifier: Apache-2.0 + +# This docker-compose file contains optional bootstrap services. Since it's optional, this file is +# dependent of docker-compose file from /docker. In order to run this services, execute command: +# docker-compose -f docker/docker-compose.yml -f docker/addons/bootstrap/docker-compose.yml up +# from project root. + version: "3.7" networks: diff --git a/docker/addons/cassandra-reader/docker-compose.yml b/docker/addons/cassandra-reader/docker-compose.yml index d5d5fc1d68..6093cc54ad 100644 --- a/docker/addons/cassandra-reader/docker-compose.yml +++ b/docker/addons/cassandra-reader/docker-compose.yml @@ -1,10 +1,10 @@ -### -# This docker-compose file contains optional Cassandra and cassandra-reader. Since these are optional, this file is -# dependent of docker-compose file from /docker. In order to run -# these optional service, execute command: +# Copyright (c) Mainflux +# SPDX-License-Identifier: Apache-2.0 + +# This docker-compose file contains optional cassandra-reader. Since it's optional, this file is +# dependent of docker-compose file from /docker. 
In order to run this service, execute command: # docker-compose -f docker/docker-compose.yml -f docker/addons/cassandra-reader/docker-compose.yml up # from project root. -### version: "3.7" diff --git a/docker/addons/cassandra-writer/docker-compose.yml b/docker/addons/cassandra-writer/docker-compose.yml index d4ec96b04e..81e10c3e4e 100644 --- a/docker/addons/cassandra-writer/docker-compose.yml +++ b/docker/addons/cassandra-writer/docker-compose.yml @@ -1,10 +1,10 @@ -### +# Copyright (c) Mainflux +# SPDX-License-Identifier: Apache-2.0 + # This docker-compose file contains optional Cassandra and cassandra-writer. Since these are optional, this file is -# dependent of docker-compose file from /docker. In order to run -# these optional service, execute command: +# dependent of docker-compose file from /docker. In order to run these services, execute command: # docker-compose -f docker/docker-compose.yml -f docker/addons/cassandra-writer/docker-compose.yml up # from project root. -### version: "3.7" diff --git a/docker/addons/influxdb-reader/docker-compose.yml b/docker/addons/influxdb-reader/docker-compose.yml index 36b687958a..bfd3882339 100644 --- a/docker/addons/influxdb-reader/docker-compose.yml +++ b/docker/addons/influxdb-reader/docker-compose.yml @@ -1,7 +1,10 @@ +# Copyright (c) Mainflux +# SPDX-License-Identifier: Apache-2.0 + ### # This docker-compose file contains optional InfluxDB-reader service for the Mainflux # platform. Since this service is optional, this file is dependent on the docker-compose.yml -# file from /docker/. In order to run InfluxDB-reader service, core services, +# file from /docker/. In order to run this service, core services, # as well as the network from the core composition, should be already running. 
### diff --git a/docker/addons/influxdb-writer/docker-compose.yml b/docker/addons/influxdb-writer/docker-compose.yml index a1a179238e..02582ab105 100644 --- a/docker/addons/influxdb-writer/docker-compose.yml +++ b/docker/addons/influxdb-writer/docker-compose.yml @@ -1,9 +1,10 @@ -### +# Copyright (c) Mainflux +# SPDX-License-Identifier: Apache-2.0 + # This docker-compose file contains optional InfluxDB, InfluxDB-writer and Grafana services # for the Mainflux platform. Since this services are optional, this file is dependent on the # docker-compose.yml file from /docker/. In order to run these services, # core services, as well as the network from the core composition, should be already running. -### version: "3.7" @@ -16,7 +17,6 @@ volumes: mainflux-grafana-volume: services: - influxdb: image: influxdb:1.6.4-alpine container_name: mainflux-influxdb diff --git a/docker/addons/lora-adapter/docker-compose.yml b/docker/addons/lora-adapter/docker-compose.yml index 186279e726..fdb9231f9f 100644 --- a/docker/addons/lora-adapter/docker-compose.yml +++ b/docker/addons/lora-adapter/docker-compose.yml @@ -1,9 +1,10 @@ -### +# Copyright (c) Mainflux +# SPDX-License-Identifier: Apache-2.0 + # This docker-compose file contains optional lora-adapter and lora-redis services # for the Mainflux platform. Since this services are optional, this file is dependent on the # docker-compose.yml file from /docker/. In order to run these services, # core services, as well as the network from the core composition, should be already running. -### version: "3.7" diff --git a/docker/addons/mongodb-reader/docker-compose.yml b/docker/addons/mongodb-reader/docker-compose.yml index 5a3f24280f..1886aa2424 100644 --- a/docker/addons/mongodb-reader/docker-compose.yml +++ b/docker/addons/mongodb-reader/docker-compose.yml @@ -1,10 +1,11 @@ -### +# Copyright (c) Mainflux +# SPDX-License-Identifier: Apache-2.0 + # This docker-compose file contains optional MongoDB-reader service # for Mainflux platform. 
Since these are optional, this file is dependent of docker-compose file -# from /docker. In order to run these optional service, execute command: +# from /docker. In order to run this service, execute command: # docker-compose -f docker/docker-compose.yml -f docker/addons/mongodb-reader/docker-compose.yml up # from project root. MongoDB service is defined in docker/addons/mongodb-writer/docker-compose.yml. -### version: "3.7" diff --git a/docker/addons/mongodb-writer/docker-compose.yml b/docker/addons/mongodb-writer/docker-compose.yml index ef8ef1fd87..d408db56a8 100644 --- a/docker/addons/mongodb-writer/docker-compose.yml +++ b/docker/addons/mongodb-writer/docker-compose.yml @@ -1,11 +1,12 @@ -### +# Copyright (c) Mainflux +# SPDX-License-Identifier: Apache-2.0 + # This docker-compose file contains optional MongoDB and MongoDB-writer services # for Mainflux platform. Since these are optional, this file is dependent of docker-compose file -# from /docker. In order to run these optional service, execute command: +# from /docker. In order to run these services, execute command: # docker-compose -f docker/docker-compose.yml -f docker/addons/mongodb-writer/docker-compose.yml up # from project root. MongoDB default port (27017) is exposed, so you can use various tools for database # inspection and data visualization. -### version: "3.7" diff --git a/docker/addons/normalizer/docker-compose.yml b/docker/addons/normalizer/docker-compose.yml new file mode 100644 index 0000000000..f02f535369 --- /dev/null +++ b/docker/addons/normalizer/docker-compose.yml @@ -0,0 +1,29 @@ +# Copyright (c) Mainflux +# SPDX-License-Identifier: Apache-2.0 + +# This docker-compose file contains optional normalizer service +# for the Mainflux platform. Since this service is optional, this file is dependent on the +# docker-compose.yml file from /docker/. In order to run this service, +# core services, as well as the network from the core composition, should be already running. 
+ +version: "3.7" + +networks: + docker_mainflux-base-net: + external: true + +services: + normalizer: + image: mainflux/normalizer:latest + container_name: mainflux-normalizer + restart: on-failure + depends_on: + - nats + environment: + MF_NORMALIZER_LOG_LEVEL: ${MF_NORMALIZER_LOG_LEVEL} + MF_NATS_URL: ${MF_NATS_URL} + MF_NORMALIZER_PORT: ${MF_NORMALIZER_PORT} + ports: + - ${MF_NORMALIZER_PORT}:${MF_NORMALIZER_PORT} + networks: + - mainflux-base-net diff --git a/docker/addons/opcua-adapter/docker-compose.yml b/docker/addons/opcua-adapter/docker-compose.yml index 7a024e89ef..6e450f9bad 100644 --- a/docker/addons/opcua-adapter/docker-compose.yml +++ b/docker/addons/opcua-adapter/docker-compose.yml @@ -1,9 +1,10 @@ -### +# Copyright (c) Mainflux +# SPDX-License-Identifier: Apache-2.0 + # This docker-compose file contains optional opcua-adapter and opcua-redis services # for the Mainflux platform. Since this services are optional, this file is dependent on the # docker-compose.yml file from /docker/. In order to run these services, # core services, as well as the network from the core composition, should be already running. -### version: "3.7" diff --git a/docker/addons/postgres-reader/docker-compose.yml b/docker/addons/postgres-reader/docker-compose.yml index e9a62bf199..236feaf8e5 100644 --- a/docker/addons/postgres-reader/docker-compose.yml +++ b/docker/addons/postgres-reader/docker-compose.yml @@ -1,10 +1,11 @@ -### +# Copyright (c) Mainflux +# SPDX-License-Identifier: Apache-2.0 + # This docker-compose file contains optional Postgres-reader service for Mainflux platform. # Since this service is optional, this file is dependent of docker-compose.yml file -# from /docker. In order to run these optional service, execute command: +# from /docker. In order to run this service, execute command: # docker-compose -f docker/docker-compose.yml -f docker/addons/postgres-reader/docker-compose.yml up # from project root. 
-### version: "3.7" diff --git a/docker/addons/postgres-writer/docker-compose.yml b/docker/addons/postgres-writer/docker-compose.yml index 22ea606dea..8859551d75 100644 --- a/docker/addons/postgres-writer/docker-compose.yml +++ b/docker/addons/postgres-writer/docker-compose.yml @@ -1,11 +1,12 @@ -### +# Copyright (c) Mainflux +# SPDX-License-Identifier: Apache-2.0 + # This docker-compose file contains optional Postgres and Postgres-writer services # for Mainflux platform. Since these are optional, this file is dependent of docker-compose file -# from /docker. In order to run these optional service, execute command: +# from /docker. In order to run these services, execute command: # docker-compose -f docker/docker-compose.yml -f docker/addons/postgres-writer/docker-compose.yml up # from project root. PostgreSQL default port (5432) is exposed, so you can use various tools for database # inspection and data visualization. -### version: "3.7" diff --git a/docs/architecture.md b/docs/architecture.md index 4be01baeb1..01d99e1627 100644 --- a/docs/architecture.md +++ b/docs/architecture.md @@ -6,13 +6,12 @@ Mainflux IoT platform is comprised of the following services: |:--------------------------------------------------------------------------|:------------------------------------------------------------------------| | [users](https://github.com/mainflux/mainflux/tree/master/users) | Manages platform's users and auth concerns | | [things](https://github.com/mainflux/mainflux/tree/master/things) | Manages platform's things, channels and access policies | -| [normalizer](https://github.com/mainflux/mainflux/tree/master/normalizer) | Normalizes SenML messages and generates the "processed" messages stream | | [http-adapter](https://github.com/mainflux/mainflux/tree/master/http) | Provides an HTTP interface for accessing communication channels | | [ws-adapter](https://github.com/mainflux/mainflux/tree/master/ws) | Provides a WebSocket interface for accessing communication 
channels | | [mqtt-adapter](https://github.com/mainflux/mainflux/tree/master/mqtt) | Provides an MQTT interface for accessing communication channels | - | [coap-adapter](https://github.com/mainflux/mainflux/tree/master/coap) | Provides a CoAP interface for accessing communication channels | - | [lora-adapter](https://github.com/mainflux/mainflux/tree/master/lora) | Provides a LoRa Server forwarder for accessing communication channels | - | [mainflux-cli](https://github.com/mainflux/mainflux/tree/master/cli) | Command line interface | +| [coap-adapter](https://github.com/mainflux/mainflux/tree/master/coap) | Provides a CoAP interface for accessing communication channels | +| [lora-adapter](https://github.com/mainflux/mainflux/tree/master/lora) | Provides a LoRa Server forwarder for accessing communication channels | +| [mainflux-cli](https://github.com/mainflux/mainflux/tree/master/cli) | Command line interface | ![arch](img/architecture.jpg) diff --git a/docs/getting-started.md b/docs/getting-started.md index 9714aa5b92..e33fd7dce7 100644 --- a/docs/getting-started.md +++ b/docs/getting-started.md @@ -101,7 +101,6 @@ In the Mainflux system terminal you should see following logs: ```bash mainflux-things | {"level":"info","message":"Method can_access for channel b7bfc4b6-c18d-47c5-b343-98235c5acc19 and thing 513d02d2-16c1-4f23-98be-9e12f8fee898 took 1.410194ms to complete without errors.","ts":"2019-01-08T22:19:30.148097648Z"} mainflux-http | {"level":"info","message":"Method publish took 336.685µs to complete without errors.","ts":"2019-01-08T22:19:30.148689601Z"} -mainflux-normalizer | {"level":"info","message":"Method normalize took 108.126µs to complete without errors.","ts":"2019-01-08T22:19:30.149500543Z"} ``` -This proves that messages have been correctly sent through the system, via the protocol adapter (`mainflux-http`) and the `normalizer` service has correctly parsed the messages. 
+This proves that messages have been correctly sent through the system via the protocol adapter (`mainflux-http`). diff --git a/k8s/README.md b/k8s/README.md index 4621ced400..af9e1f51d5 100644 --- a/k8s/README.md +++ b/k8s/README.md @@ -43,15 +43,7 @@ kubectl create -f k8s/mainflux/things-postgres.yml kubectl create -f k8s/mainflux/things.yml ``` -### 4. Setup Normalizer service - -- Deploy Normalizer service: - -``` -kubectl create -f k8s/mainflux/normalizer.yml -``` - -### 5. Setup adapter services +### 4. Setup adapter services - Deploy adapter service: @@ -60,7 +52,7 @@ kubectl create -f k8s/mainflux/tcp-services.yml kubectl create -f k8s/mainflux/.yml ``` -### 6. Setup Dashflux +### 5. Setup Dashflux - Deploy Dashflux service: @@ -68,6 +60,6 @@ kubectl create -f k8s/mainflux/.yml kubectl create -f k8s/mainflux/dashflux.yml ``` -### 7. Configure Internet Access +### 6. Configure Internet Access Configure NAT on your Firewall to forward ports 80 (HTTP) and 443 (HTTPS) to nginx ingress service diff --git a/normalizer/README.md b/normalizer/README.md index 5e20a52513..ba0eb36df7 100644 --- a/normalizer/README.md +++ b/normalizer/README.md @@ -2,6 +2,10 @@ Normalizer service consumes events published by adapters, normalizes SenML-formatted ones, and publishes them to the post-processing stream. +Normalizer can also be imported as a package and used independently for message normalization. +This reduces internal traffic because messages are not published back to the broker, but transformed +on the message consumer side. Mainflux [writers](https://github.com/mainflux/mainflux/tree/master/writers) +are using Normalizer to preprocess messages before storing them. 
## Configuration @@ -22,7 +26,7 @@ provides a compose file template that can be used to deploy the service containe locally: ```yaml -version: "2" +version: "3.7" services: normalizer: image: mainflux/normalizer:[version] diff --git a/normalizer/api/logging.go b/normalizer/api/logging.go index 16fc96d629..278c5dde2a 100644 --- a/normalizer/api/logging.go +++ b/normalizer/api/logging.go @@ -27,7 +27,7 @@ func LoggingMiddleware(svc normalizer.Service, logger logger.Logger) normalizer. } } -func (lm loggingMiddleware) Normalize(msg mainflux.RawMessage) (nd normalizer.NormalizedData, err error) { +func (lm loggingMiddleware) Normalize(msg mainflux.RawMessage) (msgs []mainflux.Message, err error) { defer func(begin time.Time) { message := fmt.Sprintf("Method normalize took %s to complete", time.Since(begin)) if err != nil { diff --git a/normalizer/api/metrics.go b/normalizer/api/metrics.go index 4d97ce2f50..5516d9c0e6 100644 --- a/normalizer/api/metrics.go +++ b/normalizer/api/metrics.go @@ -29,7 +29,7 @@ func MetricsMiddleware(svc normalizer.Service, counter metrics.Counter, latency } } -func (mm *metricsMiddleware) Normalize(msg mainflux.RawMessage) (normalizer.NormalizedData, error) { +func (mm *metricsMiddleware) Normalize(msg mainflux.RawMessage) ([]mainflux.Message, error) { defer func(begin time.Time) { mm.counter.With("method", "normalize").Add(1) mm.latency.With("method", "normalize").Observe(time.Since(begin).Seconds()) diff --git a/normalizer/nats/pubsub.go b/normalizer/nats/pubsub.go index 8c059d4ca2..eb8a5299d6 100644 --- a/normalizer/nats/pubsub.go +++ b/normalizer/nats/pubsub.go @@ -68,7 +68,7 @@ func (ps pubsub) publish(msg mainflux.RawMessage) error { } } - for _, v := range normalized.Messages { + for _, v := range normalized { data, err := proto.Marshal(&v) if err != nil { ps.logger.Warn(fmt.Sprintf("Marshalling failed: %s", err)) diff --git a/normalizer/normalizer.go b/normalizer/normalizer.go index c7c6aab785..99086d6263 100644 --- 
a/normalizer/normalizer.go +++ b/normalizer/normalizer.go @@ -4,8 +4,6 @@ package normalizer import ( - "strings" - "github.com/cisco/senml" "github.com/mainflux/mainflux" ) @@ -22,7 +20,7 @@ func New() Service { return normalizer{} } -func (n normalizer) Normalize(msg mainflux.RawMessage) (NormalizedData, error) { +func (n normalizer) Normalize(msg mainflux.RawMessage) ([]mainflux.Message, error) { format, ok := formats[msg.ContentType] if !ok { format = senml.JSON @@ -30,7 +28,7 @@ func (n normalizer) Normalize(msg mainflux.RawMessage) (NormalizedData, error) { raw, err := senml.Decode(msg.Payload, format) if err != nil { - return NormalizedData{}, err + return nil, err } normalized := senml.Normalize(raw) @@ -67,10 +65,5 @@ func (n normalizer) Normalize(msg mainflux.RawMessage) (NormalizedData, error) { msgs[k] = m } - output := strings.ToLower(msg.ContentType) - - return NormalizedData{ - ContentType: output, - Messages: msgs, - }, nil + return msgs, nil } diff --git a/normalizer/service.go b/normalizer/service.go index 41c91fef2b..04f07f76a4 100644 --- a/normalizer/service.go +++ b/normalizer/service.go @@ -8,11 +8,5 @@ import "github.com/mainflux/mainflux" // Service specifies API for normalizing messages. type Service interface { // Normalizes raw message to array of standard SenML messages. - Normalize(mainflux.RawMessage) (NormalizedData, error) -} - -// NormalizedData contains normalized messages and their content type. 
-type NormalizedData struct { - ContentType string - Messages []mainflux.Message + Normalize(mainflux.RawMessage) ([]mainflux.Message, error) } diff --git a/readers/api/logging.go b/readers/api/logging.go index 9fe3215875..96dfb7223b 100644 --- a/readers/api/logging.go +++ b/readers/api/logging.go @@ -28,9 +28,14 @@ func LoggingMiddleware(svc readers.MessageRepository, logger logger.Logger) read } } -func (lm *loggingMiddleware) ReadAll(chanID string, offset, limit uint64, query map[string]string) (readers.MessagesPage, error) { +func (lm *loggingMiddleware) ReadAll(chanID string, offset, limit uint64, query map[string]string) (page readers.MessagesPage, err error) { defer func(begin time.Time) { - lm.logger.Info(fmt.Sprintf(`Method read_all for offset %d and limit %d took %s to complete without errors.`, offset, limit, time.Since(begin))) + message := fmt.Sprintf("Method read_all for channel %s with offset %d and limit %d took %s to complete", chanID, offset, limit, time.Since(begin)) + if err != nil { + lm.logger.Warn(fmt.Sprintf("%s with error: %s.", message, err)) + return + } + lm.logger.Info(fmt.Sprintf("%s without errors.", message)) }(time.Now()) return lm.svc.ReadAll(chanID, offset, limit, query) diff --git a/readers/cassandra/README.md b/readers/cassandra/README.md index c84697057b..c0d3cc0117 100644 --- a/readers/cassandra/README.md +++ b/readers/cassandra/README.md @@ -26,7 +26,7 @@ default values. 
## Deployment ```yaml - version: "2" + version: "3.7" cassandra-reader: image: mainflux/cassandra-reader:[version] container_name: [instance name] diff --git a/readers/cassandra/messages_test.go b/readers/cassandra/messages_test.go index 11bcd4d860..232a9a75f9 100644 --- a/readers/cassandra/messages_test.go +++ b/readers/cassandra/messages_test.go @@ -65,15 +65,15 @@ func TestReadAll(t *testing.T) { msg.ValueSum = &mainflux.SumValue{Value: 45} } msg.Time = float64(now - int64(i)) - - err := writer.Save(msg) - require.Nil(t, err, fmt.Sprintf("failed to store message to Cassandra: %s", err)) messages = append(messages, msg) if count == 0 { subtopicMsgs = append(subtopicMsgs, msg) } } + err = writer.Save(messages...) + require.Nil(t, err, fmt.Sprintf("failed to store message to Cassandra: %s", err)) + reader := creaders.New(session) // Since messages are not saved in natural order, diff --git a/readers/influxdb/README.md b/readers/influxdb/README.md index 352213caf2..3d7fcacb78 100644 --- a/readers/influxdb/README.md +++ b/readers/influxdb/README.md @@ -24,7 +24,7 @@ default values. 
## Deployment ```yaml - version: "2" + version: "3.7" influxdb-reader: image: mainflux/influxdb-reader:[version] container_name: [instance name] diff --git a/readers/influxdb/messages_test.go b/readers/influxdb/messages_test.go index ca0aecda00..051e225d7a 100644 --- a/readers/influxdb/messages_test.go +++ b/readers/influxdb/messages_test.go @@ -50,8 +50,7 @@ var ( ) func TestReadAll(t *testing.T) { - writer, err := writer.New(client, testDB, 1, time.Second) - require.Nil(t, err, fmt.Sprintf("Creating new InfluxDB writer expected to succeed: %s.\n", err)) + writer := writer.New(client, testDB) messages := []mainflux.Message{} subtopicMsgs := []mainflux.Message{} @@ -76,15 +75,15 @@ func TestReadAll(t *testing.T) { msg.ValueSum = &mainflux.SumValue{Value: 45} } msg.Time = float64(now - int64(i)) - - err := writer.Save(msg) - require.Nil(t, err, fmt.Sprintf("failed to store message to InfluxDB: %s", err)) messages = append(messages, msg) if count == 0 { subtopicMsgs = append(subtopicMsgs, msg) } } + err := writer.Save(messages...) + require.Nil(t, err, fmt.Sprintf("failed to store message to InfluxDB: %s", err)) + reader := reader.New(client, testDB) require.Nil(t, err, fmt.Sprintf("Creating new InfluxDB reader expected to succeed: %s.\n", err)) diff --git a/readers/mongodb/README.md b/readers/mongodb/README.md index 1780e70fda..422ee7fb09 100644 --- a/readers/mongodb/README.md +++ b/readers/mongodb/README.md @@ -23,7 +23,7 @@ default values. 
## Deployment ```yaml - version: "2" + version: "3.7" mongodb-reader: image: mainflux/mongodb-reader:[version] container_name: [instance name] diff --git a/readers/mongodb/messages_test.go b/readers/mongodb/messages_test.go index ec23d517eb..b6bc847a4e 100644 --- a/readers/mongodb/messages_test.go +++ b/readers/mongodb/messages_test.go @@ -74,13 +74,13 @@ func TestReadAll(t *testing.T) { } msg.Time = float64(now - int64(i)) - err := writer.Save(msg) - require.Nil(t, err, fmt.Sprintf("failed to store message to MongoDB: %s", err)) messages = append(messages, msg) if count == 0 { subtopicMsgs = append(subtopicMsgs, msg) } } + err = writer.Save(messages...) + require.Nil(t, err, fmt.Sprintf("failed to store message to MongoDB: %s", err)) reader := mreaders.New(db) diff --git a/readers/postgres/README.md b/readers/postgres/README.md index 76fd10d1f5..1d83eb68ad 100644 --- a/readers/postgres/README.md +++ b/readers/postgres/README.md @@ -30,6 +30,7 @@ default values. ## Deployment ```yaml + version: "3.7" postgres-writer: image: mainflux/postgres-writer:[version] container_name: [instance name] diff --git a/readers/postgres/messages_test.go b/readers/postgres/messages_test.go index 965e756640..43b76fc127 100644 --- a/readers/postgres/messages_test.go +++ b/readers/postgres/messages_test.go @@ -61,14 +61,15 @@ func TestMessageReadAll(t *testing.T) { } msg.Time = float64(now - int64(i)) - err := messageRepo.Save(msg) - assert.Nil(t, err, fmt.Sprintf("expected no error got %s\n", err)) messages = append(messages, msg) if count == 0 { subtopicMsgs = append(subtopicMsgs, msg) } } + err = messageRepo.Save(messages...) + assert.Nil(t, err, fmt.Sprintf("expected no error got %s\n", err)) + reader := preader.New(db) // Since messages are not saved in natural order, diff --git a/topics.go b/topics.go index 0184f57ce5..a8d8747c7f 100644 --- a/topics.go +++ b/topics.go @@ -6,4 +6,9 @@ package mainflux // File topics.go contains all NATS subjects that are shared between services. 
// OutputSenML represents subject SenML messages will be published to. +// Messages published to this topic are Protobuf serialized Messages. const OutputSenML = "out.senml" + +// InputChannels represents subject all messages will be published to. +// Messages received on this topic are Protobuf serialized RawMessages. +const InputChannels = "channel.>" diff --git a/writers/README.md b/writers/README.md index cc83d40db9..b715a11901 100644 --- a/writers/README.md +++ b/writers/README.md @@ -1,7 +1,7 @@ # Writers Writers provide an implementation of various `message writers`. -Message writers are services that consume normalized (in `SenML` format) +Message writers are services that normalize (in `SenML` format) Mainflux messages and store them in specific data store. Writers are optional services and are treated as plugins. In order to diff --git a/writers/api/logging.go b/writers/api/logging.go index 435cca353b..903043449e 100644 --- a/writers/api/logging.go +++ b/writers/api/logging.go @@ -26,9 +26,9 @@ func LoggingMiddleware(svc writers.MessageRepository, logger log.Logger) writers return &loggingMiddleware{logger, svc} } -func (lm *loggingMiddleware) Save(msg mainflux.Message) (err error) { +func (lm *loggingMiddleware) Save(msgs ...mainflux.Message) (err error) { defer func(begin time.Time) { - message := fmt.Sprintf("Method Save took %s to complete", time.Since(begin)) + message := fmt.Sprintf("Method save took %s to complete", time.Since(begin)) if err != nil { lm.logger.Warn(fmt.Sprintf("%s with error: %s.", message, err)) return @@ -36,5 +36,5 @@ func (lm *loggingMiddleware) Save(msg mainflux.Message) (err error) { lm.logger.Info(fmt.Sprintf("%s without errors.", message)) }(time.Now()) - return lm.svc.Save(msg) + return lm.svc.Save(msgs...) 
} diff --git a/writers/api/metrics.go b/writers/api/metrics.go index b247fd92cc..b655bc0116 100644 --- a/writers/api/metrics.go +++ b/writers/api/metrics.go @@ -27,10 +27,10 @@ func MetricsMiddleware(repo writers.MessageRepository, counter metrics.Counter, } } -func (mm *metricsMiddleware) Save(msg mainflux.Message) error { +func (mm *metricsMiddleware) Save(msgs ...mainflux.Message) error { defer func(begin time.Time) { mm.counter.With("method", "handle_message").Add(1) mm.latency.With("method", "handle_message").Observe(time.Since(begin).Seconds()) }(time.Now()) - return mm.repo.Save(msg) + return mm.repo.Save(msgs...) } diff --git a/writers/cassandra/README.md b/writers/cassandra/README.md index 6db3fde668..aef70ba15f 100644 --- a/writers/cassandra/README.md +++ b/writers/cassandra/README.md @@ -22,7 +22,7 @@ default values. ## Deployment ```yaml - version: "2" + version: "3.7" cassandra-writer: image: mainflux/cassandra-writer:[version] container_name: [instance name] diff --git a/writers/cassandra/messages.go b/writers/cassandra/messages.go index 0ea305212a..001e355377 100644 --- a/writers/cassandra/messages.go +++ b/writers/cassandra/messages.go @@ -20,37 +20,44 @@ func New(session *gocql.Session) writers.MessageRepository { return &cassandraRepository{session} } -func (cr *cassandraRepository) Save(msg mainflux.Message) error { +func (cr *cassandraRepository) Save(messages ...mainflux.Message) error { cql := `INSERT INTO messages (id, channel, subtopic, publisher, protocol, name, unit, value, string_value, bool_value, data_value, value_sum, time, update_time, link) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)` id := gocql.TimeUUID() - var floatVal, valSum *float64 - var strVal, dataVal *string - var boolVal *bool - switch msg.Value.(type) { - case *mainflux.Message_FloatValue: - v := msg.GetFloatValue() - floatVal = &v - case *mainflux.Message_StringValue: - v := msg.GetStringValue() - strVal = &v - case *mainflux.Message_DataValue: - v := 
msg.GetDataValue() - dataVal = &v - case *mainflux.Message_BoolValue: - v := msg.GetBoolValue() - boolVal = &v + for _, msg := range messages { + var floatVal, valSum *float64 + var strVal, dataVal *string + var boolVal *bool + switch msg.Value.(type) { + case *mainflux.Message_FloatValue: + v := msg.GetFloatValue() + floatVal = &v + case *mainflux.Message_StringValue: + v := msg.GetStringValue() + strVal = &v + case *mainflux.Message_DataValue: + v := msg.GetDataValue() + dataVal = &v + case *mainflux.Message_BoolValue: + v := msg.GetBoolValue() + boolVal = &v + } + + if msg.GetValueSum() != nil { + v := msg.GetValueSum().GetValue() + valSum = &v + } + + err := cr.session.Query(cql, id, msg.GetChannel(), msg.GetSubtopic(), msg.GetPublisher(), + msg.GetProtocol(), msg.GetName(), msg.GetUnit(), floatVal, + strVal, boolVal, dataVal, valSum, msg.GetTime(), msg.GetUpdateTime(), msg.GetLink()).Exec() + if err != nil { + return err + } } - if msg.GetValueSum() != nil { - v := msg.GetValueSum().GetValue() - valSum = &v - } - - return cr.session.Query(cql, id, msg.GetChannel(), msg.GetSubtopic(), msg.GetPublisher(), - msg.GetProtocol(), msg.GetName(), msg.GetUnit(), floatVal, - strVal, boolVal, dataVal, valSum, msg.GetTime(), msg.GetUpdateTime(), msg.GetLink()).Exec() + return nil } diff --git a/writers/cassandra/messages_test.go b/writers/cassandra/messages_test.go index d3cd58096e..e26e878b05 100644 --- a/writers/cassandra/messages_test.go +++ b/writers/cassandra/messages_test.go @@ -36,6 +36,7 @@ func TestSave(t *testing.T) { repo := cassandra.New(session) now := time.Now().Unix() + var msgs []mainflux.Message for i := 0; i < msgsNum; i++ { // Mix possible values as well as value sum. 
count := i % valueFields @@ -54,8 +55,9 @@ func TestSave(t *testing.T) { msg.ValueSum = &mainflux.SumValue{Value: 45} } msg.Time = float64(now + int64(i)) - - err = repo.Save(msg) - assert.Nil(t, err, fmt.Sprintf("expected no error, got %s", err)) + msgs = append(msgs, msg) } + + err = repo.Save(msgs...) + assert.Nil(t, err, fmt.Sprintf("expected no error, got %s", err)) } diff --git a/writers/influxdb/README.md b/writers/influxdb/README.md index c4eaf93a1b..bdcc74e570 100644 --- a/writers/influxdb/README.md +++ b/writers/influxdb/README.md @@ -13,8 +13,6 @@ default values. | MF_NATS_URL | NATS instance URL | nats://localhost:4222 | | MF_INFLUX_WRITER_LOG_LEVEL | Log level for InfluxDB writer (debug, info, warn, error) | error | | MF_INFLUX_WRITER_PORT | Service HTTP port | 8180 | -| MF_INFLUX_WRITER_BATCH_SIZE | Size of the writer points batch | 5000 | -| MF_INFLUX_WRITER_BATCH_TIMEOUT | Time interval in seconds to flush the batch | 1 second | | MF_INFLUX_WRITER_DB_NAME | InfluxDB database name | mainflux | | MF_INFLUX_WRITER_DB_HOST | InfluxDB host | localhost | | MF_INFLUX_WRITER_DB_PORT | Default port of InfluxDB database | 8086 | @@ -25,7 +23,7 @@ default values. ## Deployment ```yaml - version: "2" + version: "3.7" influxdb-writer: image: mainflux/influxdb:[version] container_name: [instance name] @@ -36,8 +34,6 @@ default values. 
MF_NATS_URL: [NATS instance URL] MF_INFLUX_WRITER_LOG_LEVEL: [Influx writer log level] MF_INFLUX_WRITER_PORT: [Service HTTP port] - MF_INFLUX_WRITER_BATCH_SIZE: [Size of the writer points batch] - MF_INFLUX_WRITER_BATCH_TIMEOUT: [Time interval in seconds to flush the batch] MF_INFLUX_WRITER_DB_NAME: [InfluxDB name] MF_INFLUX_WRITER_DB_HOST: [InfluxDB host] MF_INFLUX_WRITER_DB_PORT: [InfluxDB port] @@ -66,7 +62,7 @@ make influxdb make install # Set the environment variables and run the service -MF_NATS_URL=[NATS instance URL] MF_INFLUX_WRITER_LOG_LEVEL=[Influx writer log level] MF_INFLUX_WRITER_PORT=[Service HTTP port] MF_INFLUX_WRITER_BATCH_SIZE=[Size of the writer points batch] MF_INFLUX_WRITER_BATCH_TIMEOUT=[Time interval in seconds to flush the batch] MF_INFLUX_WRITER_DB_NAME=[InfluxDB database name] MF_INFLUX_WRITER_DB_HOST=[InfluxDB database host] MF_INFLUX_WRITER_DB_PORT=[InfluxDB database port] MF_INFLUX_WRITER_DB_USER=[InfluxDB admin user] MF_INFLUX_WRITER_DB_PASS=[InfluxDB admin password] MF_INFLUX_WRITER_CHANNELS_CONFIG=[Configuration file path with channels list] $GOBIN/mainflux-influxdb +MF_NATS_URL=[NATS instance URL] MF_INFLUX_WRITER_LOG_LEVEL=[Influx writer log level] MF_INFLUX_WRITER_PORT=[Service HTTP port] MF_INFLUX_WRITER_DB_NAME=[InfluxDB database name] MF_INFLUX_WRITER_DB_HOST=[InfluxDB database host] MF_INFLUX_WRITER_DB_PORT=[InfluxDB database port] MF_INFLUX_WRITER_DB_USER=[InfluxDB admin user] MF_INFLUX_WRITER_DB_PASS=[InfluxDB admin password] MF_INFLUX_WRITER_CHANNELS_CONFIG=[Configuration file path with channels list] $GOBIN/mainflux-influxdb ``` diff --git a/writers/influxdb/messages.go b/writers/influxdb/messages.go index 17d4cdfeb6..d85130fb81 100644 --- a/writers/influxdb/messages.go +++ b/writers/influxdb/messages.go @@ -4,10 +4,8 @@ package influxdb import ( - "errors" "math" "strconv" - "sync" "time" "github.com/mainflux/mainflux/writers" @@ -20,105 +18,43 @@ const pointName = "messages" var _ writers.MessageRepository = 
(*influxRepo)(nil) -var ( - errZeroValueSize = errors.New("zero value batch size") - errZeroValueTimeout = errors.New("zero value batch timeout") - errNilBatch = errors.New("nil batch") -) - type influxRepo struct { - client influxdata.Client - batch influxdata.BatchPoints - batchSize int - mu sync.Mutex - tick <-chan time.Time - cfg influxdata.BatchPointsConfig + client influxdata.Client + cfg influxdata.BatchPointsConfig } type fields map[string]interface{} type tags map[string]string // New returns new InfluxDB writer. -func New(client influxdata.Client, database string, batchSize int, batchTimeout time.Duration) (writers.MessageRepository, error) { - if batchSize <= 0 { - return &influxRepo{}, errZeroValueSize - } - - if batchTimeout <= 0 { - return &influxRepo{}, errZeroValueTimeout - } - - repo := &influxRepo{ +func New(client influxdata.Client, database string) writers.MessageRepository { + return &influxRepo{ client: client, cfg: influxdata.BatchPointsConfig{ Database: database, }, - batchSize: batchSize, - } - - var err error - repo.batch, err = influxdata.NewBatchPoints(repo.cfg) - if err != nil { - return &influxRepo{}, err } - - repo.tick = time.NewTicker(batchTimeout).C - go func() { - for { - <-repo.tick - // Nil point indicates that savePoint method is triggered by the ticker. - repo.savePoint(nil) - } - }() - - return repo, nil } -func (repo *influxRepo) savePoint(point *influxdata.Point) error { - repo.mu.Lock() - defer repo.mu.Unlock() - if repo.batch == nil { - return errNilBatch - } - - // Ignore ticker if there is nothing to save. 
- if len(repo.batch.Points()) == 0 && point == nil { - return nil +func (repo *influxRepo) Save(messages ...mainflux.Message) error { + pts, err := influxdata.NewBatchPoints(repo.cfg) + if err != nil { + return err } + for _, msg := range messages { + tgs, flds := repo.tagsOf(&msg), repo.fieldsOf(&msg) - if point != nil { - repo.batch.AddPoint(point) - } + sec, dec := math.Modf(msg.Time) + t := time.Unix(int64(sec), int64(dec*(1e9))) - if len(repo.batch.Points())%repo.batchSize == 0 || point == nil { - if err := repo.client.Write(repo.batch); err != nil { - return err - } - // It would be nice to reset ticker at this point, which - // implies creating a new ticker and goroutine. It would - // introduce unnecessary complexity with no justified benefits. - var err error - repo.batch, err = influxdata.NewBatchPoints(repo.cfg) + pt, err := influxdata.NewPoint(pointName, tgs, flds, t) if err != nil { return err } + pts.AddPoint(pt) } - return nil -} - -func (repo *influxRepo) Save(msg mainflux.Message) error { - tgs, flds := repo.tagsOf(&msg), repo.fieldsOf(&msg) - - sec, dec := math.Modf(msg.Time) - t := time.Unix(int64(sec), int64(dec*(1e9))) - - pt, err := influxdata.NewPoint(pointName, tgs, flds, t) - if err != nil { - return err - } - - return repo.savePoint(pt) + return repo.client.Write(pts) } func (repo *influxRepo) tagsOf(msg *mainflux.Message) tags { diff --git a/writers/influxdb/messages_test.go b/writers/influxdb/messages_test.go index 726c62da61..62aea168fb 100644 --- a/writers/influxdb/messages_test.go +++ b/writers/influxdb/messages_test.go @@ -12,7 +12,6 @@ import ( influxdata "github.com/influxdata/influxdb/client/v2" "github.com/mainflux/mainflux" log "github.com/mainflux/mainflux/logger" - "github.com/mainflux/mainflux/writers" writer "github.com/mainflux/mainflux/writers/influxdb" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -21,16 +20,14 @@ import ( const valueFields = 6 var ( - port string - testLog, _ = 
log.New(os.Stdout, log.Info.String()) - testDB = "test" - saveTimeout = 2 * time.Second - saveBatchSize = 20 - streamsSize = 250 - selectMsgs = fmt.Sprintf("SELECT * FROM test..messages") - dropMsgs = fmt.Sprintf("DROP SERIES FROM messages") - client influxdata.Client - clientCfg = influxdata.HTTPConfig{ + port string + testLog, _ = log.New(os.Stdout, log.Info.String()) + testDB = "test" + streamsSize = 250 + selectMsgs = "SELECT * FROM test..messages" + dropMsgs = "DROP SERIES FROM messages" + client influxdata.Client + clientCfg = influxdata.HTTPConfig{ Username: "test", Password: "test", } @@ -69,63 +66,23 @@ func queryDB(cmd string) ([][]interface{}, error) { return response.Results[0].Series[0].Values, nil } -func TestNew(t *testing.T) { - cases := []struct { - desc string - batchSize int - err error - batchTimeout time.Duration - errText string - }{ - { - desc: "Create writer with zero value batch size", - batchSize: 0, - batchTimeout: time.Duration(5 * time.Second), - errText: "zero value batch size", - }, - { - desc: "Create writer with zero value batch timeout", - batchSize: 5, - batchTimeout: time.Duration(0 * time.Second), - errText: "zero value batch timeout", - }, - } - - for _, tc := range cases { - _, err := writer.New(client, testDB, tc.batchSize, tc.batchTimeout) - assert.Equal(t, tc.errText, err.Error(), fmt.Sprintf("%s expected to have error \"%s\", but got \"%s\"", tc.desc, tc.errText, err)) - } -} - func TestSave(t *testing.T) { - // Set batch size to 1 to simulate single point insert. - repo, err := writer.New(client, testDB, 1, saveTimeout) - require.Nil(t, err, fmt.Sprintf("Creating new InfluxDB repo expected to succeed: %s.\n", err)) - - // Set batch size to value > 1 to simulate real batch. 
- repo1, err := writer.New(client, testDB, saveBatchSize, saveTimeout) - require.Nil(t, err, fmt.Sprintf("Creating new InfluxDB repo expected to succeed: %s.\n", err)) + repo := writer.New(client, testDB) cases := []struct { desc string - repo writers.MessageRepository msgsNum int expectedSize int - isBatch bool }{ { desc: "save a single message", - repo: repo, msgsNum: 1, expectedSize: 1, - isBatch: false, }, { desc: "save a batch of messages", - repo: repo1, msgsNum: streamsSize, - expectedSize: streamsSize - (streamsSize % saveBatchSize), - isBatch: true, + expectedSize: streamsSize, }, } @@ -135,6 +92,7 @@ func TestSave(t *testing.T) { require.Nil(t, err, fmt.Sprintf("Cleaning data from InfluxDB expected to succeed: %s.\n", err)) now := time.Now().Unix() + var msgs []mainflux.Message for i := 0; i < tc.msgsNum; i++ { // Mix possible values as well as value sum. count := i % valueFields @@ -150,28 +108,19 @@ func TestSave(t *testing.T) { case 4: msg.ValueSum = nil case 5: - msg.ValueSum = &mainflux.SumValue{Value: 45} + msg.ValueSum = &mainflux.SumValue{Value: 42} } msg.Time = float64(now + int64(i)) - - err := tc.repo.Save(msg) - assert.Nil(t, err, fmt.Sprintf("Save operation expected to succeed: %s.\n", err)) + msgs = append(msgs, msg) } + err = repo.Save(msgs...) + assert.Nil(t, err, fmt.Sprintf("Save operation expected to succeed: %s.\n", err)) + row, err := queryDB(selectMsgs) assert.Nil(t, err, fmt.Sprintf("Querying InfluxDB to retrieve data expected to succeed: %s.\n", err)) count := len(row) assert.Equal(t, tc.expectedSize, count, fmt.Sprintf("Expected to have %d messages saved, found %d instead.\n", tc.expectedSize, count)) - - if tc.isBatch { - // Sleep for `saveBatchTime` to trigger ticker and check if the reset of the data is saved. 
- time.Sleep(saveTimeout) - - row, err = queryDB(selectMsgs) - assert.Nil(t, err, fmt.Sprintf("Querying InfluxDB to retrieve data count expected to succeed: %s.\n", err)) - count = len(row) - assert.Equal(t, tc.msgsNum, count, fmt.Sprintf("Expected to have %d messages, found %d instead.\n", tc.msgsNum, count)) - } } } diff --git a/writers/messages.go b/writers/messages.go index 43e3039fa6..a3088177b0 100644 --- a/writers/messages.go +++ b/writers/messages.go @@ -10,5 +10,5 @@ type MessageRepository interface { // Save method is used to save published message. A non-nil // error is returned to indicate operation failure. - Save(mainflux.Message) error + Save(...mainflux.Message) error } diff --git a/writers/mongodb/README.md b/writers/mongodb/README.md index 43f5a5029f..175813457c 100644 --- a/writers/mongodb/README.md +++ b/writers/mongodb/README.md @@ -21,6 +21,7 @@ default values. ## Deployment ```yaml + version: "3.7" mongodb-writer: image: mainflux/mongodb-writer:[version] container_name: [instance name] diff --git a/writers/mongodb/messages.go b/writers/mongodb/messages.go index eb3c476f24..fec267ccef 100644 --- a/writers/mongodb/messages.go +++ b/writers/mongodb/messages.go @@ -43,40 +43,45 @@ func New(db *mongo.Database) writers.MessageRepository { return &mongoRepo{db} } -func (repo *mongoRepo) Save(msg mainflux.Message) error { +func (repo *mongoRepo) Save(messages ...mainflux.Message) error { coll := repo.db.Collection(collectionName) - m := message{ - Channel: msg.Channel, - Subtopic: msg.Subtopic, - Publisher: msg.Publisher, - Protocol: msg.Protocol, - Name: msg.Name, - Unit: msg.Unit, - Time: msg.Time, - UpdateTime: msg.UpdateTime, - Link: msg.Link, - } + var msgs []interface{} + for _, msg := range messages { + m := message{ + Channel: msg.Channel, + Subtopic: msg.Subtopic, + Publisher: msg.Publisher, + Protocol: msg.Protocol, + Name: msg.Name, + Unit: msg.Unit, + Time: msg.Time, + UpdateTime: msg.UpdateTime, + Link: msg.Link, + } - switch 
msg.Value.(type) { - case *mainflux.Message_FloatValue: - v := msg.GetFloatValue() - m.FloatValue = &v - case *mainflux.Message_StringValue: - v := msg.GetStringValue() - m.StringValue = &v - case *mainflux.Message_DataValue: - v := msg.GetDataValue() - m.DataValue = &v - case *mainflux.Message_BoolValue: - v := msg.GetBoolValue() - m.BoolValue = &v - } + switch msg.Value.(type) { + case *mainflux.Message_FloatValue: + v := msg.GetFloatValue() + m.FloatValue = &v + case *mainflux.Message_StringValue: + v := msg.GetStringValue() + m.StringValue = &v + case *mainflux.Message_DataValue: + v := msg.GetDataValue() + m.DataValue = &v + case *mainflux.Message_BoolValue: + v := msg.GetBoolValue() + m.BoolValue = &v + } + + if msg.GetValueSum() != nil { + valueSum := msg.GetValueSum().Value + m.ValueSum = &valueSum + } - if msg.GetValueSum() != nil { - valueSum := msg.GetValueSum().Value - m.ValueSum = &valueSum + msgs = append(msgs, m) } - _, err := coll.InsertOne(context.Background(), m) + _, err := coll.InsertMany(context.Background(), msgs) return err } diff --git a/writers/mongodb/messages_test.go b/writers/mongodb/messages_test.go index 616c6c4aaa..ac71451084 100644 --- a/writers/mongodb/messages_test.go +++ b/writers/mongodb/messages_test.go @@ -54,6 +54,7 @@ func TestSave(t *testing.T) { repo := mongodb.New(db) now := time.Now().Unix() + var msgs []mainflux.Message for i := 0; i < msgsNum; i++ { // Mix possible values as well as value sum. count := i % valueFields @@ -72,9 +73,10 @@ func TestSave(t *testing.T) { msg.ValueSum = &mainflux.SumValue{Value: 45} } msg.Time = float64(now + int64(i)) - - err = repo.Save(msg) + msgs = append(msgs, msg) } + + err = repo.Save(msgs...) 
assert.Nil(t, err, fmt.Sprintf("Save operation expected to succeed: %s.\n", err)) count, err := db.Collection(collection).CountDocuments(context.Background(), bson.D{}) diff --git a/writers/postgres/README.md b/writers/postgres/README.md index 5d83d0d8e4..2d939636cb 100644 --- a/writers/postgres/README.md +++ b/writers/postgres/README.md @@ -27,6 +27,7 @@ default values. ## Deployment ```yaml + version: "3.7" postgres-writer: image: mainflux/postgres-writer:[version] container_name: [instance name] diff --git a/writers/postgres/messages.go b/writers/postgres/messages.go index 45515614a6..81850982d8 100644 --- a/writers/postgres/messages.go +++ b/writers/postgres/messages.go @@ -4,6 +4,7 @@ package postgres import ( + "context" "errors" "github.com/gofrs/uuid" @@ -31,7 +32,7 @@ func New(db *sqlx.DB) writers.MessageRepository { return &postgresRepo{db: db} } -func (pr postgresRepo) Save(msg mainflux.Message) error { +func (pr postgresRepo) Save(messages ...mainflux.Message) error { q := `INSERT INTO messages (id, channel, subtopic, publisher, protocol, name, unit, value, string_value, bool_value, data_value, value_sum, time, update_time, link) @@ -39,24 +40,31 @@ func (pr postgresRepo) Save(msg mainflux.Message) error { :value, :string_value, :bool_value, :data_value, :value_sum, :time, :update_time, :link);` - dbth, err := toDBMessage(msg) + tx, err := pr.db.BeginTxx(context.Background(), nil) if err != nil { return err } - if _, err := pr.db.NamedExec(q, dbth); err != nil { - pqErr, ok := err.(*pq.Error) - if ok { - switch pqErr.Code.Name() { - case errInvalid: - return ErrInvalidMessage - } + for _, msg := range messages { + dbth, err := toDBMessage(msg) + if err != nil { + return err } - return err + if _, err := tx.NamedExec(q, dbth); err != nil { + pqErr, ok := err.(*pq.Error) + if ok { + switch pqErr.Code.Name() { + case errInvalid: + return ErrInvalidMessage + } + } + + return err + } } - return nil + return tx.Commit() } type dbMessage struct { diff --git 
a/writers/postgres/messages_test.go b/writers/postgres/messages_test.go index f7223613b6..9b2d3b7c46 100644 --- a/writers/postgres/messages_test.go +++ b/writers/postgres/messages_test.go @@ -34,6 +34,7 @@ func TestMessageSave(t *testing.T) { msg.Publisher = pubid.String() now := time.Now().Unix() + var msgs []mainflux.Message for i := 0; i < msgsNum; i++ { // Mix possible values as well as value sum. count := i % valueFields @@ -52,9 +53,9 @@ func TestMessageSave(t *testing.T) { msg.ValueSum = &mainflux.SumValue{Value: 45} } msg.Time = float64(now + int64(i)) - - err := messageRepo.Save(msg) - assert.Nil(t, err, fmt.Sprintf("expected no error got %s\n", err)) - + msgs = append(msgs, msg) } + + err = messageRepo.Save(msgs...) + assert.Nil(t, err, fmt.Sprintf("expected no error got %s\n", err)) } diff --git a/writers/writer.go b/writers/writer.go index af6619f064..fd787d0e96 100644 --- a/writers/writer.go +++ b/writers/writer.go @@ -9,41 +9,52 @@ import ( "github.com/gogo/protobuf/proto" "github.com/mainflux/mainflux" log "github.com/mainflux/mainflux/logger" + "github.com/mainflux/mainflux/normalizer" nats "github.com/nats-io/go-nats" ) type consumer struct { - nc *nats.Conn - channels map[string]bool - repo MessageRepository - logger log.Logger + nc *nats.Conn + channels map[string]bool + repo MessageRepository + normalizer normalizer.Service + logger log.Logger } // Start method starts to consume normalized messages received from NATS. 
-func Start(nc *nats.Conn, repo MessageRepository, queue string, channels map[string]bool, logger log.Logger) error { +func Start(nc *nats.Conn, repo MessageRepository, norm normalizer.Service, queue string, channels map[string]bool, logger log.Logger) error { c := consumer{ - nc: nc, - channels: channels, - repo: repo, - logger: logger, + nc: nc, + channels: channels, + repo: repo, + normalizer: norm, + logger: logger, } - _, err := nc.QueueSubscribe(mainflux.OutputSenML, queue, c.consume) + _, err := nc.QueueSubscribe(mainflux.InputChannels, queue, c.consume) return err } func (c *consumer) consume(m *nats.Msg) { - msg := &mainflux.Message{} - if err := proto.Unmarshal(m.Data, msg); err != nil { + var msg mainflux.RawMessage + if err := proto.Unmarshal(m.Data, &msg); err != nil { c.logger.Warn(fmt.Sprintf("Failed to unmarshal received message: %s", err)) return } - if !c.channelExists(msg.GetChannel()) { + norm, err := c.normalizer.Normalize(msg) + if err != nil { + c.logger.Warn(fmt.Sprintf("Failed to normalize received message: %s", err)) return } + var msgs []mainflux.Message + for _, v := range norm { + if c.channelExists(v.GetChannel()) { + msgs = append(msgs, v) + } + } - if err := c.repo.Save(*msg); err != nil { + if err := c.repo.Save(msgs...); err != nil { c.logger.Warn(fmt.Sprintf("Failed to save message: %s", err)) return }