From 139b22524ca70f0e6d8df7373a38b4c648630923 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Aleksandar=20Novakovi=C4=87?= Date: Wed, 17 Apr 2019 12:39:47 +0200 Subject: [PATCH] MF-711 - Create separate Redis instance for ES (#717) * Fix Redis connection error handling in MQTT adapter Signed-off-by: Aleksandar Novakovic * Add separate env vars for ES in MQTT adapter Signed-off-by: Aleksandar Novakovic * Create dedicated redis service for ES Signed-off-by: Aleksandar Novakovic * Update docs according to changes Signed-off-by: Aleksandar Novakovic --- docker/addons/bootstrap/docker-compose.yml | 11 ++------- docker/addons/lora-adapter/docker-compose.yml | 2 +- docker/docker-compose.yml | 10 +++++++- docs/dev-guide.md | 6 ++--- mqtt/README.md | 20 +++++++++++----- mqtt/mqtt.js | 23 ++++++++++++++++--- 6 files changed, 49 insertions(+), 23 deletions(-) diff --git a/docker/addons/bootstrap/docker-compose.yml b/docker/addons/bootstrap/docker-compose.yml index 8b1eda9552..dca7fe0d04 100644 --- a/docker/addons/bootstrap/docker-compose.yml +++ b/docker/addons/bootstrap/docker-compose.yml @@ -15,13 +15,6 @@ services: POSTGRES_DB: bootstrap networks: - docker_mainflux-base-net - - bootstrap-redis: - image: redis:5.0-alpine - container_name: mainflux-bootstrap-redis - restart: on-failure - networks: - - docker_mainflux-base-net bootstrap: image: mainflux/bootstrap:latest @@ -42,7 +35,7 @@ services: MF_BOOTSTRAP_PORT: 8200 MF_SDK_BASE_URL: http://mainflux-things:8182 MF_USERS_URL: mainflux-users:8181 - MF_THINGS_ES_URL: things-redis:6379 - MF_BOOTSTRAP_ES_URL: bootstrap-redis:6379 + MF_THINGS_ES_URL: es-redis:6379 + MF_BOOTSTRAP_ES_URL: es-redis:6379 networks: - docker_mainflux-base-net diff --git a/docker/addons/lora-adapter/docker-compose.yml b/docker/addons/lora-adapter/docker-compose.yml index 36faee2230..1f60a0ca57 100644 --- a/docker/addons/lora-adapter/docker-compose.yml +++ b/docker/addons/lora-adapter/docker-compose.yml @@ -25,7 +25,7 @@ services: restart: on-failure environment: MF_LORA_ADAPTER_LOG_LEVEL: debug - MF_THINGS_ES_URL: things-redis:6379 + MF_THINGS_ES_URL: es-redis:6379 MF_LORA_ADAPTER_ROUTEMAP_URL: lora-redis:6379 MF_LORA_ADAPTER_MESSAGES_URL: tcp://lora.mqtt.mainflux.io:1883 MF_LORA_ADAPTER_HTTP_PORT: 8187 diff --git a/docker/docker-compose.yml b/docker/docker-compose.yml index 153d1c30c0..7b60c5cd63 100644 --- a/docker/docker-compose.yml +++ b/docker/docker-compose.yml @@ -105,7 +105,7 @@ services: MF_THINGS_DB_PASS: mainflux MF_THINGS_DB: things MF_THINGS_CACHE_URL: things-redis:6379 - MF_THINGS_ES_URL: things-redis:6379 + MF_THINGS_ES_URL: es-redis:6379 MF_THINGS_HTTP_PORT: 8182 MF_THINGS_GRPC_PORT: 8183 MF_USERS_URL: users:8181 @@ -178,6 +178,13 @@ services: networks: - mainflux-base-net + es-redis: + image: redis:5.0-alpine + container_name: mainflux-es-redis + restart: on-failure + networks: + - mainflux-base-net + mqtt-redis: image: redis:5.0-alpine container_name: mainflux-mqtt-redis @@ -199,6 +206,7 @@ services: MF_MQTT_ADAPTER_PORT: 1883 MF_MQTT_ADAPTER_WS_PORT: 8880 MF_MQTT_ADAPTER_REDIS_HOST: mqtt-redis + MF_MQTT_ADAPTER_ES_HOST: es-redis MF_NATS_URL: nats://nats:4222 MF_THINGS_URL: things:8183 ports: diff --git a/docs/dev-guide.md b/docs/dev-guide.md index 75de706728..a9593c1223 100644 --- a/docs/dev-guide.md +++ b/docs/dev-guide.md @@ -279,7 +279,7 @@ By fetching and processing these events you can reconstruct `things` service sta If you store some of your custom data in `metadata` field, this is the perfect way to fetch it and process it. If you want to integrate through [docker-compose.yml](https://github.com/mainflux/mainflux/blob/master/docker/docker-compose.yml) -you can use `mainflux-things-redis` service. Just connect to it and consume events +you can use `mainflux-es-redis` service. Just connect to it and consume events from Redis Stream named `mainflux.things`. #### Thing create event @@ -420,7 +420,7 @@ the following event types: If you want to integrate through [docker-compose.yml](https://github.com/mainflux/mainflux/blob/master/docker/addons/bootstrap/docker-compose.yml) -you can use `mainflux-bootstrap-redis` service. Just connect to it and consume events +you can use `mainflux-es-redis` service. Just connect to it and consume events from Redis Stream named `mainflux.bootstrap`. #### Configuration create event @@ -534,7 +534,7 @@ Events that are coming from MQTT adapter have following fields: If you want to integrate through [docker-compose.yml](https://github.com/mainflux/mainflux/blob/master/docker/docker-compose.yml) -you can use `mainflux-mqtt-redis` service. Just connect to it and consume events +you can use `mainflux-es-redis` service. Just connect to it and consume events from Redis Stream named `mainflux.mqtt`. Example of connect event: diff --git a/mqtt/README.md b/mqtt/README.md index 8263378845..d68e0a0e69 100644 --- a/mqtt/README.md +++ b/mqtt/README.md @@ -20,6 +20,10 @@ default values. | MF_MQTT_ADAPTER_REDIS_HOST | Redis host | localhost | | MF_MQTT_ADAPTER_REDIS_PASS | Redis pass | mqtt | | MF_MQTT_ADAPTER_REDIS_DB | Redis db | 0 | +| MF_MQTT_ADAPTER_ES_PORT | Event stream port | 6379 | +| MF_MQTT_ADAPTER_ES_HOST | Event stream host | localhost | +| MF_MQTT_ADAPTER_ES_PASS | Event stream pass | mqtt | +| MF_MQTT_ADAPTER_ES_DB | Event stream db | 0 | | MF_MQTT_CONCURRENT_MESSAGES | Number of messages that can be concurrently exchanged | 100 | | MF_THINGS_URL | Things service URL | localhost:8181 | | MF_MQTT_ADAPTER_CLIENT_TLS | Flag that indicates if TLS should be turned on | false | @@ -44,11 +48,15 @@ services: MF_MQTT_ADAPTER_LOG_LEVEL: [MQTT adapter log level] MF_MQTT_INSTANCE_ID: [ID of MQTT adapter instance] MF_MQTT_ADAPTER_PORT: [Service MQTT port] - MF_MQTT_WS_PORT: [Service WS port] - MF_MQTT_REDIS_PORT: [Redis port] - MF_MQTT_REDIS_HOST: [Redis host] - MF_MQTT_REDIS_PASS: [Redis pass] - MF_MQTT_REDIS_DB: [Redis db] + MF_MQTT_ADAPTER_WS_PORT: [Service WS port] + MF_MQTT_ADAPTER_REDIS_PORT: [Redis port] + MF_MQTT_ADAPTER_REDIS_HOST: [Redis host] + MF_MQTT_ADAPTER_REDIS_PASS: [Redis pass] + MF_MQTT_ADAPTER_REDIS_DB: [Redis db] + MF_MQTT_ADAPTER_ES_PORT: [Event stream port] + MF_MQTT_ADAPTER_ES_HOST: [Event stream host] + MF_MQTT_ADAPTER_ES_PASS: [Event stream pass] + MF_MQTT_ADAPTER_ES_DB: [Event stream db] MF_MQTT_CONCURRENT_MESSAGES: [Number of messages that can be concurrently exchanged] MF_MQTT_ADAPTER_CLIENT_TLS: [Flag that indicates if TLS should be turned on] MF_MQTT_ADAPTER_CA_CERTS: [Path to trusted CAs in PEM format] @@ -66,7 +74,7 @@ cd $GOPATH/src/github.com/mainflux/mainflux/mqtt npm install # set the environment variables and run the service -MF_THINGS_URL=[Things service URL] MF_NATS_URL=[NATS instance URL] MF_MQTT_ADAPTER_LOG_LEVEL=[MQTT adapter log level] MF_MQTT_INSTANCE_ID=[ID of MQTT adapter instance] MF_MQTT_ADAPTER_PORT=[Service MQTT port] MF_MQTT_WS_PORT=[Service WS port] MF_MQTT_REDIS_PORT=[Redis port] MF_MQTT_REDIS_HOST=[Redis host] MF_MQTT_REDIS_PASS=[Redis pass] MF_MQTT_REDIS_DB=[Redis db] MF_MQTT_CONCURRENT_MESSAGES=[Number of messages that can be concurrently exchanged] MF_MQTT_ADAPTER_CLIENT_TLS=[Flag that indicates if TLS should be turned on] MF_MQTT_ADAPTER_CA_CERTS=[Path to trusted CAs in PEM format] node mqtt.js .. +MF_THINGS_URL=[Things service URL] MF_NATS_URL=[NATS instance URL] MF_MQTT_ADAPTER_LOG_LEVEL=[MQTT adapter log level] MF_MQTT_INSTANCE_ID=[ID of MQTT adapter instance] MF_MQTT_ADAPTER_PORT=[Service MQTT port] MF_MQTT_ADAPTER_WS_PORT=[Service WS port] MF_MQTT_ADAPTER_REDIS_PORT=[Redis port] MF_MQTT_ADAPTER_REDIS_HOST=[Redis host] MF_MQTT_ADAPTER_REDIS_PASS=[Redis pass] MF_MQTT_ADAPTER_REDIS_DB=[Redis db] MF_MQTT_ADAPTER_ES_PORT=[Event stream port] MF_MQTT_ADAPTER_ES_HOST=[Event stream host] MF_MQTT_ADAPTER_ES_PASS=[Event stream pass] MF_MQTT_ADAPTER_ES_DB=[Event stream db] MF_MQTT_CONCURRENT_MESSAGES=[Number of messages that can be concurrently exchanged] MF_MQTT_ADAPTER_CLIENT_TLS=[Flag that indicates if TLS should be turned on] MF_MQTT_ADAPTER_CA_CERTS=[Path to trusted CAs in PEM format] node mqtt.js .. ``` ## Usage diff --git a/mqtt/mqtt.js b/mqtt/mqtt.js index 66e6cf7e20..92e8621806 100644 --- a/mqtt/mqtt.js +++ b/mqtt/mqtt.js @@ -6,7 +6,7 @@ 'use strict'; var http = require('http'), - redis = require("redis"), + redis = require('redis'), net = require('net'), protobuf = require('protocol-buffers'), websocket = require('websocket-stream'), @@ -27,6 +27,10 @@ var config = { redis_host: process.env.MF_MQTT_ADAPTER_REDIS_HOST || 'localhost', redis_pass: process.env.MF_MQTT_ADAPTER_REDIS_PASS || 'mqtt', redis_db: Number(process.env.MF_MQTT_ADAPTER_REDIS_DB) || 0, + es_port: Number(process.env.MF_MQTT_ADAPTER_ES_PORT) || 6379, + es_host: process.env.MF_MQTT_ADAPTER_ES_HOST || 'localhost', + es_pass: process.env.MF_MQTT_ADAPTER_ES_PASS || 'mqtt', + es_db: Number(process.env.MF_MQTT_ADAPTER_ES_DB) || 0, client_tls: (process.env.MF_MQTT_ADAPTER_CLIENT_TLS == 'true') || false, ca_certs: process.env.MF_MQTT_ADAPTER_CA_CERTS || '', concurrency: Number(process.env.MF_MQTT_CONCURRENT_MESSAGES) || 100, @@ -63,7 +67,12 @@ var config = { } return new thingsSchema.ThingsService(config.auth_url, certs); })(), - esclient = redis.createClient(config.redis_port, config.redis_host), + esclient = redis.createClient({ + port: config.es_port, + host: config.es_host, + password: config.es_pass, + db: config.es_db + }), servers = [ startMqtt(), startWs() @@ -77,6 +86,10 @@ logging({ logger.level(config.log_level); +esclient.on('error', function(err) { + logger.warn('error on redis connection: %s', err.message); +}); + // MQTT over WebSocket function startWs() { var server = http.createServer(); @@ -183,7 +196,7 @@ aedes.authorizeSubscribe = function (client, packet, subscribe) { logger.info('authorized subscribe'); subscribe(null, packet); } else { - logger.warn('unauthorized subscribe: %s', err); + logger.warn('unauthorized subscribe: %s', err.message); subscribe(4, packet); // Bad username or password } }; @@ -224,6 +237,10 @@ aedes.on('connectionError', function (client, err) { logger.warn('client error: client: %s, error: %s', client.id, err.message); }); +aedes.on('error', function(err) { + logger.warn('aedes error: %s', err.message); +}); + function publishConnEvent(id, type) { var onPublish = function(err) { if (err) {