diff --git a/server/services/mqtt/index.js b/server/services/mqtt/index.js index 5b5658bb10..125db85592 100644 --- a/server/services/mqtt/index.js +++ b/server/services/mqtt/index.js @@ -6,9 +6,9 @@ const MQTT_URL_KEY = 'MQTT_URL'; const MQTT_USERNAME_KEY = 'MQTT_USERNAME'; const MQTT_PASSWORD_KEY = 'MQTT_PASSWORD'; -module.exports = function PhilipsHueService(gladys, serviceId) { +module.exports = function MqttService(gladys, serviceId) { const mqtt = require('mqtt'); - const mqttHandler = new MqttHandler(gladys, mqtt, serviceId); + let mqttHandler = null; /** * @public @@ -20,11 +20,13 @@ module.exports = function PhilipsHueService(gladys, serviceId) { const mqttUrl = await gladys.variable.getValue(MQTT_URL_KEY, serviceId); const mqttUsername = await gladys.variable.getValue(MQTT_USERNAME_KEY, serviceId); const mqttPassword = await gladys.variable.getValue(MQTT_PASSWORD_KEY, serviceId); - const variablesFound = (mqttUrl && mqttUsername && mqttPassword); + const variablesFound = (mqttUrl); if (!variablesFound) { throw new ServiceNotConfiguredError('MQTT is not configured.'); } logger.log('starting MQTT service'); + mqttHandler = new MqttHandler(gladys, mqtt, mqttUrl, mqttUsername, mqttPassword, serviceId); + mqttHandler.connect(); } /** diff --git a/server/services/mqtt/lib/connect.js b/server/services/mqtt/lib/connect.js new file mode 100644 index 0000000000..eadedd6030 --- /dev/null +++ b/server/services/mqtt/lib/connect.js @@ -0,0 +1,31 @@ +const logger = require('../../../utils/logger'); + +/** + * @description Connect and listen to all topics + * @example + * connect(); + */ +function connect() { + logger.debug(`Trying to connect to MQTT server ${this.mqttUrl}...`); + this.mqttClient = this.mqttLibrary.connect(this.mqttUrl, { + username: this.mqttUsername, + password: this.mqttPassword, + }); + this.mqttClient.on('connect', () => { + logger.info(`Connected to MQTT server ${this.mqttUrl}`); + // Gladys master remote + this.mqttClient.subscribe('gladys/master/#'); + // Owntracks topic + this.mqttClient.subscribe('owntracks/+/+'); + }); + this.mqttClient.on('error', (err) => { + logger.warn(`Error while connecting to MQTT - ${err}`); + }); + this.mqttClient.on('message', (topic, message) => { + this.handleNewMessage(topic, message.toString()); + }); +} + +module.exports = { + connect, +}; diff --git a/server/services/mqtt/lib/handleNewMessage.js b/server/services/mqtt/lib/handleNewMessage.js new file mode 100644 index 0000000000..8b5b7438b4 --- /dev/null +++ b/server/services/mqtt/lib/handleNewMessage.js @@ -0,0 +1,29 @@ +const logger = require('../../../utils/logger'); +const { EVENTS } = require('../../../utils/constants'); + +/** + * @description Handle a new message receive in MQTT. + * @param {string} topic - The topic where the message was posted. + * @param {string} message - The message sent. + * @example + * handleNewMessage('/gladys/master/heartbeat', '{}'); + */ +function handleNewMessage(topic, message) { + try { + switch (topic) { + case 'gladys/master/device/create': + this.gladys.event.emit(EVENTS.DEVICE.NEW, JSON.parse(message)); + break; + default: + logger.info(`MQTT : Topic ${topic} not handled.`); + break; + } + } catch (e) { + logger.warn(`Unable to handle new MQTT message in topic ${topic}`); + logger.warn(e); + } +} + +module.exports = { + handleNewMessage, +}; diff --git a/server/services/mqtt/lib/index.js b/server/services/mqtt/lib/index.js index ac538d0452..e2de61d835 100644 --- a/server/services/mqtt/lib/index.js +++ b/server/services/mqtt/lib/index.js @@ -1,16 +1,28 @@ +const { connect } = require('./connect'); +const { handleNewMessage } = require('./handleNewMessage'); /** * @description Add ability to connect to a MQTT broker. * @param {Object} gladys - Gladys instance. - * @param {Object} mqttClient - MqttClient. + * @param {Object} mqttLibrary - MQTT lib. + * @param {Object} mqttUrl - MQTT server URL. + * @param {Object} mqttUsername - MQTT username. + * @param {Object} mqttPassword - MQTT password. * @param {string} serviceId - UUID of the service in DB. * @example * const mqttHandler = new MqttHandler(gladys, client, serviceId); */ -const MqttHandler = function MqttHandler(gladys, mqttClient, serviceId) { +const MqttHandler = function MqttHandler(gladys, mqttLibrary, mqttUrl, mqttUsername, mqttPassword, serviceId) { this.gladys = gladys; - this.mqttClient = mqttClient; + this.mqttLibrary = mqttLibrary; + this.mqttUrl = mqttUrl; + this.mqttUsername = mqttUsername; + this.mqttPassword = mqttPassword; this.serviceId = serviceId; + this.mqttClient = null; }; +MqttHandler.prototype.connect = connect; +MqttHandler.prototype.handleNewMessage = handleNewMessage; + module.exports = MqttHandler; diff --git a/server/test/services/mqtt/index.test.js b/server/test/services/mqtt/index.test.js new file mode 100644 index 0000000000..41c8e19042 --- /dev/null +++ b/server/test/services/mqtt/index.test.js @@ -0,0 +1,21 @@ +const { assert, fake } = require('sinon'); +const proxyquire = require('proxyquire').noCallThru(); +const MockedMqttClient = require('./mocks.test'); + +const MqttService = proxyquire('../../../services/mqtt/index', { + mqtt: MockedMqttClient, +}); + +const gladys = { + variable: { + getValue: fake.resolves('result'), + }, +}; + +describe('MqttService', () => { + const mqttService = MqttService(gladys, 'faea9c35-759a-44d5-bcc9-2af1de37b8b4'); + it('should start service', async () => { + await mqttService.start(); + assert.called(MockedMqttClient.connect); + }); +}); diff --git a/server/test/services/mqtt/mocks.test.js b/server/test/services/mqtt/mocks.test.js new file mode 100644 index 0000000000..92f6fb27fe --- /dev/null +++ b/server/test/services/mqtt/mocks.test.js @@ -0,0 +1,10 @@ +const { fake } = require('sinon'); + +const MockedMqttClient = { + connect: fake.returns({ + on: fake.returns(null), + emit: fake.returns(null), + }), +}; + +module.exports = MockedMqttClient; diff --git a/server/test/services/mqtt/mqttHandler.test.js b/server/test/services/mqtt/mqttHandler.test.js new file mode 100644 index 0000000000..993dc09c3b --- /dev/null +++ b/server/test/services/mqtt/mqttHandler.test.js @@ -0,0 +1,29 @@ +const { assert, fake } = require('sinon'); +const { EVENTS } = require('../../../utils/constants'); +const MockedMqttClient = require('./mocks.test'); + +const gladys = { + variable: { + getValue: fake.resolves('result'), + }, + event: { + emit: fake.returns(null), + }, +}; + +const MqttHandler = require('../../../services/mqtt/lib'); + +describe('MqttHandler', () => { + const mqttHandler = new MqttHandler(gladys, MockedMqttClient, 'url', 'username', 'password', 'faea9c35-759a-44d5-bcc9-2af1de37b8b4'); + it('should call connect function', () => { + mqttHandler.connect(); + assert.calledOnce(MockedMqttClient.connect); + }); + it('should create device', () => { + mqttHandler.handleNewMessage('gladys/master/device/create', '{}'); + assert.calledWith(gladys.event.emit, EVENTS.DEVICE.NEW, {}); + }); + it('should not do anything, topic not found', () => { + mqttHandler.handleNewMessage('UNKNOWN_TOPIC', '{}'); + }); +});