diff --git a/mqtt/mqtt.js b/mqtt/mqtt.js index f47ae52458..1aa4d32201 100644 --- a/mqtt/mqtt.js +++ b/mqtt/mqtt.js @@ -98,7 +98,8 @@ nats.subscribe('channel.*', function (msg) { aedes.authorizePublish = function (client, packet, publish) { // Topics are in the form `channels//messages` - var channel = /^channels\/(.+?)\/messages$/.exec(packet.topic); + // Subtopic's are in the form `channels//messages/` + var channel = /^channels\/(.+?)\/messages\/?(.+?)?$/.exec(packet.topic); if (!channel) { logger.warn('unknown topic'); publish(4); // Bad username or password @@ -109,6 +110,18 @@ aedes.authorizePublish = function (client, packet, publish) { token: client.password, chanID: channelId }, + // Parse unlimited subtopics + baseLength = 3, // First 3 elements which represents the base part of topic. + elements = packet.topic.split('/').slice(baseLength), + baseTopic = 'channel.' + channelId; + // Remove empty elements + for (var i = 0; i < elements.length; i++) { + if (elements[i] === '') { + elements.pop(i) + } + } + var channelTopic = elements.length ? baseTopic + '.' + elements.join('.') : baseTopic, + onAuthorize = function (err, res) { var rawMsg; if (!err) { @@ -120,7 +133,7 @@ aedes.authorizePublish = function (client, packet, publish) { Protocol: 'mqtt', Payload: packet.payload }); - nats.publish('channel.' + channelId, rawMsg); + nats.publish(channelTopic, rawMsg); publish(0); } else { @@ -135,8 +148,14 @@ aedes.authorizePublish = function (client, packet, publish) { aedes.authorizeSubscribe = function (client, packet, subscribe) { // Topics are in the form `channels//messages` - var channel = /^channels\/(.+?)\/messages$/.exec(packet.topic), - channelId = channel[1], + // Subtopic's are in the form `channels//messages/` + var channel = /^channels\/(.+?)\/messages\/?(.+?)?$/.exec(packet.topic); + if (!channel) { + logger.warn('unknown topic'); + subscribe(4, packet); // Bad username or password + return; + } + var channelId = channel[1], accessReq = { token: client.password, chanID: channelId diff --git a/normalizer/nats/pubsub.go b/normalizer/nats/pubsub.go index b1b86c3e89..bdf317fc34 100644 --- a/normalizer/nats/pubsub.go +++ b/normalizer/nats/pubsub.go @@ -19,7 +19,7 @@ import ( const ( queue = "normalizers" - input = "channel.*" + input = "channel.>" outputUnknown = "out.unknown" senML = "application/senml+json" )