diff --git a/Makefile b/Makefile index 1a138ced49..bde37e8fb3 100644 --- a/Makefile +++ b/Makefile @@ -2,7 +2,7 @@ # SPDX-License-Identifier: Apache-2.0 BUILD_DIR = build -SERVICES = users things http normalizer ws coap lora influxdb-writer influxdb-reader mongodb-writer mongodb-reader cassandra-writer cassandra-reader postgres-writer postgres-reader cli bootstrap opcua +SERVICES = users things http ws coap lora normalizer influxdb-writer influxdb-reader mongodb-writer mongodb-reader cassandra-writer cassandra-reader postgres-writer postgres-reader cli bootstrap opcua DOCKERS = $(addprefix docker_,$(SERVICES)) DOCKERS_DEV = $(addprefix docker_dev_,$(SERVICES)) CGO_ENABLED ?= 0 diff --git a/cmd/cassandra-writer/main.go b/cmd/cassandra-writer/main.go index 81de29964d..fa9cce3fa5 100644 --- a/cmd/cassandra-writer/main.go +++ b/cmd/cassandra-writer/main.go @@ -19,6 +19,7 @@ import ( "github.com/gocql/gocql" "github.com/mainflux/mainflux" "github.com/mainflux/mainflux/logger" + "github.com/mainflux/mainflux/normalizer" "github.com/mainflux/mainflux/writers" "github.com/mainflux/mainflux/writers/api" "github.com/mainflux/mainflux/writers/cassandra" @@ -74,7 +75,8 @@ func main() { defer session.Close() repo := newService(session, logger) - if err := writers.Start(nc, repo, svcName, cfg.channels, logger); err != nil { + norm := normalizer.New() + if err := writers.Start(nc, repo, norm, svcName, cfg.channels, logger); err != nil { logger.Error(fmt.Sprintf("Failed to create Cassandra writer: %s", err)) } diff --git a/cmd/influxdb-writer/main.go b/cmd/influxdb-writer/main.go index 7c28a5bf7c..02f744d5d3 100644 --- a/cmd/influxdb-writer/main.go +++ b/cmd/influxdb-writer/main.go @@ -10,15 +10,14 @@ import ( "net/http" "os" "os/signal" - "strconv" "syscall" - "time" "github.com/BurntSushi/toml" kitprometheus "github.com/go-kit/kit/metrics/prometheus" influxdata "github.com/influxdata/influxdb/client/v2" "github.com/mainflux/mainflux" "github.com/mainflux/mainflux/logger" + "github.com/mainflux/mainflux/normalizer" "github.com/mainflux/mainflux/writers" "github.com/mainflux/mainflux/writers/api" "github.com/mainflux/mainflux/writers/influxdb" @@ -29,43 +28,37 @@ import ( const ( svcName = "influxdb-writer" - defNatsURL = nats.DefaultURL - defLogLevel = "error" - defPort = "8180" - defBatchSize = "5000" - defBatchTimeout = "5" - defDBName = "mainflux" - defDBHost = "localhost" - defDBPort = "8086" - defDBUser = "mainflux" - defDBPass = "mainflux" - defChanCfgPath = "/config/channels.toml" - - envNatsURL = "MF_NATS_URL" - envLogLevel = "MF_INFLUX_WRITER_LOG_LEVEL" - envPort = "MF_INFLUX_WRITER_PORT" - envBatchSize = "MF_INFLUX_WRITER_BATCH_SIZE" - envBatchTimeout = "MF_INFLUX_WRITER_BATCH_TIMEOUT" - envDBName = "MF_INFLUX_WRITER_DB_NAME" - envDBHost = "MF_INFLUX_WRITER_DB_HOST" - envDBPort = "MF_INFLUX_WRITER_DB_PORT" - envDBUser = "MF_INFLUX_WRITER_DB_USER" - envDBPass = "MF_INFLUX_WRITER_DB_PASS" - envChanCfgPath = "MF_INFLUX_WRITER_CHANNELS_CONFIG" + defNatsURL = nats.DefaultURL + defLogLevel = "error" + defPort = "8180" + defDBName = "mainflux" + defDBHost = "localhost" + defDBPort = "8086" + defDBUser = "mainflux" + defDBPass = "mainflux" + defChanCfgPath = "/config/channels.toml" + + envNatsURL = "MF_NATS_URL" + envLogLevel = "MF_INFLUX_WRITER_LOG_LEVEL" + envPort = "MF_INFLUX_WRITER_PORT" + envDBName = "MF_INFLUX_WRITER_DB_NAME" + envDBHost = "MF_INFLUX_WRITER_DB_HOST" + envDBPort = "MF_INFLUX_WRITER_DB_PORT" + envDBUser = "MF_INFLUX_WRITER_DB_USER" + envDBPass = "MF_INFLUX_WRITER_DB_PASS" + envChanCfgPath = "MF_INFLUX_WRITER_CHANNELS_CONFIG" ) type config struct { - natsURL string - logLevel string - port string - batchSize string - batchTimeout string - dbName string - dbHost string - dbPort string - dbUser string - dbPass string - channels map[string]bool + natsURL string + logLevel string + port string + dbName string + dbHost string + dbPort string + dbUser string + dbPass string + channels map[string]bool } func main() { @@ -90,29 +83,13 @@ func main() { } defer client.Close() - batchTimeout, err := strconv.Atoi(cfg.batchTimeout) - if err != nil { - logger.Error(fmt.Sprintf("Invalid value for batch timeout: %s", err)) - os.Exit(1) - } - - batchSize, err := strconv.Atoi(cfg.batchSize) - if err != nil { - logger.Error(fmt.Sprintf("Invalid value of batch size: %s", err)) - os.Exit(1) - } - - timeout := time.Duration(batchTimeout) * time.Second - repo, err := influxdb.New(client, cfg.dbName, batchSize, timeout) - if err != nil { - logger.Error(fmt.Sprintf("Failed to create InfluxDB writer: %s", err)) - os.Exit(1) - } + repo := influxdb.New(client, cfg.dbName) counter, latency := makeMetrics() repo = api.LoggingMiddleware(repo, logger) repo = api.MetricsMiddleware(repo, counter, latency) - if err := writers.Start(nc, repo, svcName, cfg.channels, logger); err != nil { + norm := normalizer.New() + if err := writers.Start(nc, repo, norm, svcName, cfg.channels, logger); err != nil { logger.Error(fmt.Sprintf("Failed to start InfluxDB writer: %s", err)) os.Exit(1) } @@ -133,17 +110,15 @@ func main() { func loadConfigs() (config, influxdata.HTTPConfig) { chanCfgPath := mainflux.Env(envChanCfgPath, defChanCfgPath) cfg := config{ - natsURL: mainflux.Env(envNatsURL, defNatsURL), - logLevel: mainflux.Env(envLogLevel, defLogLevel), - port: mainflux.Env(envPort, defPort), - batchSize: mainflux.Env(envBatchSize, defBatchSize), - batchTimeout: mainflux.Env(envBatchTimeout, defBatchTimeout), - dbName: mainflux.Env(envDBName, defDBName), - dbHost: mainflux.Env(envDBHost, defDBHost), - dbPort: mainflux.Env(envDBPort, defDBPort), - dbUser: mainflux.Env(envDBUser, defDBUser), - dbPass: mainflux.Env(envDBPass, defDBPass), - channels: loadChansConfig(chanCfgPath), + natsURL: mainflux.Env(envNatsURL, defNatsURL), + logLevel: mainflux.Env(envLogLevel, defLogLevel), + port: mainflux.Env(envPort, defPort), + dbName: mainflux.Env(envDBName, defDBName), + dbHost: mainflux.Env(envDBHost, defDBHost), + dbPort: mainflux.Env(envDBPort, defDBPort), + dbUser: mainflux.Env(envDBUser, defDBUser), + dbPass: mainflux.Env(envDBPass, defDBPass), + channels: loadChansConfig(chanCfgPath), } clientCfg := influxdata.HTTPConfig{ diff --git a/cmd/mongodb-writer/main.go b/cmd/mongodb-writer/main.go index 4fca1182a5..363e54ac6f 100644 --- a/cmd/mongodb-writer/main.go +++ b/cmd/mongodb-writer/main.go @@ -17,6 +17,7 @@ import ( kitprometheus "github.com/go-kit/kit/metrics/prometheus" "github.com/mainflux/mainflux" "github.com/mainflux/mainflux/logger" + "github.com/mainflux/mainflux/normalizer" "github.com/mainflux/mainflux/writers" "github.com/mainflux/mainflux/writers/api" "github.com/mainflux/mainflux/writers/mongodb" @@ -84,7 +85,8 @@ func main() { counter, latency := makeMetrics() repo = api.LoggingMiddleware(repo, logger) repo = api.MetricsMiddleware(repo, counter, latency) - if err := writers.Start(nc, repo, svcName, cfg.channels, logger); err != nil { + norm := normalizer.New() + if err := writers.Start(nc, repo, norm, svcName, cfg.channels, logger); err != nil { logger.Error(fmt.Sprintf("Failed to start MongoDB writer: %s", err)) os.Exit(1) } diff --git a/cmd/postgres-writer/main.go b/cmd/postgres-writer/main.go index a616bcea21..a0320f94fe 100644 --- a/cmd/postgres-writer/main.go +++ b/cmd/postgres-writer/main.go @@ -17,6 +17,7 @@ import ( "github.com/jmoiron/sqlx" "github.com/mainflux/mainflux" "github.com/mainflux/mainflux/logger" + "github.com/mainflux/mainflux/normalizer" "github.com/mainflux/mainflux/writers" "github.com/mainflux/mainflux/writers/api" "github.com/mainflux/mainflux/writers/postgres" @@ -80,7 +81,8 @@ func main() { defer db.Close() repo := newService(db, logger) - if err = writers.Start(nc, repo, svcName, cfg.channels, logger); err != nil { + norm := normalizer.New() + if err = writers.Start(nc, repo, norm, svcName, cfg.channels, logger); err != nil { logger.Error(fmt.Sprintf("Failed to create Postgres writer: %s", err)) } diff --git a/docker/addons/bootstrap/docker-compose.yml b/docker/addons/bootstrap/docker-compose.yml index 7c461b8543..287ef038b4 100644 --- a/docker/addons/bootstrap/docker-compose.yml +++ b/docker/addons/bootstrap/docker-compose.yml @@ -1,3 +1,11 @@ +# Copyright (c) Mainflux +# SPDX-License-Identifier: Apache-2.0 + +# This docker-compose file contains optional bootstrap services. Since it's optional, this file is +# dependent of docker-compose file from /docker. In order to run this services, execute command: +# docker-compose -f docker/docker-compose.yml -f docker/addons/bootstrap/docker-compose.yml up +# from project root. + version: "3.7" networks: diff --git a/docker/addons/cassandra-reader/docker-compose.yml b/docker/addons/cassandra-reader/docker-compose.yml index d5d5fc1d68..6093cc54ad 100644 --- a/docker/addons/cassandra-reader/docker-compose.yml +++ b/docker/addons/cassandra-reader/docker-compose.yml @@ -1,10 +1,10 @@ -### -# This docker-compose file contains optional Cassandra and cassandra-reader. Since these are optional, this file is -# dependent of docker-compose file from /docker. In order to run -# these optional service, execute command: +# Copyright (c) Mainflux +# SPDX-License-Identifier: Apache-2.0 + +# This docker-compose file contains optional cassandra-reader. Since it's optional, this file is +# dependent of docker-compose file from /docker. In order to run this service, execute command: # docker-compose -f docker/docker-compose.yml -f docker/addons/cassandra-reader/docker-compose.yml up # from project root. -### version: "3.7" diff --git a/docker/addons/cassandra-writer/docker-compose.yml b/docker/addons/cassandra-writer/docker-compose.yml index d4ec96b04e..81e10c3e4e 100644 --- a/docker/addons/cassandra-writer/docker-compose.yml +++ b/docker/addons/cassandra-writer/docker-compose.yml @@ -1,10 +1,10 @@ -### +# Copyright (c) Mainflux +# SPDX-License-Identifier: Apache-2.0 + # This docker-compose file contains optional Cassandra and cassandra-writer. Since these are optional, this file is -# dependent of docker-compose file from /docker. In order to run -# these optional service, execute command: +# dependent of docker-compose file from /docker. In order to run these services, execute command: # docker-compose -f docker/docker-compose.yml -f docker/addons/cassandra-writer/docker-compose.yml up # from project root. -### version: "3.7" diff --git a/docker/addons/influxdb-reader/docker-compose.yml b/docker/addons/influxdb-reader/docker-compose.yml index 36b687958a..bfd3882339 100644 --- a/docker/addons/influxdb-reader/docker-compose.yml +++ b/docker/addons/influxdb-reader/docker-compose.yml @@ -1,7 +1,10 @@ +# Copyright (c) Mainflux +# SPDX-License-Identifier: Apache-2.0 + ### # This docker-compose file contains optional InfluxDB-reader service for the Mainflux # platform. Since this service is optional, this file is dependent on the docker-compose.yml -# file from /docker/. In order to run InfluxDB-reader service, core services, +# file from /docker/. In order to run this service, core services, # as well as the network from the core composition, should be already running. ### diff --git a/docker/addons/influxdb-writer/docker-compose.yml b/docker/addons/influxdb-writer/docker-compose.yml index a1a179238e..02582ab105 100644 --- a/docker/addons/influxdb-writer/docker-compose.yml +++ b/docker/addons/influxdb-writer/docker-compose.yml @@ -1,9 +1,10 @@ -### +# Copyright (c) Mainflux +# SPDX-License-Identifier: Apache-2.0 + # This docker-compose file contains optional InfluxDB, InfluxDB-writer and Grafana services # for the Mainflux platform. Since this services are optional, this file is dependent on the # docker-compose.yml file from /docker/. In order to run these services, # core services, as well as the network from the core composition, should be already running. -### version: "3.7" @@ -16,7 +17,6 @@ volumes: mainflux-grafana-volume: services: - influxdb: image: influxdb:1.6.4-alpine container_name: mainflux-influxdb diff --git a/docker/addons/lora-adapter/docker-compose.yml b/docker/addons/lora-adapter/docker-compose.yml index 186279e726..fdb9231f9f 100644 --- a/docker/addons/lora-adapter/docker-compose.yml +++ b/docker/addons/lora-adapter/docker-compose.yml @@ -1,9 +1,10 @@ -### +# Copyright (c) Mainflux +# SPDX-License-Identifier: Apache-2.0 + # This docker-compose file contains optional lora-adapter and lora-redis services # for the Mainflux platform. Since this services are optional, this file is dependent on the # docker-compose.yml file from /docker/. In order to run these services, # core services, as well as the network from the core composition, should be already running. -### version: "3.7" diff --git a/docker/addons/mongodb-reader/docker-compose.yml b/docker/addons/mongodb-reader/docker-compose.yml index 5a3f24280f..1886aa2424 100644 --- a/docker/addons/mongodb-reader/docker-compose.yml +++ b/docker/addons/mongodb-reader/docker-compose.yml @@ -1,10 +1,11 @@ -### +# Copyright (c) Mainflux +# SPDX-License-Identifier: Apache-2.0 + # This docker-compose file contains optional MongoDB-reader service # for Mainflux platform. Since these are optional, this file is dependent of docker-compose file -# from /docker. In order to run these optional service, execute command: +# from /docker. In order to run this service, execute command: # docker-compose -f docker/docker-compose.yml -f docker/addons/mongodb-reader/docker-compose.yml up # from project root. MongoDB service is defined in docker/addons/mongodb-writer/docker-compose.yml. -### version: "3.7" diff --git a/docker/addons/mongodb-writer/docker-compose.yml b/docker/addons/mongodb-writer/docker-compose.yml index ef8ef1fd87..d408db56a8 100644 --- a/docker/addons/mongodb-writer/docker-compose.yml +++ b/docker/addons/mongodb-writer/docker-compose.yml @@ -1,11 +1,12 @@ -### +# Copyright (c) Mainflux +# SPDX-License-Identifier: Apache-2.0 + # This docker-compose file contains optional MongoDB and MongoDB-writer services # for Mainflux platform. Since these are optional, this file is dependent of docker-compose file -# from /docker. In order to run these optional service, execute command: +# from /docker. In order to run these services, execute command: # docker-compose -f docker/docker-compose.yml -f docker/addons/mongodb-writer/docker-compose.yml up # from project root. MongoDB default port (27017) is exposed, so you can use various tools for database # inspection and data visualization. -### version: "3.7" diff --git a/docker/addons/normalizer/docker-compose.yml b/docker/addons/normalizer/docker-compose.yml new file mode 100644 index 0000000000..f02f535369 --- /dev/null +++ b/docker/addons/normalizer/docker-compose.yml @@ -0,0 +1,29 @@ +# Copyright (c) Mainflux +# SPDX-License-Identifier: Apache-2.0 + +# This docker-compose file contains optional lora-adapter and lora-redis services +# for the Mainflux platform. Since this service is optional, this file is dependent on the +# docker-compose.yml file from /docker/. In order to run this service, +# core services, as well as the network from the core composition, should be already running. + +version: "3.7" + +networks: + docker_mainflux-base-net: + external: true + +services: + normalizer: + image: mainflux/normalizer:latest + container_name: mainflux-normalizer + restart: on-failure + depends_on: + - nats + environment: + MF_NORMALIZER_LOG_LEVEL: ${MF_NORMALIZER_LOG_LEVEL} + MF_NATS_URL: ${MF_NATS_URL} + MF_NORMALIZER_PORT: ${MF_NORMALIZER_PORT} + ports: + - ${MF_NORMALIZER_PORT}:${MF_NORMALIZER_PORT} + networks: + - mainflux-base-net diff --git a/docker/addons/opcua-adapter/docker-compose.yml b/docker/addons/opcua-adapter/docker-compose.yml index 7a024e89ef..6e450f9bad 100644 --- a/docker/addons/opcua-adapter/docker-compose.yml +++ b/docker/addons/opcua-adapter/docker-compose.yml @@ -1,9 +1,10 @@ -### +# Copyright (c) Mainflux +# SPDX-License-Identifier: Apache-2.0 + # This docker-compose file contains optional opcua-adapter and opcua-redis services # for the Mainflux platform. Since this services are optional, this file is dependent on the # docker-compose.yml file from /docker/. In order to run these services, # core services, as well as the network from the core composition, should be already running. -### version: "3.7" diff --git a/docker/addons/postgres-reader/docker-compose.yml b/docker/addons/postgres-reader/docker-compose.yml index e9a62bf199..236feaf8e5 100644 --- a/docker/addons/postgres-reader/docker-compose.yml +++ b/docker/addons/postgres-reader/docker-compose.yml @@ -1,10 +1,11 @@ -### +# Copyright (c) Mainflux +# SPDX-License-Identifier: Apache-2.0 + # This docker-compose file contains optional Postgres-reader service for Mainflux platform. # Since this service is optional, this file is dependent of docker-compose.yml file -# from /docker. In order to run these optional service, execute command: +# from /docker. In order to run this service, execute command: # docker-compose -f docker/docker-compose.yml -f docker/addons/postgres-reader/docker-compose.yml up # from project root. -### version: "3.7" diff --git a/docker/addons/postgres-writer/docker-compose.yml b/docker/addons/postgres-writer/docker-compose.yml index 22ea606dea..8859551d75 100644 --- a/docker/addons/postgres-writer/docker-compose.yml +++ b/docker/addons/postgres-writer/docker-compose.yml @@ -1,11 +1,12 @@ -### +# Copyright (c) Mainflux +# SPDX-License-Identifier: Apache-2.0 + # This docker-compose file contains optional Postgres and Postgres-writer services # for Mainflux platform. Since these are optional, this file is dependent of docker-compose file -# from /docker. In order to run these optional service, execute command: +# from /docker. In order to run these services, execute command: # docker-compose -f docker/docker-compose.yml -f docker/addons/postgres-writer/docker-compose.yml up # from project root. PostgreSQL default port (5432) is exposed, so you can use various tools for database # inspection and data visualization. -### version: "3.7" diff --git a/docs/architecture.md b/docs/architecture.md index 4be01baeb1..01d99e1627 100644 --- a/docs/architecture.md +++ b/docs/architecture.md @@ -6,13 +6,12 @@ Mainflux IoT platform is comprised of the following services: |:--------------------------------------------------------------------------|:------------------------------------------------------------------------| | [users](https://github.com/mainflux/mainflux/tree/master/users) | Manages platform's users and auth concerns | | [things](https://github.com/mainflux/mainflux/tree/master/things) | Manages platform's things, channels and access policies | -| [normalizer](https://github.com/mainflux/mainflux/tree/master/normalizer) | Normalizes SenML messages and generates the "processed" messages stream | | [http-adapter](https://github.com/mainflux/mainflux/tree/master/http) | Provides an HTTP interface for accessing communication channels | | [ws-adapter](https://github.com/mainflux/mainflux/tree/master/ws) | Provides a WebSocket interface for accessing communication channels | | [mqtt-adapter](https://github.com/mainflux/mainflux/tree/master/mqtt) | Provides an MQTT interface for accessing communication channels | - | [coap-adapter](https://github.com/mainflux/mainflux/tree/master/coap) | Provides a CoAP interface for accessing communication channels | - | [lora-adapter](https://github.com/mainflux/mainflux/tree/master/lora) | Provides a LoRa Server forwarder for accessing communication channels | - | [mainflux-cli](https://github.com/mainflux/mainflux/tree/master/cli) | Command line interface | +| [coap-adapter](https://github.com/mainflux/mainflux/tree/master/coap) | Provides a CoAP interface for accessing communication channels | +| [lora-adapter](https://github.com/mainflux/mainflux/tree/master/lora) | Provides a LoRa Server forwarder for accessing communication channels | +| [mainflux-cli](https://github.com/mainflux/mainflux/tree/master/cli) | Command line interface | ![arch](img/architecture.jpg) diff --git a/docs/getting-started.md b/docs/getting-started.md index 9714aa5b92..e33fd7dce7 100644 --- a/docs/getting-started.md +++ b/docs/getting-started.md @@ -101,7 +101,6 @@ In the Mainflux system terminal you should see following logs: ```bash mainflux-things | {"level":"info","message":"Method can_access for channel b7bfc4b6-c18d-47c5-b343-98235c5acc19 and thing 513d02d2-16c1-4f23-98be-9e12f8fee898 took 1.410194ms to complete without errors.","ts":"2019-01-08T22:19:30.148097648Z"} mainflux-http | {"level":"info","message":"Method publish took 336.685µs to complete without errors.","ts":"2019-01-08T22:19:30.148689601Z"} -mainflux-normalizer | {"level":"info","message":"Method normalize took 108.126µs to complete without errors.","ts":"2019-01-08T22:19:30.149500543Z"} ``` -This proves that messages have been correctly sent through the system, via the protocol adapter (`mainflux-http`) and the `normalizer` service has correctly parsed the messages. +This proves that messages have been correctly sent through the system via the protocol adapter (`mainflux-http`). diff --git a/k8s/README.md b/k8s/README.md index 4621ced400..af9e1f51d5 100644 --- a/k8s/README.md +++ b/k8s/README.md @@ -43,15 +43,7 @@ kubectl create -f k8s/mainflux/things-postgres.yml kubectl create -f k8s/mainflux/things.yml ``` -### 4. Setup Normalizer service - -- Deploy Normalizer service: - -``` -kubectl create -f k8s/mainflux/normalizer.yml -``` - -### 5. Setup adapter services +### 4. Setup adapter services - Deploy adapter service: @@ -60,7 +52,7 @@ kubectl create -f k8s/mainflux/tcp-services.yml kubectl create -f k8s/mainflux/.yml ``` -### 6. Setup Dashflux +### 5. Setup Dashflux - Deploy Dashflux service: @@ -68,6 +60,6 @@ kubectl create -f k8s/mainflux/.yml kubectl create -f k8s/mainflux/dashflux.yml ``` -### 7. Configure Internet Access +### 6. Configure Internet Access Configure NAT on your Firewall to forward ports 80 (HTTP) and 443 (HTTPS) to nginx ingress service diff --git a/normalizer/README.md b/normalizer/README.md index 5e20a52513..ba0eb36df7 100644 --- a/normalizer/README.md +++ b/normalizer/README.md @@ -2,6 +2,10 @@ Normalizer service consumes events published by adapters, normalizes SenML-formatted ones, and publishes them to the post-processing stream. +Normalizer can also be imported as a package and used independently for message normalization. +This reduces internal traffic because messages are not published back to the broker, but transformed +on the message consumer side. Mainflux (writers) [https://github.com/mainflux/mainflux/tree/master/writers] +are using Normalizer to preprocess messages before storing them. ## Configuration @@ -22,7 +26,7 @@ provides a compose file template that can be used to deploy the service containe locally: ```yaml -version: "2" +version: "3.7" services: normalizer: image: mainflux/normalizer:[version] diff --git a/normalizer/api/logging.go b/normalizer/api/logging.go index 16fc96d629..278c5dde2a 100644 --- a/normalizer/api/logging.go +++ b/normalizer/api/logging.go @@ -27,7 +27,7 @@ func LoggingMiddleware(svc normalizer.Service, logger logger.Logger) normalizer. } } -func (lm loggingMiddleware) Normalize(msg mainflux.RawMessage) (nd normalizer.NormalizedData, err error) { +func (lm loggingMiddleware) Normalize(msg mainflux.RawMessage) (msgs []mainflux.Message, err error) { defer func(begin time.Time) { message := fmt.Sprintf("Method normalize took %s to complete", time.Since(begin)) if err != nil { diff --git a/normalizer/api/metrics.go b/normalizer/api/metrics.go index 4d97ce2f50..5516d9c0e6 100644 --- a/normalizer/api/metrics.go +++ b/normalizer/api/metrics.go @@ -29,7 +29,7 @@ func MetricsMiddleware(svc normalizer.Service, counter metrics.Counter, latency } } -func (mm *metricsMiddleware) Normalize(msg mainflux.RawMessage) (normalizer.NormalizedData, error) { +func (mm *metricsMiddleware) Normalize(msg mainflux.RawMessage) ([]mainflux.Message, error) { defer func(begin time.Time) { mm.counter.With("method", "normalize").Add(1) mm.latency.With("method", "normalize").Observe(time.Since(begin).Seconds()) diff --git a/normalizer/nats/pubsub.go b/normalizer/nats/pubsub.go index 8c059d4ca2..eb8a5299d6 100644 --- a/normalizer/nats/pubsub.go +++ b/normalizer/nats/pubsub.go @@ -68,7 +68,7 @@ func (ps pubsub) publish(msg mainflux.RawMessage) error { } } - for _, v := range normalized.Messages { + for _, v := range normalized { data, err := proto.Marshal(&v) if err != nil { ps.logger.Warn(fmt.Sprintf("Marshalling failed: %s", err)) diff --git a/normalizer/normalizer.go b/normalizer/normalizer.go index c7c6aab785..99086d6263 100644 --- a/normalizer/normalizer.go +++ b/normalizer/normalizer.go @@ -4,8 +4,6 @@ package normalizer import ( - "strings" - "github.com/cisco/senml" "github.com/mainflux/mainflux" ) @@ -22,7 +20,7 @@ func New() Service { return normalizer{} } -func (n normalizer) Normalize(msg mainflux.RawMessage) (NormalizedData, error) { +func (n normalizer) Normalize(msg mainflux.RawMessage) ([]mainflux.Message, error) { format, ok := formats[msg.ContentType] if !ok { format = senml.JSON @@ -30,7 +28,7 @@ func (n normalizer) Normalize(msg mainflux.RawMessage) (NormalizedData, error) { raw, err := senml.Decode(msg.Payload, format) if err != nil { - return NormalizedData{}, err + return nil, err } normalized := senml.Normalize(raw) @@ -67,10 +65,5 @@ func (n normalizer) Normalize(msg mainflux.RawMessage) (NormalizedData, error) { msgs[k] = m } - output := strings.ToLower(msg.ContentType) - - return NormalizedData{ - ContentType: output, - Messages: msgs, - }, nil + return msgs, nil } diff --git a/normalizer/service.go b/normalizer/service.go index 41c91fef2b..04f07f76a4 100644 --- a/normalizer/service.go +++ b/normalizer/service.go @@ -8,11 +8,5 @@ import "github.com/mainflux/mainflux" // Service specifies API for normalizing messages. type Service interface { // Normalizes raw message to array of standard SenML messages. - Normalize(mainflux.RawMessage) (NormalizedData, error) -} - -// NormalizedData contains normalized messages and their content type. -type NormalizedData struct { - ContentType string - Messages []mainflux.Message + Normalize(mainflux.RawMessage) ([]mainflux.Message, error) } diff --git a/readers/api/logging.go b/readers/api/logging.go index 9fe3215875..96dfb7223b 100644 --- a/readers/api/logging.go +++ b/readers/api/logging.go @@ -28,9 +28,14 @@ func LoggingMiddleware(svc readers.MessageRepository, logger logger.Logger) read } } -func (lm *loggingMiddleware) ReadAll(chanID string, offset, limit uint64, query map[string]string) (readers.MessagesPage, error) { +func (lm *loggingMiddleware) ReadAll(chanID string, offset, limit uint64, query map[string]string) (page readers.MessagesPage, err error) { defer func(begin time.Time) { - lm.logger.Info(fmt.Sprintf(`Method read_all for offset %d and limit %d took %s to complete without errors.`, offset, limit, time.Since(begin))) + message := fmt.Sprintf("Method read_all for channel %s with offset %d and limit %d took %s to complete", chanID, offset, limit, time.Since(begin)) + if err != nil { + lm.logger.Warn(fmt.Sprintf("%s with error: %s.", message, err)) + return + } + lm.logger.Info(fmt.Sprintf("%s without errors.", message)) }(time.Now()) return lm.svc.ReadAll(chanID, offset, limit, query) diff --git a/readers/cassandra/README.md b/readers/cassandra/README.md index c84697057b..c0d3cc0117 100644 --- a/readers/cassandra/README.md +++ b/readers/cassandra/README.md @@ -26,7 +26,7 @@ default values. ## Deployment ```yaml - version: "2" + version: "3.7" cassandra-reader: image: mainflux/cassandra-reader:[version] container_name: [instance name] diff --git a/readers/cassandra/messages_test.go b/readers/cassandra/messages_test.go index 11bcd4d860..232a9a75f9 100644 --- a/readers/cassandra/messages_test.go +++ b/readers/cassandra/messages_test.go @@ -65,15 +65,15 @@ func TestReadAll(t *testing.T) { msg.ValueSum = &mainflux.SumValue{Value: 45} } msg.Time = float64(now - int64(i)) - - err := writer.Save(msg) - require.Nil(t, err, fmt.Sprintf("failed to store message to Cassandra: %s", err)) messages = append(messages, msg) if count == 0 { subtopicMsgs = append(subtopicMsgs, msg) } } + err = writer.Save(messages...) + require.Nil(t, err, fmt.Sprintf("failed to store message to Cassandra: %s", err)) + reader := creaders.New(session) // Since messages are not saved in natural order, diff --git a/readers/influxdb/README.md b/readers/influxdb/README.md index 352213caf2..3d7fcacb78 100644 --- a/readers/influxdb/README.md +++ b/readers/influxdb/README.md @@ -24,7 +24,7 @@ default values. ## Deployment ```yaml - version: "2" + version: "3.7" influxdb-reader: image: mainflux/influxdb-reader:[version] container_name: [instance name] diff --git a/readers/influxdb/messages_test.go b/readers/influxdb/messages_test.go index ca0aecda00..051e225d7a 100644 --- a/readers/influxdb/messages_test.go +++ b/readers/influxdb/messages_test.go @@ -50,8 +50,7 @@ var ( ) func TestReadAll(t *testing.T) { - writer, err := writer.New(client, testDB, 1, time.Second) - require.Nil(t, err, fmt.Sprintf("Creating new InfluxDB writer expected to succeed: %s.\n", err)) + writer := writer.New(client, testDB) messages := []mainflux.Message{} subtopicMsgs := []mainflux.Message{} @@ -76,15 +75,15 @@ func TestReadAll(t *testing.T) { msg.ValueSum = &mainflux.SumValue{Value: 45} } msg.Time = float64(now - int64(i)) - - err := writer.Save(msg) - require.Nil(t, err, fmt.Sprintf("failed to store message to InfluxDB: %s", err)) messages = append(messages, msg) if count == 0 { subtopicMsgs = append(subtopicMsgs, msg) } } + err := writer.Save(messages...) + require.Nil(t, err, fmt.Sprintf("failed to store message to InfluxDB: %s", err)) + reader := reader.New(client, testDB) require.Nil(t, err, fmt.Sprintf("Creating new InfluxDB reader expected to succeed: %s.\n", err)) diff --git a/readers/mongodb/README.md b/readers/mongodb/README.md index 1780e70fda..422ee7fb09 100644 --- a/readers/mongodb/README.md +++ b/readers/mongodb/README.md @@ -23,7 +23,7 @@ default values. ## Deployment ```yaml - version: "2" + version: "3.7" mongodb-reader: image: mainflux/mongodb-reader:[version] container_name: [instance name] diff --git a/readers/mongodb/messages_test.go b/readers/mongodb/messages_test.go index ec23d517eb..b6bc847a4e 100644 --- a/readers/mongodb/messages_test.go +++ b/readers/mongodb/messages_test.go @@ -74,13 +74,13 @@ func TestReadAll(t *testing.T) { } msg.Time = float64(now - int64(i)) - err := writer.Save(msg) - require.Nil(t, err, fmt.Sprintf("failed to store message to MongoDB: %s", err)) messages = append(messages, msg) if count == 0 { subtopicMsgs = append(subtopicMsgs, msg) } } + err = writer.Save(messages...) + require.Nil(t, err, fmt.Sprintf("failed to store message to MongoDB: %s", err)) reader := mreaders.New(db) diff --git a/readers/postgres/README.md b/readers/postgres/README.md index 76fd10d1f5..1d83eb68ad 100644 --- a/readers/postgres/README.md +++ b/readers/postgres/README.md @@ -30,6 +30,7 @@ default values. ## Deployment ```yaml + version: "3.7" postgres-writer: image: mainflux/postgres-writer:[version] container_name: [instance name] diff --git a/readers/postgres/messages_test.go b/readers/postgres/messages_test.go index 965e756640..43b76fc127 100644 --- a/readers/postgres/messages_test.go +++ b/readers/postgres/messages_test.go @@ -61,14 +61,15 @@ func TestMessageReadAll(t *testing.T) { } msg.Time = float64(now - int64(i)) - err := messageRepo.Save(msg) - assert.Nil(t, err, fmt.Sprintf("expected no error got %s\n", err)) messages = append(messages, msg) if count == 0 { subtopicMsgs = append(subtopicMsgs, msg) } } + err = messageRepo.Save(messages...) + assert.Nil(t, err, fmt.Sprintf("expected no error got %s\n", err)) + reader := preader.New(db) // Since messages are not saved in natural order, diff --git a/topics.go b/topics.go index 0184f57ce5..a8d8747c7f 100644 --- a/topics.go +++ b/topics.go @@ -6,4 +6,9 @@ package mainflux // File topics.go contains all NATS subjects that are shared between services. // OutputSenML represents subject SenML messages will be published to. +// Messages published to this topic are Protobuf serialized Messages. const OutputSenML = "out.senml" + +// InputChannels represents subject all messages will be published to. +// Messages received on this topic are Protobuf serialized RawMessages. +const InputChannels = "channel.>" diff --git a/writers/README.md b/writers/README.md index cc83d40db9..b715a11901 100644 --- a/writers/README.md +++ b/writers/README.md @@ -1,7 +1,7 @@ # Writers Writers provide an implementation of various `message writers`. -Message writers are services that consume normalized (in `SenML` format) +Message writers are services that normalize (in `SenML` format) Mainflux messages and store them in specific data store. Writers are optional services and are treated as plugins. In order to diff --git a/writers/api/logging.go b/writers/api/logging.go index 435cca353b..903043449e 100644 --- a/writers/api/logging.go +++ b/writers/api/logging.go @@ -26,9 +26,9 @@ func LoggingMiddleware(svc writers.MessageRepository, logger log.Logger) writers return &loggingMiddleware{logger, svc} } -func (lm *loggingMiddleware) Save(msg mainflux.Message) (err error) { +func (lm *loggingMiddleware) Save(msgs ...mainflux.Message) (err error) { defer func(begin time.Time) { - message := fmt.Sprintf("Method Save took %s to complete", time.Since(begin)) + message := fmt.Sprintf("Method save took %s to complete", time.Since(begin)) if err != nil { lm.logger.Warn(fmt.Sprintf("%s with error: %s.", message, err)) return @@ -36,5 +36,5 @@ func (lm *loggingMiddleware) Save(msg mainflux.Message) (err error) { lm.logger.Info(fmt.Sprintf("%s without errors.", message)) }(time.Now()) - return lm.svc.Save(msg) + return lm.svc.Save(msgs...) } diff --git a/writers/api/metrics.go b/writers/api/metrics.go index b247fd92cc..b655bc0116 100644 --- a/writers/api/metrics.go +++ b/writers/api/metrics.go @@ -27,10 +27,10 @@ func MetricsMiddleware(repo writers.MessageRepository, counter metrics.Counter, } } -func (mm *metricsMiddleware) Save(msg mainflux.Message) error { +func (mm *metricsMiddleware) Save(msgs ...mainflux.Message) error { defer func(begin time.Time) { mm.counter.With("method", "handle_message").Add(1) mm.latency.With("method", "handle_message").Observe(time.Since(begin).Seconds()) }(time.Now()) - return mm.repo.Save(msg) + return mm.repo.Save(msgs...) } diff --git a/writers/cassandra/README.md b/writers/cassandra/README.md index 6db3fde668..aef70ba15f 100644 --- a/writers/cassandra/README.md +++ b/writers/cassandra/README.md @@ -22,7 +22,7 @@ default values. ## Deployment ```yaml - version: "2" + version: "3.7" cassandra-writer: image: mainflux/cassandra-writer:[version] container_name: [instance name] diff --git a/writers/cassandra/messages.go b/writers/cassandra/messages.go index 0ea305212a..001e355377 100644 --- a/writers/cassandra/messages.go +++ b/writers/cassandra/messages.go @@ -20,37 +20,44 @@ func New(session *gocql.Session) writers.MessageRepository { return &cassandraRepository{session} } -func (cr *cassandraRepository) Save(msg mainflux.Message) error { +func (cr *cassandraRepository) Save(messages ...mainflux.Message) error { cql := `INSERT INTO messages (id, channel, subtopic, publisher, protocol, name, unit, value, string_value, bool_value, data_value, value_sum, time, update_time, link) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)` id := gocql.TimeUUID() - var floatVal, valSum *float64 - var strVal, dataVal *string - var boolVal *bool - switch msg.Value.(type) { - case *mainflux.Message_FloatValue: - v := msg.GetFloatValue() - floatVal = &v - case *mainflux.Message_StringValue: - v := msg.GetStringValue() - strVal = &v - case *mainflux.Message_DataValue: - v := msg.GetDataValue() - dataVal = &v - case *mainflux.Message_BoolValue: - v := msg.GetBoolValue() - boolVal = &v + for _, msg := range messages { + var floatVal, valSum *float64 + var strVal, dataVal *string + var boolVal *bool + switch msg.Value.(type) { + case *mainflux.Message_FloatValue: + v := msg.GetFloatValue() + floatVal = &v + case *mainflux.Message_StringValue: + v := msg.GetStringValue() + strVal = &v + case *mainflux.Message_DataValue: + v := msg.GetDataValue() + dataVal = &v + case *mainflux.Message_BoolValue: + v := msg.GetBoolValue() + boolVal = &v + } + + if msg.GetValueSum() != nil { + v := msg.GetValueSum().GetValue() + valSum = &v + } + + err := cr.session.Query(cql, id, msg.GetChannel(), msg.GetSubtopic(), msg.GetPublisher(), + msg.GetProtocol(), msg.GetName(), msg.GetUnit(), floatVal, + strVal, boolVal, dataVal, valSum, msg.GetTime(), msg.GetUpdateTime(), msg.GetLink()).Exec() + if err != nil { + return err + } } - if msg.GetValueSum() != nil { - v := msg.GetValueSum().GetValue() - valSum = &v - } - - return cr.session.Query(cql, id, msg.GetChannel(), msg.GetSubtopic(), msg.GetPublisher(), - msg.GetProtocol(), msg.GetName(), msg.GetUnit(), floatVal, - strVal, boolVal, dataVal, valSum, msg.GetTime(), msg.GetUpdateTime(), msg.GetLink()).Exec() + return nil } diff --git a/writers/cassandra/messages_test.go b/writers/cassandra/messages_test.go index d3cd58096e..e26e878b05 100644 --- a/writers/cassandra/messages_test.go +++ b/writers/cassandra/messages_test.go @@ -36,6 +36,7 @@ func TestSave(t *testing.T) { repo := cassandra.New(session) now := time.Now().Unix() + var msgs []mainflux.Message for i := 0; i < msgsNum; i++ { // Mix possible values as well as value sum. count := i % valueFields @@ -54,8 +55,9 @@ func TestSave(t *testing.T) { msg.ValueSum = &mainflux.SumValue{Value: 45} } msg.Time = float64(now + int64(i)) - - err = repo.Save(msg) - assert.Nil(t, err, fmt.Sprintf("expected no error, got %s", err)) + msgs = append(msgs, msg) } + + err = repo.Save(msgs...) + assert.Nil(t, err, fmt.Sprintf("expected no error, got %s", err)) } diff --git a/writers/influxdb/README.md b/writers/influxdb/README.md index c4eaf93a1b..bdcc74e570 100644 --- a/writers/influxdb/README.md +++ b/writers/influxdb/README.md @@ -13,8 +13,6 @@ default values. | MF_NATS_URL | NATS instance URL | nats://localhost:4222 | | MF_INFLUX_WRITER_LOG_LEVEL | Log level for InfluxDB writer (debug, info, warn, error) | error | | MF_INFLUX_WRITER_PORT | Service HTTP port | 8180 | -| MF_INFLUX_WRITER_BATCH_SIZE | Size of the writer points batch | 5000 | -| MF_INFLUX_WRITER_BATCH_TIMEOUT | Time interval in seconds to flush the batch | 1 second | | MF_INFLUX_WRITER_DB_NAME | InfluxDB database name | mainflux | | MF_INFLUX_WRITER_DB_HOST | InfluxDB host | localhost | | MF_INFLUX_WRITER_DB_PORT | Default port of InfluxDB database | 8086 | @@ -25,7 +23,7 @@ default values. ## Deployment ```yaml - version: "2" + version: "3.7" influxdb-writer: image: mainflux/influxdb:[version] container_name: [instance name] @@ -36,8 +34,6 @@ default values. MF_NATS_URL: [NATS instance URL] MF_INFLUX_WRITER_LOG_LEVEL: [Influx writer log level] MF_INFLUX_WRITER_PORT: [Service HTTP port] - MF_INFLUX_WRITER_BATCH_SIZE: [Size of the writer points batch] - MF_INFLUX_WRITER_BATCH_TIMEOUT: [Time interval in seconds to flush the batch] MF_INFLUX_WRITER_DB_NAME: [InfluxDB name] MF_INFLUX_WRITER_DB_HOST: [InfluxDB host] MF_INFLUX_WRITER_DB_PORT: [InfluxDB port] @@ -66,7 +62,7 @@ make influxdb make install # Set the environment variables and run the service -MF_NATS_URL=[NATS instance URL] MF_INFLUX_WRITER_LOG_LEVEL=[Influx writer log level] MF_INFLUX_WRITER_PORT=[Service HTTP port] MF_INFLUX_WRITER_BATCH_SIZE=[Size of the writer points batch] MF_INFLUX_WRITER_BATCH_TIMEOUT=[Time interval in seconds to flush the batch] MF_INFLUX_WRITER_DB_NAME=[InfluxDB database name] MF_INFLUX_WRITER_DB_HOST=[InfluxDB database host] MF_INFLUX_WRITER_DB_PORT=[InfluxDB database port] MF_INFLUX_WRITER_DB_USER=[InfluxDB admin user] MF_INFLUX_WRITER_DB_PASS=[InfluxDB admin password] MF_INFLUX_WRITER_CHANNELS_CONFIG=[Configuration file path with channels list] $GOBIN/mainflux-influxdb +MF_NATS_URL=[NATS instance URL] MF_INFLUX_WRITER_LOG_LEVEL=[Influx writer log level] MF_INFLUX_WRITER_PORT=[Service HTTP port] MF_INFLUX_WRITER_DB_NAME=[InfluxDB database name] MF_INFLUX_WRITER_DB_HOST=[InfluxDB database host] MF_INFLUX_WRITER_DB_PORT=[InfluxDB database port] MF_INFLUX_WRITER_DB_USER=[InfluxDB admin user] MF_INFLUX_WRITER_DB_PASS=[InfluxDB admin password] MF_INFLUX_WRITER_CHANNELS_CONFIG=[Configuration file path with channels list] $GOBIN/mainflux-influxdb ``` diff --git a/writers/influxdb/messages.go b/writers/influxdb/messages.go index 17d4cdfeb6..d85130fb81 100644 --- a/writers/influxdb/messages.go +++ b/writers/influxdb/messages.go @@ -4,10 +4,8 @@ package influxdb import ( - "errors" "math" "strconv" - "sync" "time" "github.com/mainflux/mainflux/writers" @@ -20,105 +18,43 @@ const pointName = "messages" var _ writers.MessageRepository = (*influxRepo)(nil) -var ( - errZeroValueSize = errors.New("zero value batch size") - errZeroValueTimeout = errors.New("zero value batch timeout") - errNilBatch = errors.New("nil batch") -) - type influxRepo struct { - client influxdata.Client - batch influxdata.BatchPoints - batchSize int - mu sync.Mutex - tick <-chan time.Time - cfg influxdata.BatchPointsConfig + client influxdata.Client + cfg influxdata.BatchPointsConfig } type fields map[string]interface{} type tags map[string]string // New returns new InfluxDB writer. -func New(client influxdata.Client, database string, batchSize int, batchTimeout time.Duration) (writers.MessageRepository, error) { - if batchSize <= 0 { - return &influxRepo{}, errZeroValueSize - } - - if batchTimeout <= 0 { - return &influxRepo{}, errZeroValueTimeout - } - - repo := &influxRepo{ +func New(client influxdata.Client, database string) writers.MessageRepository { + return &influxRepo{ client: client, cfg: influxdata.BatchPointsConfig{ Database: database, }, - batchSize: batchSize, - } - - var err error - repo.batch, err = influxdata.NewBatchPoints(repo.cfg) - if err != nil { - return &influxRepo{}, err } - - repo.tick = time.NewTicker(batchTimeout).C - go func() { - for { - <-repo.tick - // Nil point indicates that savePoint method is triggered by the ticker. - repo.savePoint(nil) - } - }() - - return repo, nil } -func (repo *influxRepo) savePoint(point *influxdata.Point) error { - repo.mu.Lock() - defer repo.mu.Unlock() - if repo.batch == nil { - return errNilBatch - } - - // Ignore ticker if there is nothing to save. - if len(repo.batch.Points()) == 0 && point == nil { - return nil +func (repo *influxRepo) Save(messages ...mainflux.Message) error { + pts, err := influxdata.NewBatchPoints(repo.cfg) + if err != nil { + return err } + for _, msg := range messages { + tgs, flds := repo.tagsOf(&msg), repo.fieldsOf(&msg) - if point != nil { - repo.batch.AddPoint(point) - } + sec, dec := math.Modf(msg.Time) + t := time.Unix(int64(sec), int64(dec*(1e9))) - if len(repo.batch.Points())%repo.batchSize == 0 || point == nil { - if err := repo.client.Write(repo.batch); err != nil { - return err - } - // It would be nice to reset ticker at this point, which - // implies creating a new ticker and goroutine. It would - // introduce unnecessary complexity with no justified benefits. - var err error - repo.batch, err = influxdata.NewBatchPoints(repo.cfg) + pt, err := influxdata.NewPoint(pointName, tgs, flds, t) if err != nil { return err } + pts.AddPoint(pt) } - return nil -} - -func (repo *influxRepo) Save(msg mainflux.Message) error { - tgs, flds := repo.tagsOf(&msg), repo.fieldsOf(&msg) - - sec, dec := math.Modf(msg.Time) - t := time.Unix(int64(sec), int64(dec*(1e9))) - - pt, err := influxdata.NewPoint(pointName, tgs, flds, t) - if err != nil { - return err - } - - return repo.savePoint(pt) + return repo.client.Write(pts) } func (repo *influxRepo) tagsOf(msg *mainflux.Message) tags { diff --git a/writers/influxdb/messages_test.go b/writers/influxdb/messages_test.go index 726c62da61..62aea168fb 100644 --- a/writers/influxdb/messages_test.go +++ b/writers/influxdb/messages_test.go @@ -12,7 +12,6 @@ import ( influxdata "github.com/influxdata/influxdb/client/v2" "github.com/mainflux/mainflux" log "github.com/mainflux/mainflux/logger" - "github.com/mainflux/mainflux/writers" writer "github.com/mainflux/mainflux/writers/influxdb" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -21,16 +20,14 @@ import ( const valueFields = 6 var ( - port string - testLog, _ = log.New(os.Stdout, log.Info.String()) - testDB = "test" - saveTimeout = 2 * time.Second - saveBatchSize = 20 - streamsSize = 250 - selectMsgs = fmt.Sprintf("SELECT * FROM test..messages") - dropMsgs = fmt.Sprintf("DROP SERIES FROM messages") - client influxdata.Client - clientCfg = influxdata.HTTPConfig{ + port string + testLog, _ = log.New(os.Stdout, log.Info.String()) + testDB = "test" + streamsSize = 250 + selectMsgs = "SELECT * FROM test..messages" + dropMsgs = "DROP SERIES FROM messages" + client influxdata.Client + clientCfg = influxdata.HTTPConfig{ Username: "test", Password: "test", } @@ -69,63 +66,23 @@ func queryDB(cmd string) ([][]interface{}, error) { return response.Results[0].Series[0].Values, nil } -func TestNew(t *testing.T) { - cases := []struct { - desc string - batchSize int - err error - batchTimeout time.Duration - errText string - }{ - { - desc: "Create writer with zero value batch size", - batchSize: 0, - batchTimeout: time.Duration(5 * time.Second), - errText: "zero value batch size", - }, - { - desc: "Create writer with zero value batch timeout", - batchSize: 5, - batchTimeout: time.Duration(0 * time.Second), - errText: "zero value batch timeout", - }, - } - - for _, tc := range cases { - _, err := writer.New(client, testDB, tc.batchSize, tc.batchTimeout) - assert.Equal(t, tc.errText, err.Error(), fmt.Sprintf("%s expected to have error \"%s\", but got \"%s\"", tc.desc, tc.errText, err)) - } -} - func TestSave(t *testing.T) { - // Set batch size to 1 to simulate single point insert. - repo, err := writer.New(client, testDB, 1, saveTimeout) - require.Nil(t, err, fmt.Sprintf("Creating new InfluxDB repo expected to succeed: %s.\n", err)) - - // Set batch size to value > 1 to simulate real batch. - repo1, err := writer.New(client, testDB, saveBatchSize, saveTimeout) - require.Nil(t, err, fmt.Sprintf("Creating new InfluxDB repo expected to succeed: %s.\n", err)) + repo := writer.New(client, testDB) cases := []struct { desc string - repo writers.MessageRepository msgsNum int expectedSize int - isBatch bool }{ { desc: "save a single message", - repo: repo, msgsNum: 1, expectedSize: 1, - isBatch: false, }, { desc: "save a batch of messages", - repo: repo1, msgsNum: streamsSize, - expectedSize: streamsSize - (streamsSize % saveBatchSize), - isBatch: true, + expectedSize: streamsSize, }, } @@ -135,6 +92,7 @@ func TestSave(t *testing.T) { require.Nil(t, err, fmt.Sprintf("Cleaning data from InfluxDB expected to succeed: %s.\n", err)) now := time.Now().Unix() + var msgs []mainflux.Message for i := 0; i < tc.msgsNum; i++ { // Mix possible values as well as value sum. count := i % valueFields @@ -150,28 +108,19 @@ func TestSave(t *testing.T) { case 4: msg.ValueSum = nil case 5: - msg.ValueSum = &mainflux.SumValue{Value: 45} + msg.ValueSum = &mainflux.SumValue{Value: 42} } msg.Time = float64(now + int64(i)) - - err := tc.repo.Save(msg) - assert.Nil(t, err, fmt.Sprintf("Save operation expected to succeed: %s.\n", err)) + msgs = append(msgs, msg) } + err = repo.Save(msgs...) + assert.Nil(t, err, fmt.Sprintf("Save operation expected to succeed: %s.\n", err)) + row, err := queryDB(selectMsgs) assert.Nil(t, err, fmt.Sprintf("Querying InfluxDB to retrieve data expected to succeed: %s.\n", err)) count := len(row) assert.Equal(t, tc.expectedSize, count, fmt.Sprintf("Expected to have %d messages saved, found %d instead.\n", tc.expectedSize, count)) - - if tc.isBatch { - // Sleep for `saveBatchTime` to trigger ticker and check if the reset of the data is saved. - time.Sleep(saveTimeout) - - row, err = queryDB(selectMsgs) - assert.Nil(t, err, fmt.Sprintf("Querying InfluxDB to retrieve data count expected to succeed: %s.\n", err)) - count = len(row) - assert.Equal(t, tc.msgsNum, count, fmt.Sprintf("Expected to have %d messages, found %d instead.\n", tc.msgsNum, count)) - } } } diff --git a/writers/messages.go b/writers/messages.go index 43e3039fa6..a3088177b0 100644 --- a/writers/messages.go +++ b/writers/messages.go @@ -10,5 +10,5 @@ type MessageRepository interface { // Save method is used to save published message. A non-nil // error is returned to indicate operation failure. - Save(mainflux.Message) error + Save(...mainflux.Message) error } diff --git a/writers/mongodb/README.md b/writers/mongodb/README.md index 43f5a5029f..175813457c 100644 --- a/writers/mongodb/README.md +++ b/writers/mongodb/README.md @@ -21,6 +21,7 @@ default values. ## Deployment ```yaml + version: "3.7" mongodb-writer: image: mainflux/mongodb-writer:[version] container_name: [instance name] diff --git a/writers/mongodb/messages.go b/writers/mongodb/messages.go index eb3c476f24..fec267ccef 100644 --- a/writers/mongodb/messages.go +++ b/writers/mongodb/messages.go @@ -43,40 +43,45 @@ func New(db *mongo.Database) writers.MessageRepository { return &mongoRepo{db} } -func (repo *mongoRepo) Save(msg mainflux.Message) error { +func (repo *mongoRepo) Save(messages ...mainflux.Message) error { coll := repo.db.Collection(collectionName) - m := message{ - Channel: msg.Channel, - Subtopic: msg.Subtopic, - Publisher: msg.Publisher, - Protocol: msg.Protocol, - Name: msg.Name, - Unit: msg.Unit, - Time: msg.Time, - UpdateTime: msg.UpdateTime, - Link: msg.Link, - } + var msgs []interface{} + for _, msg := range messages { + m := message{ + Channel: msg.Channel, + Subtopic: msg.Subtopic, + Publisher: msg.Publisher, + Protocol: msg.Protocol, + Name: msg.Name, + Unit: msg.Unit, + Time: msg.Time, + UpdateTime: msg.UpdateTime, + Link: msg.Link, + } - switch msg.Value.(type) { - case *mainflux.Message_FloatValue: - v := msg.GetFloatValue() - m.FloatValue = &v - case *mainflux.Message_StringValue: - v := msg.GetStringValue() - m.StringValue = &v - case *mainflux.Message_DataValue: - v := msg.GetDataValue() - m.DataValue = &v - case *mainflux.Message_BoolValue: - v := msg.GetBoolValue() - m.BoolValue = &v - } + switch msg.Value.(type) { + case *mainflux.Message_FloatValue: + v := msg.GetFloatValue() + m.FloatValue = &v + case *mainflux.Message_StringValue: + v := msg.GetStringValue() + m.StringValue = &v + case *mainflux.Message_DataValue: + v := msg.GetDataValue() + m.DataValue = &v + case *mainflux.Message_BoolValue: + v := msg.GetBoolValue() + m.BoolValue = &v + } + + if msg.GetValueSum() != nil { + valueSum := msg.GetValueSum().Value + m.ValueSum = &valueSum + } - if msg.GetValueSum() != nil { - valueSum := msg.GetValueSum().Value - m.ValueSum = &valueSum + msgs = append(msgs, m) } - _, err := coll.InsertOne(context.Background(), m) + _, err := coll.InsertMany(context.Background(), msgs) return err } diff --git a/writers/mongodb/messages_test.go b/writers/mongodb/messages_test.go index 616c6c4aaa..ac71451084 100644 --- a/writers/mongodb/messages_test.go +++ b/writers/mongodb/messages_test.go @@ -54,6 +54,7 @@ func TestSave(t *testing.T) { repo := mongodb.New(db) now := time.Now().Unix() + var msgs []mainflux.Message for i := 0; i < msgsNum; i++ { // Mix possible values as well as value sum. count := i % valueFields @@ -72,9 +73,10 @@ func TestSave(t *testing.T) { msg.ValueSum = &mainflux.SumValue{Value: 45} } msg.Time = float64(now + int64(i)) - - err = repo.Save(msg) + msgs = append(msgs, msg) } + + err = repo.Save(msgs...) assert.Nil(t, err, fmt.Sprintf("Save operation expected to succeed: %s.\n", err)) count, err := db.Collection(collection).CountDocuments(context.Background(), bson.D{}) diff --git a/writers/postgres/README.md b/writers/postgres/README.md index 5d83d0d8e4..2d939636cb 100644 --- a/writers/postgres/README.md +++ b/writers/postgres/README.md @@ -27,6 +27,7 @@ default values. ## Deployment ```yaml + version: "3.7" postgres-writer: image: mainflux/postgres-writer:[version] container_name: [instance name] diff --git a/writers/postgres/messages.go b/writers/postgres/messages.go index 45515614a6..81850982d8 100644 --- a/writers/postgres/messages.go +++ b/writers/postgres/messages.go @@ -4,6 +4,7 @@ package postgres import ( + "context" "errors" "github.com/gofrs/uuid" @@ -31,7 +32,7 @@ func New(db *sqlx.DB) writers.MessageRepository { return &postgresRepo{db: db} } -func (pr postgresRepo) Save(msg mainflux.Message) error { +func (pr postgresRepo) Save(messages ...mainflux.Message) error { q := `INSERT INTO messages (id, channel, subtopic, publisher, protocol, name, unit, value, string_value, bool_value, data_value, value_sum, time, update_time, link) @@ -39,24 +40,31 @@ func (pr postgresRepo) Save(msg mainflux.Message) error { :value, :string_value, :bool_value, :data_value, :value_sum, :time, :update_time, :link);` - dbth, err := toDBMessage(msg) + tx, err := pr.db.BeginTxx(context.Background(), nil) if err != nil { return err } - if _, err := pr.db.NamedExec(q, dbth); err != nil { - pqErr, ok := err.(*pq.Error) - if ok { - switch pqErr.Code.Name() { - case errInvalid: - return ErrInvalidMessage - } + for _, msg := range messages { + dbth, err := toDBMessage(msg) + if err != nil { + return err } - return err + if _, err := tx.NamedExec(q, dbth); err != nil { + pqErr, ok := err.(*pq.Error) + if ok { + switch pqErr.Code.Name() { + case errInvalid: + return ErrInvalidMessage + } + } + + return err + } } - return nil + return tx.Commit() } type dbMessage struct { diff --git a/writers/postgres/messages_test.go b/writers/postgres/messages_test.go index f7223613b6..9b2d3b7c46 100644 --- a/writers/postgres/messages_test.go +++ b/writers/postgres/messages_test.go @@ -34,6 +34,7 @@ func TestMessageSave(t *testing.T) { msg.Publisher = pubid.String() now := time.Now().Unix() + var msgs []mainflux.Message for i := 0; i < msgsNum; i++ { // Mix possible values as well as value sum. count := i % valueFields @@ -52,9 +53,9 @@ func TestMessageSave(t *testing.T) { msg.ValueSum = &mainflux.SumValue{Value: 45} } msg.Time = float64(now + int64(i)) - - err := messageRepo.Save(msg) - assert.Nil(t, err, fmt.Sprintf("expected no error got %s\n", err)) - + msgs = append(msgs, msg) } + + err = messageRepo.Save(msgs...) + assert.Nil(t, err, fmt.Sprintf("expected no error got %s\n", err)) } diff --git a/writers/writer.go b/writers/writer.go index af6619f064..fd787d0e96 100644 --- a/writers/writer.go +++ b/writers/writer.go @@ -9,41 +9,52 @@ import ( "github.com/gogo/protobuf/proto" "github.com/mainflux/mainflux" log "github.com/mainflux/mainflux/logger" + "github.com/mainflux/mainflux/normalizer" nats "github.com/nats-io/go-nats" ) type consumer struct { - nc *nats.Conn - channels map[string]bool - repo MessageRepository - logger log.Logger + nc *nats.Conn + channels map[string]bool + repo MessageRepository + normalizer normalizer.Service + logger log.Logger } // Start method starts to consume normalized messages received from NATS. -func Start(nc *nats.Conn, repo MessageRepository, queue string, channels map[string]bool, logger log.Logger) error { +func Start(nc *nats.Conn, repo MessageRepository, norm normalizer.Service, queue string, channels map[string]bool, logger log.Logger) error { c := consumer{ - nc: nc, - channels: channels, - repo: repo, - logger: logger, + nc: nc, + channels: channels, + repo: repo, + normalizer: norm, + logger: logger, } - _, err := nc.QueueSubscribe(mainflux.OutputSenML, queue, c.consume) + _, err := nc.QueueSubscribe(mainflux.InputChannels, queue, c.consume) return err } func (c *consumer) consume(m *nats.Msg) { - msg := &mainflux.Message{} - if err := proto.Unmarshal(m.Data, msg); err != nil { + var msg mainflux.RawMessage + if err := proto.Unmarshal(m.Data, &msg); err != nil { c.logger.Warn(fmt.Sprintf("Failed to unmarshal received message: %s", err)) return } - if !c.channelExists(msg.GetChannel()) { + norm, err := c.normalizer.Normalize(msg) + if err != nil { + c.logger.Warn(fmt.Sprintf("Failed to normalize received message: %s", err)) return } + var msgs []mainflux.Message + for _, v := range norm { + if c.channelExists(v.GetChannel()) { + msgs = append(msgs, v) + } + } - if err := c.repo.Save(*msg); err != nil { + if err := c.repo.Save(msgs...); err != nil { c.logger.Warn(fmt.Sprintf("Failed to save message: %s", err)) return }