From 3db846568753fb35d37a8f32225a6294c0284a78 Mon Sep 17 00:00:00 2001 From: Siarhei Buntsevich Date: Wed, 26 Jun 2019 17:07:25 +0300 Subject: [PATCH 1/2] some properties can be multiple --- parser.js | 11 +++- test.js | 39 ++++++++++++++ writeToStream.js | 133 ++++++++++++++++++++++++++--------------------- 3 files changed, 124 insertions(+), 59 deletions(-) diff --git a/parser.js b/parser.js index a476f21..f0d3d21 100644 --- a/parser.js +++ b/parser.js @@ -611,7 +611,16 @@ Parser.prototype._parseProperties = function () { } continue } - result[name] = this._parseByType(constants.propertiesTypes[name]) + if (result[name]) { + if (Array.isArray(result[name])) { + result[name].push(this._parseByType(constants.propertiesTypes[name])) + } else { + result[name] = [result[name]] + result[name].push(this._parseByType(constants.propertiesTypes[name])) + } + } else { + result[name] = this._parseByType(constants.propertiesTypes[name]) + } } return result } diff --git a/test.js b/test.js index a86e9be..199fed2 100644 --- a/test.js +++ b/test.js @@ -909,6 +909,45 @@ testParseGenerate('publish MQTT5 properties', { 116, 101, 115, 116 // Payload (test) ]), { protocolVersion: 5 }) +testParseGenerate('publish MQTT5 with multiple same properties', { + cmd: 'publish', + retain: true, + qos: 2, + dup: true, + length: 62, + topic: 'test', + payload: new Buffer('test'), + messageId: 10, + properties: { + payloadFormatIndicator: true, + messageExpiryInterval: 4321, + topicAlias: 100, + responseTopic: 'topic', + correlationData: Buffer.from([1, 2, 3, 4]), + userProperties: { + 'test': 'test' + }, + subscriptionIdentifier: [120, 121], + contentType: 'test' + } +}, Buffer.from([ + 61, 62, // Header + 0, 4, // Topic length + 116, 101, 115, 116, // Topic (test) + 0, 10, // Message ID + 49, // properties length + 1, 1, // payloadFormatIndicator + 2, 0, 0, 16, 225, // message expiry interval + 35, 0, 100, // topicAlias + 8, 0, 5, 116, 111, 112, 105, 99, // response topic + 9, 0, 4, 1, 2, 3, 4, // correlationData + 38, 0, 4, 116, 101, 115, 116, 0, 4, 116, 101, 115, 116, // userProperties + 11, 120, // subscriptionIdentifier + 11, 121, // subscriptionIdentifier + 3, 0, 4, 116, 101, 115, 116, // content type + 116, 101, 115, 116 // Payload (test) +]), { protocolVersion: 5 }) + ;(function () { var buffer = new Buffer(2048) testParseGenerate('2KB publish packet', { diff --git a/writeToStream.js b/writeToStream.js index 7ade926..accbdfc 100644 --- a/writeToStream.js +++ b/writeToStream.js @@ -859,9 +859,8 @@ function getProperties (stream, properties) { } } var propertiesLength = 0 - function getLengthProperty (name) { + function getLengthProperty (name, value) { var type = protocol.propertiesTypes[name] - var value = properties[name] var length = 0 switch (type) { case 'byte': { @@ -948,7 +947,15 @@ function getProperties (stream, properties) { } if (properties) { for (var propName in properties) { - var propLength = getLengthProperty(propName) + var propLength = 0 + var propValue = properties[propName] + if (Array.isArray(propValue)) { + for (var valueIndex = 0; valueIndex < propValue.length; valueIndex++) { + propLength += getLengthProperty(propName, propValue[valueIndex]) + } + } else { + propLength = getLengthProperty(propName, propValue) + } if (!propLength) return false propertiesLength += propLength } @@ -982,68 +989,78 @@ function getPropertiesByMaximumPacketSize (stream, properties, opts, length) { return propertiesData } +function writeProperty (stream, propName, value) { + var type = protocol.propertiesTypes[propName] + switch (type) { + case 'byte': { + stream.write(Buffer.from([protocol.properties[propName]])) + stream.write(Buffer.from([+value])) + break + } + case 'int8': { + stream.write(Buffer.from([protocol.properties[propName]])) + stream.write(Buffer.from([value])) + break + } + case 'binary': { + stream.write(Buffer.from([protocol.properties[propName]])) + writeStringOrBuffer(stream, value) + break + } + case 'int16': { + stream.write(Buffer.from([protocol.properties[propName]])) + writeNumber(stream, value) + break + } + case 'int32': { + stream.write(Buffer.from([protocol.properties[propName]])) + write4ByteNumber(stream, value) + break + } + case 'var': { + stream.write(Buffer.from([protocol.properties[propName]])) + writeVarByteInt(stream, value) + break + } + case 'string': { + stream.write(Buffer.from([protocol.properties[propName]])) + writeString(stream, value) + break + } + case 'pair': { + Object.getOwnPropertyNames(value).forEach(function (name) { + var currentValue = value[name] + if (Array.isArray(currentValue)) { + currentValue.forEach(function (value) { + stream.write(Buffer.from([protocol.properties[propName]])) + writeStringPair(stream, name.toString(), value.toString()) + }) + } else { + stream.write(Buffer.from([protocol.properties[propName]])) + writeStringPair(stream, name.toString(), currentValue.toString()) + } + }) + break + } + default: { + stream.emit('error', new Error('Invalid property ' + propName + ' value: ' + value)) + return false + } + } +} + function writeProperties (stream, properties, propertiesLength) { /* write properties to stream */ writeVarByteInt(stream, propertiesLength) for (var propName in properties) { if (properties.hasOwnProperty(propName) && properties[propName] !== null) { var value = properties[propName] - var type = protocol.propertiesTypes[propName] - switch (type) { - case 'byte': { - stream.write(Buffer.from([protocol.properties[propName]])) - stream.write(Buffer.from([+value])) - break - } - case 'int8': { - stream.write(Buffer.from([protocol.properties[propName]])) - stream.write(Buffer.from([value])) - break - } - case 'binary': { - stream.write(Buffer.from([protocol.properties[propName]])) - writeStringOrBuffer(stream, value) - break - } - case 'int16': { - stream.write(Buffer.from([protocol.properties[propName]])) - writeNumber(stream, value) - break - } - case 'int32': { - stream.write(Buffer.from([protocol.properties[propName]])) - write4ByteNumber(stream, value) - break - } - case 'var': { - stream.write(Buffer.from([protocol.properties[propName]])) - writeVarByteInt(stream, value) - break - } - case 'string': { - stream.write(Buffer.from([protocol.properties[propName]])) - writeString(stream, value) - break - } - case 'pair': { - Object.getOwnPropertyNames(value).forEach(function (name) { - var currentValue = value[name] - if (Array.isArray(currentValue)) { - currentValue.forEach(function (value) { - stream.write(Buffer.from([protocol.properties[propName]])) - writeStringPair(stream, name.toString(), value.toString()) - }) - } else { - stream.write(Buffer.from([protocol.properties[propName]])) - writeStringPair(stream, name.toString(), currentValue.toString()) - } - }) - break - } - default: { - stream.emit('error', new Error('Invalid property ' + propName)) - return false + if (Array.isArray(value)) { + for (var valueIndex = 0; valueIndex < value.length; valueIndex++) { + writeProperty(stream, propName, value[valueIndex]) } + } else { + writeProperty(stream, propName, value) } } } From ef418ecc2f37c4eda9e9216f7af36bfd5cccacda Mon Sep 17 00:00:00 2001 From: Siarhei Buntsevich Date: Mon, 1 Jul 2019 10:06:17 +0300 Subject: [PATCH 2/2] update docs --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index b3d0d8b..56e81c8 100644 --- a/README.md +++ b/README.md @@ -327,7 +327,7 @@ All properties are mandatory. userProperties: { 'test': 'test' }, - subscriptionIdentifier: 120, + subscriptionIdentifier: 120, // can be an Array in message from broker, if message included in few another subscriptions contentType: 'test' } }