Skip to content

Commit

Permalink
MF-429 -Enabled MQTT subtopic's (absmach#554)
Browse files Browse the repository at this point in the history
* Enabled MQTT subtopic

Signed-off-by: nmarcetic <n.marcetic86@gmail.com>

* Removed debug logs

Signed-off-by: nmarcetic <n.marcetic86@gmail.com>

* Resolved remarks

Signed-off-by: nmarcetic <n.marcetic86@gmail.com>

* Resolved remarks

Signed-off-by: nmarcetic <n.marcetic86@gmail.com>
  • Loading branch information
nmarcetic authored and juanmagal committed Feb 5, 2019
1 parent c079b3f commit 069fa3c
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 5 deletions.
27 changes: 23 additions & 4 deletions mqtt/mqtt.js
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,8 @@ nats.subscribe('channel.*', function (msg) {

aedes.authorizePublish = function (client, packet, publish) {
// Topics are in the form `channels/<channel_id>/messages`
var channel = /^channels\/(.+?)\/messages$/.exec(packet.topic);
// Subtopic's are in the form `channels/<channel_id>/messages/<subtopic>`
var channel = /^channels\/(.+?)\/messages\/?(.+?)?$/.exec(packet.topic);
if (!channel) {
logger.warn('unknown topic');
publish(4); // Bad username or password
Expand All @@ -109,6 +110,18 @@ aedes.authorizePublish = function (client, packet, publish) {
token: client.password,
chanID: channelId
},
// Parse unlimited subtopics
baseLength = 3, // First 3 elements which represents the base part of topic.
elements = packet.topic.split('/').slice(baseLength),
baseTopic = 'channel.' + channelId;
// Remove empty elements
for (var i = 0; i < elements.length; i++) {
if (elements[i] === '') {
elements.pop(i)
}
}
var channelTopic = elements.length ? baseTopic + '.' + elements.join('.') : baseTopic,

onAuthorize = function (err, res) {
var rawMsg;
if (!err) {
Expand All @@ -120,7 +133,7 @@ aedes.authorizePublish = function (client, packet, publish) {
Protocol: 'mqtt',
Payload: packet.payload
});
nats.publish('channel.' + channelId, rawMsg);
nats.publish(channelTopic, rawMsg);

publish(0);
} else {
Expand All @@ -135,8 +148,14 @@ aedes.authorizePublish = function (client, packet, publish) {

aedes.authorizeSubscribe = function (client, packet, subscribe) {
// Topics are in the form `channels/<channel_id>/messages`
var channel = /^channels\/(.+?)\/messages$/.exec(packet.topic),
channelId = channel[1],
// Subtopic's are in the form `channels/<channel_id>/messages/<subtopic>`
var channel = /^channels\/(.+?)\/messages\/?(.+?)?$/.exec(packet.topic);
if (!channel) {
logger.warn('unknown topic');
subscribe(4, packet); // Bad username or password
return;
}
var channelId = channel[1],
accessReq = {
token: client.password,
chanID: channelId
Expand Down
2 changes: 1 addition & 1 deletion normalizer/nats/pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import (

const (
queue = "normalizers"
input = "channel.*"
input = "channel.>"
outputUnknown = "out.unknown"
senML = "application/senml+json"
)
Expand Down

0 comments on commit 069fa3c

Please sign in to comment.