From b8c9eee4c79b2639a4054030c81b6e280d0b8270 Mon Sep 17 00:00:00 2001 From: nmarcetic Date: Fri, 25 Jan 2019 19:20:17 +0100 Subject: [PATCH 1/4] Enabled MQTT subtopic Signed-off-by: nmarcetic --- mqtt/mqtt.js | 28 ++++++++++++++++++++++++---- normalizer/nats/pubsub.go | 2 +- 2 files changed, 25 insertions(+), 5 deletions(-) diff --git a/mqtt/mqtt.js b/mqtt/mqtt.js index f47ae52458..f6231e9389 100644 --- a/mqtt/mqtt.js +++ b/mqtt/mqtt.js @@ -98,7 +98,8 @@ nats.subscribe('channel.*', function (msg) { aedes.authorizePublish = function (client, packet, publish) { // Topics are in the form `channels//messages` - var channel = /^channels\/(.+?)\/messages$/.exec(packet.topic); + // Subtopic's are in the form `channels//messages/` + var channel = /^channels\/(.+?)\/messages\/?(.+?)?$/.exec(packet.topic); if (!channel) { logger.warn('unknown topic'); publish(4); // Bad username or password @@ -109,6 +110,18 @@ aedes.authorizePublish = function (client, packet, publish) { token: client.password, chanID: channelId }, + // Parse unlimited subtopics + baseLength = 3, // First few elements which represents the base part of topic. + elements = packet.topic.split('/').slice(baseLength), + baseTopic = 'channel.' + channelId; + // Remove empty elements + for (var i = 0; i < elements.length; i++) { + if (elements[i] === '') { + elements.pop(i) + } + } + var channelTopic = elements.length ? baseTopic + '.' + elements.join('.') : baseTopic, + onAuthorize = function (err, res) { var rawMsg; if (!err) { @@ -120,7 +133,7 @@ aedes.authorizePublish = function (client, packet, publish) { Protocol: 'mqtt', Payload: packet.payload }); - nats.publish('channel.' + channelId, rawMsg); + nats.publish(channelTopic, rawMsg); publish(0); } else { @@ -129,14 +142,21 @@ aedes.authorizePublish = function (client, packet, publish) { } }; + console.log("Here it is the filan NATS topic:", channelTopic) things.CanAccess(accessReq, onAuthorize); }; aedes.authorizeSubscribe = function (client, packet, subscribe) { // Topics are in the form `channels//messages` - var channel = /^channels\/(.+?)\/messages$/.exec(packet.topic), - channelId = channel[1], + // Subtopic's are in the form `channels//messages/` + var channel = /^channels\/(.+?)\/messages\/?(.+?)?$/.exec(packet.topic); + if (!channel) { + logger.warn('unknown topic'); + subscribe(4, packet); // Bad username or password + return; + } + var channelId = channel[1], accessReq = { token: client.password, chanID: channelId diff --git a/normalizer/nats/pubsub.go b/normalizer/nats/pubsub.go index b1b86c3e89..bdf317fc34 100644 --- a/normalizer/nats/pubsub.go +++ b/normalizer/nats/pubsub.go @@ -19,7 +19,7 @@ import ( const ( queue = "normalizers" - input = "channel.*" + input = "channel.>" outputUnknown = "out.unknown" senML = "application/senml+json" ) From 327330a40a14d9ac420f7b5ee9762df5adce582e Mon Sep 17 00:00:00 2001 From: nmarcetic Date: Fri, 25 Jan 2019 19:25:59 +0100 Subject: [PATCH 2/4] Removed debug logs Signed-off-by: nmarcetic --- mqtt/mqtt.js | 1 - 1 file changed, 1 deletion(-) diff --git a/mqtt/mqtt.js b/mqtt/mqtt.js index f6231e9389..5c2464ca00 100644 --- a/mqtt/mqtt.js +++ b/mqtt/mqtt.js @@ -142,7 +142,6 @@ aedes.authorizePublish = function (client, packet, publish) { } }; - console.log("Here it is the filan NATS topic:", channelTopic) things.CanAccess(accessReq, onAuthorize); }; From 018507d847ebcff5f356e5bd7eed60e024c66660 Mon Sep 17 00:00:00 2001 From: nmarcetic Date: Fri, 25 Jan 2019 21:19:25 +0100 Subject: [PATCH 3/4] Resolved remarks Signed-off-by: nmarcetic --- mqtt/mqtt.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/mqtt/mqtt.js b/mqtt/mqtt.js index 5c2464ca00..782b755cdf 100644 --- a/mqtt/mqtt.js +++ b/mqtt/mqtt.js @@ -111,7 +111,7 @@ aedes.authorizePublish = function (client, packet, publish) { chanID: channelId }, // Parse unlimited subtopics - baseLength = 3, // First few elements which represents the base part of topic. + baseLength = 3, // First 3 elements which represents the base part of topic. elements = packet.topic.split('/').slice(baseLength), baseTopic = 'channel.' + channelId; // Remove empty elements From 02c2744ffc36fc16f7d0a739df734014e7f05870 Mon Sep 17 00:00:00 2001 From: nmarcetic Date: Fri, 25 Jan 2019 21:27:17 +0100 Subject: [PATCH 4/4] Resolved remarks Signed-off-by: nmarcetic --- mqtt/mqtt.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/mqtt/mqtt.js b/mqtt/mqtt.js index 782b755cdf..1aa4d32201 100644 --- a/mqtt/mqtt.js +++ b/mqtt/mqtt.js @@ -114,7 +114,7 @@ aedes.authorizePublish = function (client, packet, publish) { baseLength = 3, // First 3 elements which represents the base part of topic. elements = packet.topic.split('/').slice(baseLength), baseTopic = 'channel.' + channelId; - // Remove empty elements + // Remove empty elements for (var i = 0; i < elements.length; i++) { if (elements[i] === '') { elements.pop(i)