From 7995f62067c2d6adac3f0447f53c4c6653d18fa7 Mon Sep 17 00:00:00 2001 From: Matteo Collina Date: Sat, 17 Oct 2015 12:09:32 +0200 Subject: [PATCH 1/4] Write data directly to a stream instead of allocating a buffer --- benchmarks/generate.js | 2 +- benchmarks/generateTick.js | 53 ++++ benchmarks/writeToStream.js | 58 ++++ constants.js | 97 ++++-- generate.js | 612 +----------------------------------- mqtt.js | 1 + numbers.js | 14 + package.json | 1 + writeToStream.js | 579 ++++++++++++++++++++++++++++++++++ 9 files changed, 788 insertions(+), 629 deletions(-) create mode 100644 benchmarks/generateTick.js create mode 100644 benchmarks/writeToStream.js create mode 100644 numbers.js create mode 100644 writeToStream.js diff --git a/benchmarks/generate.js b/benchmarks/generate.js index 6389cf4..cac2a9d 100644 --- a/benchmarks/generate.js +++ b/benchmarks/generate.js @@ -1,6 +1,6 @@ var mqtt = require('../') - , max = 10000000 + , max = 100000 , i , start = Date.now() , time diff --git a/benchmarks/generateTick.js b/benchmarks/generateTick.js new file mode 100644 index 0000000..e4eefc4 --- /dev/null +++ b/benchmarks/generateTick.js @@ -0,0 +1,53 @@ + +var mqtt = require('../') + , max = 1000000 + , i = 0 + , start = Date.now() + , time + , buf = new Buffer(10) + , net = require('net') + , server = net.createServer(handle) + , dest + +buf.fill('test') + +function handle(sock) { + sock.resume(); +} + +server.listen(0, function() { + dest = net.connect(server.address()); + + dest.on('connect', tickWait); + dest.on('drain', tickWait); + + dest.on('finish', function () { + time = Date.now() - start; + console.log('Total time', time); + console.log('Total packets', max); + console.log('Packet/s', max / time * 1000); + server.close(); + }); +}); + +function tickWait () { + //console.log('tickWait', i) + var res = true + //var toSend = new Buffer(5 + buf.length) + + for (; i < max && res; i++) { + res = dest.write(mqtt.generate({ + cmd: 'publish' + , topic: 'test' + , payload: buf + })) + //buf.copy(toSend, 5) + //res = dest.write(toSend, 'buffer') + //console.log(res) + } + + if (i >= max) { + dest.end(); + return; + } +} diff --git a/benchmarks/writeToStream.js b/benchmarks/writeToStream.js new file mode 100644 index 0000000..4e34bca --- /dev/null +++ b/benchmarks/writeToStream.js @@ -0,0 +1,58 @@ + +var mqtt = require('../') + , max = 1000000 + , i = 0 + , start = Date.now() + , time + , buf = new Buffer(10) + , net = require('net') + , server = net.createServer(handle) + , dest + +function handle(sock) { + sock.resume(); +} + +buf.fill('test') + +server.listen(0, function() { + dest = net.connect(server.address()); + + dest.on('connect', tickWait); + dest.on('drain', tickWait); + + dest.on('finish', function () { + time = Date.now() - start; + console.log('Total time', time); + console.log('Total packets', max); + console.log('Packet/s', max / time * 1000); + server.close(); + }); +}); + +function tickWait() { + var res = true + //var toSend = new Buffer(5) + + for (; i < max && res; i++) { + dest.cork() + res = mqtt.writeToStream({ + cmd: 'publish' + , topic: 'test' + , payload: buf + }, dest) + //dest.write(toSend, 'buffer') + //res = dest.write(buf, 'buffer') + + process.nextTick(uncork, dest); + } + + if (i >= max) { + dest.end(); + return; + } +} + +function uncork (stream) { + stream.uncork() +} diff --git a/constants.js b/constants.js index 04e26fd..bc1cf71 100644 --- a/constants.js +++ b/constants.js @@ -1,7 +1,8 @@ /* Protocol - protocol constants */ +var protocol = module.exports; /* Command code => mnemonic */ -module.exports.types = { +protocol.types = { 0: 'reserved', 1: 'connect', 2: 'connack', @@ -21,32 +22,86 @@ module.exports.types = { }; /* Mnemonic => Command code */ -module.exports.codes = {} -for(var k in module.exports.types) { - var v = module.exports.types[k]; - module.exports.codes[v] = k; +protocol.codes = {} +for(var k in protocol.types) { + var v = protocol.types[k]; + protocol.codes[v] = k; } /* Header */ -module.exports.CMD_SHIFT = 4; -module.exports.CMD_MASK = 0xF0; -module.exports.DUP_MASK = 0x08; -module.exports.QOS_MASK = 0x03; -module.exports.QOS_SHIFT = 1; -module.exports.RETAIN_MASK = 0x01; +protocol.CMD_SHIFT = 4; +protocol.CMD_MASK = 0xF0; +protocol.DUP_MASK = 0x08; +protocol.QOS_MASK = 0x03; +protocol.QOS_SHIFT = 1; +protocol.RETAIN_MASK = 0x01; /* Length */ -module.exports.LENGTH_MASK = 0x7F; -module.exports.LENGTH_FIN_MASK = 0x80; +protocol.LENGTH_MASK = 0x7F; +protocol.LENGTH_FIN_MASK = 0x80; /* Connack */ -module.exports.SESSIONPRESENT_MASK = 0x01; +protocol.SESSIONPRESENT_MASK = 0x01; +protocol.SESSIONPRESENT_HEADER = new Buffer([protocol.SESSIONPRESENT_MASK]); +protocol.CONNACK_HEADER = new Buffer([protocol.codes['connack'] << protocol.CMD_SHIFT]) /* Connect */ -module.exports.USERNAME_MASK = 0x80; -module.exports.PASSWORD_MASK = 0x40; -module.exports.WILL_RETAIN_MASK = 0x20; -module.exports.WILL_QOS_MASK = 0x18; -module.exports.WILL_QOS_SHIFT = 3; -module.exports.WILL_FLAG_MASK = 0x04; -module.exports.CLEAN_SESSION_MASK = 0x02; +protocol.USERNAME_MASK = 0x80; +protocol.PASSWORD_MASK = 0x40; +protocol.WILL_RETAIN_MASK = 0x20; +protocol.WILL_QOS_MASK = 0x18; +protocol.WILL_QOS_SHIFT = 3; +protocol.WILL_FLAG_MASK = 0x04; +protocol.CLEAN_SESSION_MASK = 0x02; +protocol.CONNECT_HEADER = new Buffer([protocol.codes['connect'] << protocol.CMD_SHIFT]) + +function genHeader (type) { + return [0, 1, 2].map(function(qos) { + return [0, 1].map(function(dup) { + return [0, 1].map(function(retain) { + var buf = new Buffer(1) + buf.writeUInt8( + protocol.codes[type] << protocol.CMD_SHIFT | + (dup ? protocol.DUP_MASK : 0 ) | + qos << protocol.QOS_SHIFT | retain, 0, true) + return buf + }); + }); + }); +} + +/* Publish */ +protocol.PUBLISH_HEADER = genHeader('publish'); + +/* SUBSCRIBE */ +protocol.SUBSCRIBE_HEADER = genHeader('subscribe'); + +/* UNSUBSCRIBE */ +protocol.UNSUBSCRIBE_HEADER = genHeader('unsubscribe'); + +/* Confirmations */ +protocol.ACKS = { + unsuback: genHeader('unsuback'), + puback: genHeader('puback'), + pubcomp: genHeader('pubcomp'), + pubrel: genHeader('pubrel'), + pubrec: genHeader('pubrec') +}; + +protocol.SUBACK_HEADER = new Buffer([protocol.codes['suback'] << protocol.CMD_SHIFT]); + +/* Protocol versions */ +protocol.VERSION3 = new Buffer([3]) +protocol.VERSION4 = new Buffer([4]) + +/* QOS */ +protocol.QOS = [0, 1, 2].map(function(qos) { + return new Buffer([qos]) +}) + +/* empty packets */ +protocol.EMPTY = { + pingreq: new Buffer([protocol.codes['pingreq'] << 4, 0]), + pingresp: new Buffer([protocol.codes['pingresp'] << 4, 0]), + disconnect: new Buffer([protocol.codes['disconnect'] << 4, 0]) +}; diff --git a/generate.js b/generate.js index 2c4fad5..6168af8 100644 --- a/generate.js +++ b/generate.js @@ -1,614 +1,12 @@ - 'use strict'; -var protocol = require('./constants') - , empty = new Buffer(0) +var writeToStream = require('./writeToStream') +var bl = require('bl') function generate(packet) { - - switch (packet.cmd) { - case 'connect': - return connect(packet) - case 'connack': - return connack(packet) - case 'publish': - return publish(packet) - case 'puback': - case 'pubrec': - case 'pubrel': - case 'pubcomp': - case 'unsuback': - return confirmation(packet) - case 'subscribe': - return subscribe(packet) - case 'suback': - return suback(packet) - case 'unsubscribe': - return unsubscribe(packet) - case 'pingreq': - case 'pingresp': - case 'disconnect': - return emptyPacket(packet) - default: - throw new Error('unknown command') - } -} - -function connect(opts) { - var opts = opts || {} - , protocolId = opts.protocolId || 'MQTT' - , protocolVersion = opts.protocolVersion || 4 - , will = opts.will - , clean = opts.clean - , keepalive = opts.keepalive || 0 - , clientId = opts.clientId || "" - , username = opts.username - , password = opts.password - - if (clean === undefined) { - clean = true - } - - var length = 0 - - // Must be a string and non-falsy - if (!protocolId || - (typeof protocolId !== "string" && !Buffer.isBuffer(protocolId))) { - throw new Error('Invalid protocol id') - } else { - length += protocolId.length + 2 - } - - // Must be a 1 byte number - if (!protocolVersion || - 'number' !== typeof protocolVersion || - protocolVersion > 255 || - protocolVersion < 0) { - - throw new Error('Invalid protocol version') - } else { - length += 1 - } - - // ClientId might be omitted in 3.1.1, but only if cleanSession is set to 1 - if ((typeof clientId === "string" || Buffer.isBuffer(clientId)) && - (clientId || protocolVersion == 4) && - (clientId || clean)) { - - length += clientId.length + 2 - } else { - - if(protocolVersion < 4) { - - throw new Error('clientId must be supplied before 3.1.1'); - } - - if(clean == 0) { - - throw new Error('clientId must be given if cleanSession set to 0'); - } - } - - // Must be a two byte number - if ('number' !== typeof keepalive || - keepalive < 0 || - keepalive > 65535) { - throw new Error('Invalid keepalive') - } else { - length += 2 - } - - // Connect flags - length += 1 - - // If will exists... - if (will) { - // It must be an object - if ('object' !== typeof will) { - throw new Error('Invalid will') - } - // It must have topic typeof string - if (!will.topic || 'string' !== typeof will.topic) { - throw new Error('Invalid will topic') - } else { - length += Buffer.byteLength(will.topic) + 2 - } - - // Payload - if (will.payload && will.payload) { - if (will.payload.length >= 0) { - if ('string' === typeof will.payload) { - length += Buffer.byteLength(will.payload) + 2 - } else { - length += will.payload.length + 2 - } - } else { - throw new Error('Invalid will payload') - } - } else { - length += 2 - } - } - - // Username - if (username) { - if (username.length) { - length += Buffer.byteLength(username) + 2 - } else { - throw new Error('Invalid username') - } - } - - // Password - if (password) { - if (password.length) { - length += byteLength(password) + 2 - } else { - throw new Error('Invalid password') - } - } - - var buffer = new Buffer(1 + calcLengthLength(length) + length) - , pos = 0 - - // Generate header - buffer.writeUInt8(protocol.codes['connect'] << protocol.CMD_SHIFT, pos++, true) - - // Generate length - pos += writeLength(buffer, pos, length) - - // Generate protocol ID - pos += writeStringOrBuffer(buffer, pos, protocolId) - buffer.writeUInt8(protocolVersion, pos++, true) - - // Connect flags - var flags = 0 - flags |= username ? protocol.USERNAME_MASK : 0 - flags |= password ? protocol.PASSWORD_MASK : 0 - flags |= (will && will.retain) ? protocol.WILL_RETAIN_MASK : 0 - flags |= (will && will.qos) ? - will.qos << protocol.WILL_QOS_SHIFT : 0 - flags |= will ? protocol.WILL_FLAG_MASK : 0 - flags |= clean ? protocol.CLEAN_SESSION_MASK : 0 - - buffer.writeUInt8(flags, pos++, true) - - // Keepalive - pos += writeNumber(buffer, pos, keepalive) - - // Client ID - pos += writeStringOrBuffer(buffer, pos, clientId) - - // Will - if (will) { - pos += writeString(buffer, pos, will.topic) - pos += writeStringOrBuffer(buffer, pos, will.payload) - } - - // Username and password - if (username) - pos += writeStringOrBuffer(buffer, pos, username) - - if (password) - pos += writeStringOrBuffer(buffer, pos, password) - - return buffer -} - -function connack(opts) { - var opts = opts || {} - , rc = opts.returnCode; - - // Check return code - if ('number' !== typeof rc) - throw new Error('Invalid return code'); - - var buffer = new Buffer(4) - , pos = 0; - - buffer.writeUInt8(protocol.codes['connack'] << protocol.CMD_SHIFT, pos++, true); - pos += writeLength(buffer, pos, 2); - buffer.writeUInt8(opts.sessionPresent && protocol.SESSIONPRESENT_MASK || 0, pos++, true); - buffer.writeUInt8(rc, pos++, true); - - return buffer; -} - -function publish(opts) { - var opts = opts || {} - , dup = opts.dup ? protocol.DUP_MASK : 0 - , qos = opts.qos - , retain = opts.retain ? protocol.RETAIN_MASK : 0 - , topic = opts.topic - , payload = opts.payload || empty - , id = opts.messageId; - - var length = 0; - - // Topic must be a non-empty string or Buffer - if (typeof topic === "string") - length += Buffer.byteLength(topic) + 2; - else if (Buffer.isBuffer(topic)) - length += topic.length + 2; - else - throw new Error('Invalid topic'); - - // get the payload length - if (!Buffer.isBuffer(payload)) { - length += Buffer.byteLength(payload); - } else { - length += payload.length; - } - - // Message id must a number if qos > 0 - if (qos && 'number' !== typeof id) { - throw new Error('Invalid message id') - } else if (qos) { - length += 2; - } - - var buffer = new Buffer(1 + calcLengthLength(length) + length) - , pos = 0; - - // Header - buffer.writeUInt8( - protocol.codes['publish'] << protocol.CMD_SHIFT | - dup | - qos << protocol.QOS_SHIFT | - retain, pos++, true); - - // Remaining length - pos += writeLength(buffer, pos, length); - - // Topic - pos += writeStringOrBuffer(buffer, pos, topic); - - // Message ID - if (qos > 0) { - pos += writeNumber(buffer, pos, id); - } - - // Payload - if (!Buffer.isBuffer(payload)) { - writeStringNoPos(buffer, pos, payload); - } else { - writeBuffer(buffer, pos, payload); - } - - return buffer; -} - -/* Puback, pubrec, pubrel and pubcomp */ -function confirmation(opts) { - var opts = opts || {} - , type = opts.cmd || 'puback' - , id = opts.messageId - , dup = (opts.dup && type === 'pubrel') ? protocol.DUP_MASK : 0 - , qos = 0 - - if (type === 'pubrel') - qos = 1 - - // Check message ID - if ('number' !== typeof id) - throw new Error('Invalid message id'); - - var buffer = new Buffer(4) - , pos = 0; - - // Header - buffer[pos++] = - protocol.codes[type] << protocol.CMD_SHIFT | - dup | - qos << protocol.QOS_SHIFT; - - // Length - pos += writeLength(buffer, pos, 2); - - // Message ID - pos += writeNumber(buffer, pos, id); - - return buffer; -} - -function subscribe(opts) { - var opts = opts || {} - , dup = opts.dup ? protocol.DUP_MASK : 0 - , qos = opts.qos || 0 - , id = opts.messageId - , subs = opts.subscriptions; - - var length = 0; - - // Check mid - if ('number' !== typeof id) { - throw new Error('Invalid message id'); - } else { - length += 2; - } - // Check subscriptions - if ('object' === typeof subs && subs.length) { - for (var i = 0; i < subs.length; i += 1) { - var topic = subs[i].topic - , qos = subs[i].qos; - - if ('string' !== typeof topic) { - throw new Error('Invalid subscriptions - invalid topic'); - } - if ('number' !== typeof qos) { - throw new Error('Invalid subscriptions - invalid qos'); - } - - length += Buffer.byteLength(topic) + 2 + 1; - } - } else { - throw new Error('Invalid subscriptions'); - } - - var buffer = new Buffer(1 + calcLengthLength(length) + length) - , pos = 0; - - // Generate header - buffer.writeUInt8( - protocol.codes['subscribe'] << protocol.CMD_SHIFT | - dup | - 1 << protocol.QOS_SHIFT, pos++, true); - - // Generate length - pos += writeLength(buffer, pos, length); - - // Generate message ID - pos += writeNumber(buffer, pos, id); - - // Generate subs - for (var i = 0; i < subs.length; i++) { - var sub = subs[i] - , topic = sub.topic - , qos = sub.qos; - - // Write topic string - pos += writeString(buffer, pos, topic); - // Write qos - buffer.writeUInt8(qos, pos++, true); - } - - return buffer; -} - -function suback(opts) { - var opts = opts || {} - , id = opts.messageId - , granted = opts.granted; - - var length = 0; - - // Check message id - if ('number' !== typeof id) { - throw new Error('Invalid message id'); - } else { - length += 2; - } - // Check granted qos vector - if ('object' === typeof granted && granted.length) { - for (var i = 0; i < granted.length; i += 1) { - if ('number' !== typeof granted[i]) { - throw new Error('Invalid qos vector'); - } - length += 1; - } - } else { - throw new Error('Invalid qos vector'); - } - - var buffer = new Buffer(1 + calcLengthLength(length) + length) - , pos = 0; - - // Header - buffer.writeUInt8(protocol.codes['suback'] << protocol.CMD_SHIFT, pos++, true); - - // Length - pos += writeLength(buffer, pos, length); - - // Message ID - pos += writeNumber(buffer, pos, id); - - // Subscriptions - for (var i = 0; i < granted.length; i++) { - buffer.writeUInt8(granted[i], pos++, true); - } - - return buffer; -} - -function unsubscribe(opts) { - var opts = opts || {} - , id = opts.messageId - , dup = opts.dup ? protocol.DUP_MASK : 0 - , unsubs = opts.unsubscriptions; - - var length = 0; - - // Check message id - if ('number' !== typeof id) { - throw new Error('Invalid message id'); - } else { - length += 2; - } - // Check unsubs - if ('object' === typeof unsubs && unsubs.length) { - for (var i = 0; i < unsubs.length; i += 1) { - if ('string' !== typeof unsubs[i]) { - throw new Error('Invalid unsubscriptions'); - } - length += Buffer.byteLength(unsubs[i]) + 2; - } - } else { - throw new Error('Invalid unsubscriptions'); - } - - var buffer = new Buffer(1 + calcLengthLength(length) + length) - , pos = 0; - - // Header - buffer[pos++] = - protocol.codes['unsubscribe'] << protocol.CMD_SHIFT | - dup | - 1 << protocol.QOS_SHIFT; - - // Length - pos += writeLength(buffer, pos, length); - - // Message ID - pos += writeNumber(buffer, pos, id); - - // Unsubs - for (var i = 0; i < unsubs.length; i++) { - pos += writeString(buffer, pos, unsubs[i]); - } - - return buffer; -} - -function emptyPacket(opts) { - var buf = new Buffer(2); - buf[0] = protocol.codes[opts.cmd] << 4; - buf[1] = 0; - return buf; -} - -/** - * calcLengthLength - calculate the length of the remaining - * length field - * - * @api private - */ -function calcLengthLength(length) { - if (length >= 0 && length < 128) { - return 1 - } else if (length >= 128 && length < 16384) { - return 2 - } else if (length >= 16384 && length < 2097152) { - return 3 - } else if (length >= 2097152 && length < 268435456) { - return 4 - } else { - return 0 - } -} - -/** - * writeLength - write an MQTT style length field to the buffer - * - * @param buffer - destination - * @param pos - offset - * @param length - length (>0) - * @returns number of bytes written - * - * @api private - */ - -function writeLength(buffer, pos, length) { - var digit = 0 - , origPos = pos - - do { - digit = length % 128 | 0 - length = length / 128 | 0 - if (length > 0) { - digit = digit | 0x80 - } - buffer.writeUInt8(digit, pos++, true) - } while (length > 0) - - return pos - origPos -} - -/** - * writeString - write a utf8 string to the buffer - * - * @param buffer - destination - * @param pos - offset - * @param string - string to write - * @return number of bytes written - * - * @api private - */ - -function writeString(buffer, pos, string) { - var strlen = Buffer.byteLength(string) - writeNumber(buffer, pos, strlen) - - writeStringNoPos(buffer, pos + 2, string) - - return strlen + 2 -} - -function writeStringNoPos(buffer, pos, string) { - buffer.write(string, pos) -} - -/** - * write_buffer - write buffer to buffer - * - * @param buffer - dest buffer - * @param pos - offset - * @param src - source buffer - * @return number of bytes written - * - * @api private - */ - -function writeBuffer(buffer, pos, src) { - src.copy(buffer, pos) - return src.length -} - -/** - * writeNumber - write a two byte number to the buffer - * - * @param buffer - destination - * @param pos - offset - * @param number - number to write - * @return number of bytes written - * - * @api private - */ -function writeNumber(buffer, pos, number) { - buffer.writeUInt8(number >> 8, pos, true) - buffer.writeUInt8(number & 0x00FF, pos + 1, true) - - return 2 -} - -/** - * writeStringOrBuffer - write a String or Buffer with the its length prefix - * - * @param buffer - destination - * @param pos - offset - * @param toWrite - String or Buffer - * @return number of bytes written - */ -function writeStringOrBuffer(buffer, pos, toWrite) { - var written = 0 - - if (toWrite && typeof toWrite === 'string') { - written += writeString(buffer, pos + written, toWrite) - } else if (toWrite) { - written += writeNumber(buffer, pos + written, toWrite.length) - written += writeBuffer(buffer, pos + written, toWrite) - } else { - written += writeNumber(buffer, pos + written, 0) - } - - return written -} - -function byteLength(bufOrString) { - if (Buffer.isBuffer(bufOrString)) { - return bufOrString.length - } else { - return Buffer.byteLength(bufOrString) - } + var stream = bl() + writeToStream(packet, stream) + return stream.slice() } module.exports = generate diff --git a/mqtt.js b/mqtt.js index c2dbcfe..ccafe0b 100644 --- a/mqtt.js +++ b/mqtt.js @@ -3,3 +3,4 @@ exports.parser = require('./parser') exports.generate = require('./generate') +exports.writeToStream = require('./writeToStream') diff --git a/numbers.js b/numbers.js new file mode 100644 index 0000000..a05fe38 --- /dev/null +++ b/numbers.js @@ -0,0 +1,14 @@ +'use strict' + +var max = 65536 +var cache = {} +var buffer + +for (var i = 0; i < max; i++) { + buffer = new Buffer(2) + buffer.writeUInt8(i >> 8, 0, true) + buffer.writeUInt8(i & 0x00FF, 0 + 1, true) + cache[i] = buffer; +} + +module.exports = cache diff --git a/package.json b/package.json index cb2633e..638ae8c 100644 --- a/package.json +++ b/package.json @@ -29,6 +29,7 @@ }, "homepage": "https://github.com/mqttjs/mqtt-packet", "devDependencies": { + "dev-null": "^0.1.1", "faucet": "0.0.1", "pre-commit": "^1.1.1", "tape": "^4.2.0" diff --git a/writeToStream.js b/writeToStream.js new file mode 100644 index 0000000..b5d60d9 --- /dev/null +++ b/writeToStream.js @@ -0,0 +1,579 @@ + +'use strict'; + +var protocol = require('./constants') + , empty = new Buffer(0) + , zeroBuf = new Buffer([0]) + , numCache = require('./numbers') + +function generate(packet, stream) { + + switch (packet.cmd) { + case 'connect': + return connect(packet, stream); + case 'connack': + return connack(packet, stream); + case 'publish': + return publish(packet, stream); + case 'puback': + case 'pubrec': + case 'pubrel': + case 'pubcomp': + case 'unsuback': + return confirmation(packet, stream); + case 'subscribe': + return subscribe(packet, stream); + case 'suback': + return suback(packet, stream); + case 'unsubscribe': + return unsubscribe(packet, stream); + case 'pingreq': + case 'pingresp': + case 'disconnect': + return emptyPacket(packet, stream); + default: + stream.emit('error', new Error('unknown command')); + return false; + } +} + +function connect(opts, stream) { + var opts = opts || {} + , protocolId = opts.protocolId || 'MQTT' + , protocolVersion = opts.protocolVersion || 4 + , will = opts.will + , clean = opts.clean + , keepalive = opts.keepalive || 0 + , clientId = opts.clientId || "" + , username = opts.username + , password = opts.password + + if (clean === undefined) { + clean = true + } + + var length = 0 + + // Must be a string and non-falsy + if (!protocolId || + (typeof protocolId !== "string" && !Buffer.isBuffer(protocolId))) { + stream.emit('error', new Error('Invalid protocol id')) + } else { + length += protocolId.length + 2 + } + + // Must be 3 or 4 + if (protocolVersion !== 3 && protocolVersion !== 4) { + stream.emit('error', new Error('Invalid protocol version')) + } else { + length += 1 + } + + // ClientId might be omitted in 3.1.1, but only if cleanSession is set to 1 + if ((typeof clientId === "string" || Buffer.isBuffer(clientId)) && + (clientId || protocolVersion == 4) && + (clientId || clean)) { + + length += clientId.length + 2 + } else { + + if (protocolVersion < 4) { + stream.emit('error', new Error('clientId must be supplied before 3.1.1')); + } + + if (clean == 0) { + stream.emit('error', new Error('clientId must be given if cleanSession set to 0')); + } + } + + // Must be a two byte number + if ('number' !== typeof keepalive || + keepalive < 0 || + keepalive > 65535) { + stream.emit('error', new Error('Invalid keepalive')) + } else { + length += 2 + } + + // Connect flags + length += 1 + + // If will exists... + if (will) { + // It must be an object + if ('object' !== typeof will) { + stream.emit('error', new Error('Invalid will')) + } + // It must have topic typeof string + if (!will.topic || 'string' !== typeof will.topic) { + stream.emit('error', new Error('Invalid will topic')) + } else { + length += Buffer.byteLength(will.topic) + 2 + } + + // Payload + if (will.payload && will.payload) { + if (will.payload.length >= 0) { + if ('string' === typeof will.payload) { + length += Buffer.byteLength(will.payload) + 2 + } else { + length += will.payload.length + 2 + } + } else { + stream.emit('error', new Error('Invalid will payload')) + } + } else { + length += 2 + } + } + + // Username + if (username) { + if (username.length) { + length += Buffer.byteLength(username) + 2 + } else { + stream.emit('error', new Error('Invalid username')) + } + } + + // Password + if (password) { + if (password.length) { + length += byteLength(password) + 2 + } else { + stream.emit('error', new Error('Invalid password')) + } + } + + // Generate header + stream.write(protocol.CONNECT_HEADER); + + // Generate length + writeLength(stream, length) + + // Generate protocol ID + writeStringOrBuffer(stream, protocolId) + stream.write( + protocolVersion === 4 ? + protocol.VERSION4 : protocol.VERSION3 + ); + + // Connect flags + var flags = 0 + flags |= username ? protocol.USERNAME_MASK : 0 + flags |= password ? protocol.PASSWORD_MASK : 0 + flags |= (will && will.retain) ? protocol.WILL_RETAIN_MASK : 0 + flags |= (will && will.qos) ? + will.qos << protocol.WILL_QOS_SHIFT : 0 + flags |= will ? protocol.WILL_FLAG_MASK : 0 + flags |= clean ? protocol.CLEAN_SESSION_MASK : 0 + + stream.write(new Buffer([flags])); + + // Keepalive + writeNumber(stream, keepalive); + + // Client ID + writeStringOrBuffer(stream, clientId); + + // Will + if (will) { + writeString(stream, will.topic); + writeStringOrBuffer(stream, will.payload); + } + + // Username and password + if (username) + writeStringOrBuffer(stream, username); + + if (password) + writeStringOrBuffer(stream, password); + + // this is a small packet that + // happens only once on a stream + // we assume the stream is always free + // to receive more data after this + return true +} + +function connack(opts, stream) { + var opts = opts || {} + , rc = opts.returnCode; + + // Check return code + if ('number' !== typeof rc) + stream.emit('error', new Error('Invalid return code')); + + stream.write(protocol.CONNACK_HEADER); + writeLength(stream, 2); + stream.write(opts.sessionPresent ? + protocol.SESSIONPRESENT_HEADER : zeroBuf); + + return stream.write(new Buffer([rc])); +} + +function publish(opts, stream) { + var opts = opts || {} + , qos = opts.qos || 0 + , retain = opts.retain ? protocol.RETAIN_MASK : 0 + , topic = opts.topic + , payload = opts.payload || empty + , id = opts.messageId; + + var length = 0; + + // Topic must be a non-empty string or Buffer + if (typeof topic === "string") + length += Buffer.byteLength(topic) + 2; + else if (Buffer.isBuffer(topic)) + length += topic.length + 2; + else + stream.emit('error', new Error('Invalid topic')); + + // get the payload length + if (!Buffer.isBuffer(payload)) { + length += Buffer.byteLength(payload); + } else { + length += payload.length; + } + + // Message id must a number if qos > 0 + if (qos && 'number' !== typeof id) { + stream.emit('error', new Error('Invalid message id')) + } else if (qos) { + length += 2; + } + + // Header + stream.write(protocol.PUBLISH_HEADER[qos][opts.dup ? 1 : 0][retain ? 1 : 0]); + + // Remaining length + writeLength(stream, length); + + // Topic + writeNumber(stream, byteLength(topic)); + stream.write(topic); + + // Message ID + if (qos > 0) { + writeNumber(stream, id); + } + + // Payload + return stream.write(payload) +} + +/* Puback, pubrec, pubrel and pubcomp */ +function confirmation(opts, stream) { + var opts = opts || {} + , type = opts.cmd || 'puback' + , id = opts.messageId + , dup = (opts.dup && type === 'pubrel') ? protocol.DUP_MASK : 0 + , qos = 0 + + if (type === 'pubrel') + qos = 1 + + // Check message ID + if ('number' !== typeof id) + stream.emit('error', new Error('Invalid message id')); + + // Header + stream.write(protocol.ACKS[type][qos][dup][0]) + + // Length + writeLength(stream, 2); + + // Message ID + return writeNumber(stream, id); +} + +function subscribe(opts, stream) { + var opts = opts || {} + , dup = opts.dup ? protocol.DUP_MASK : 0 + , qos = opts.qos || 0 + , id = opts.messageId + , subs = opts.subscriptions; + + var length = 0; + + // Check mid + if ('number' !== typeof id) { + stream.emit('error', new Error('Invalid message id')); + } else { + length += 2; + } + // Check subscriptions + if ('object' === typeof subs && subs.length) { + for (var i = 0; i < subs.length; i += 1) { + var topic = subs[i].topic + , qos = subs[i].qos; + + if ('string' !== typeof topic) { + stream.emit('error', new Error('Invalid subscriptions - invalid topic')); + } + if ('number' !== typeof qos) { + stream.emit('error', new Error('Invalid subscriptions - invalid qos')); + } + + length += Buffer.byteLength(topic) + 2 + 1; + } + } else { + stream.emit('error', new Error('Invalid subscriptions')); + } + + // Generate header + stream.write(protocol.SUBSCRIBE_HEADER[1][dup ? 1 : 0][0]); + + // Generate length + writeLength(stream, length); + + // Generate message ID + writeNumber(stream, id); + + var result = true + + // Generate subs + for (var i = 0; i < subs.length; i++) { + var sub = subs[i] + , topic = sub.topic + , qos = sub.qos; + + // Write topic string + writeString(stream, topic); + // Write qos + result = stream.write(protocol.QOS[qos]); + } + + return result; +} + +function suback(opts, stream) { + var opts = opts || {} + , id = opts.messageId + , granted = opts.granted; + + var length = 0; + + // Check message id + if ('number' !== typeof id) { + stream.emit('error', new Error('Invalid message id')); + } else { + length += 2; + } + // Check granted qos vector + if ('object' === typeof granted && granted.length) { + for (var i = 0; i < granted.length; i += 1) { + if ('number' !== typeof granted[i]) { + stream.emit('error', new Error('Invalid qos vector')); + } + length += 1; + } + } else { + stream.emit('error', new Error('Invalid qos vector')); + } + + // header + stream.write(protocol.SUBACK_HEADER); + + // Length + writeLength(stream, length); + + // Message ID + writeNumber(stream, id); + + return stream.write(new Buffer(granted)); +} + +function unsubscribe(opts, stream) { + var opts = opts || {} + , id = opts.messageId + , dup = opts.dup ? protocol.DUP_MASK : 0 + , unsubs = opts.unsubscriptions; + + var length = 0; + + // Check message id + if ('number' !== typeof id) { + stream.emit('error', new Error('Invalid message id')); + } else { + length += 2; + } + // Check unsubs + if ('object' === typeof unsubs && unsubs.length) { + for (var i = 0; i < unsubs.length; i += 1) { + if ('string' !== typeof unsubs[i]) { + stream.emit('error', new Error('Invalid unsubscriptions')); + } + length += Buffer.byteLength(unsubs[i]) + 2; + } + } else { + stream.emit('error', new Error('Invalid unsubscriptions')); + } + + // Header + stream.write(protocol.UNSUBSCRIBE_HEADER[1][dup ? 1 : 0][0]); + + // Length + writeLength(stream, length); + + // Message ID + writeNumber(stream, id); + + // Unsubs + var result = true + for (var i = 0; i < unsubs.length; i++) { + result = writeString(stream, unsubs[i]); + } + + return result; +} + +function emptyPacket(opts, stream) { + return stream.write(protocol.EMPTY[opts.cmd]); +} + +/** + * calcLengthLength - calculate the length of the remaining + * length field + * + * @api private + */ +function calcLengthLength(length) { + if (length >= 0 && length < 128) { + return 1 + } else if (length >= 128 && length < 16384) { + return 2 + } else if (length >= 16384 && length < 2097152) { + return 3 + } else if (length >= 2097152 && length < 268435456) { + return 4 + } else { + return 0 + } +} + +function genBufLength(length) { + var digit = 0 + , pos = 0 + , buffer = new Buffer(calcLengthLength(length)) + + do { + digit = length % 128 | 0 + length = length / 128 | 0 + if (length > 0) { + digit = digit | 0x80 + } + buffer.writeUInt8(digit, pos++, true) + } while (length > 0) + + return buffer +} + +/** + * writeLength - write an MQTT style length field to the buffer + * + * @param buffer - destination + * @param pos - offset + * @param length - length (>0) + * @returns number of bytes written + * + * @api private + */ + +var lengthCache = {} +function writeLength(stream, length) { + var buffer = lengthCache[length] + + if (!buffer) { + buffer = genBufLength(length) + if (length < 16384) { + lengthCache[length] = buffer + } + } + + stream.write(buffer) +} + +/** + * writeString - write a utf8 string to the buffer + * + * @param buffer - destination + * @param pos - offset + * @param string - string to write + * @return number of bytes written + * + * @api private + */ + +function writeString(stream, string) { + var strlen = Buffer.byteLength(string) + writeNumber(stream, strlen) + + stream.write(string, 'utf8') +} + +function writeStringNoPos(buffer, pos, string) { + buffer.write(string, pos) +} + +/** + * write_buffer - write buffer to buffer + * + * @param buffer - dest buffer + * @param pos - offset + * @param src - source buffer + * @return number of bytes written + * + * @api private + */ + +function writeBuffer(buffer, pos, src) { + src.copy(buffer, pos) + return src.length +} + +/** + * writeNumber - write a two byte number to the buffer + * + * @param buffer - destination + * @param pos - offset + * @param number - number to write + * @return number of bytes written + * + * @api private + */ +function writeNumber(stream, number) { + return stream.write(numCache[number]) +} + +/** + * writeStringOrBuffer - write a String or Buffer with the its length prefix + * + * @param buffer - destination + * @param pos - offset + * @param toWrite - String or Buffer + * @return number of bytes written + */ +function writeStringOrBuffer(stream, toWrite) { + if (toWrite && typeof toWrite === 'string') { + writeString(stream, toWrite) + } else if (toWrite) { + writeNumber(stream, toWrite.length) + stream.write(toWrite) + } else { + writeNumber(stream, 0) + } +} + +function byteLength(bufOrString) { + if (!bufOrString) { + return 0 + } else if (Buffer.isBuffer(bufOrString)) { + return bufOrString.length + } else { + return Buffer.byteLength(bufOrString) + } +} + +module.exports = generate From 3d5c79d823d8d6ae5a569933b2ec615b09423f97 Mon Sep 17 00:00:00 2001 From: Matteo Collina Date: Sun, 18 Oct 2015 11:11:26 +0200 Subject: [PATCH 2/4] Faster generate. --- generate.js | 51 ++++++++++++++++++++++++++++++++++++++++++++++++--- 1 file changed, 48 insertions(+), 3 deletions(-) diff --git a/generate.js b/generate.js index 6168af8..8a34867 100644 --- a/generate.js +++ b/generate.js @@ -1,12 +1,57 @@ 'use strict'; var writeToStream = require('./writeToStream') -var bl = require('bl') + , EE = require('events').EventEmitter + , inherits = require('inherits') function generate(packet) { - var stream = bl() + var stream = new Accumulator() writeToStream(packet, stream) - return stream.slice() + return stream.concat() } +function Accumulator() { + this._array = new Array(20) + this._i = 0 +} + +inherits(Accumulator, EE) + +Accumulator.prototype.write = function (chunk) { + this._array[this._i++] = chunk + return true +}; + +Accumulator.prototype.concat = function () { + var length = 0 + , lengths = new Array(this._array.length) + , list = this._array + , pos = 0 + , i + , result; + + for (i = 0; i < list.length && list[i]; i++) { + if (typeof list[i] !== 'string') { + lengths[i] = list[i].length; + } else { + lengths[i] = Buffer.byteLength(list[i]); + } + length += lengths[i]; + } + + result = new Buffer(length); + + for (i = 0; i < list.length && list[i]; i++) { + if (typeof list[i] !== 'string') { + list[i].copy(result, pos); + pos += lengths[i]; + } else { + result.write(list[i], pos); + pos += lengths[i]; + } + } + + return result; +}; + module.exports = generate From cbd649df8a440e60c07d003c955b9e3668849ee4 Mon Sep 17 00:00:00 2001 From: Matteo Collina Date: Sun, 18 Oct 2015 11:24:55 +0200 Subject: [PATCH 3/4] Added README. --- README.md | 31 +++++++++++++++++++++++++++++++ 1 file changed, 31 insertions(+) diff --git a/README.md b/README.md index ae48dd9..6fc4063 100644 --- a/README.md +++ b/README.md @@ -87,6 +87,7 @@ API --- * mqtt#generate() + * mqtt#writeToStream() * mqtt#parser() @@ -96,6 +97,36 @@ Generates a `Buffer` containing an MQTT packet. The object must be one of the ones specified by the [packets](#packets) section. Throws an `Error` if a packet cannot be generated. + +### mqtt.writeToStream(object, stream) + +Writes the mqtt packet defined by `object` to the given stream. +The object must be one of the ones specified by the [packets](#packets) +section. Emits an `Error` on the stream if a packet cannot be generated. + +This function is best used with `cork()` in the Streams3 API, as +follows: + +```js + +funciton send(packet, stream) { + stream.cork() + var res = mqtt.writeToStream({ + cmd: 'publish' + , topic: 'test' + , payload: buf + }, stream); + process.nextTick(uncork, stream); +} + +// this should be defined at the top +// level +function uncork (stream) { + stream.uncork() +} + +``` + ### mqtt.parser() From da497016562ebb4d0cc9e6ae706c6d6926764043 Mon Sep 17 00:00:00 2001 From: Matteo Collina Date: Tue, 20 Oct 2015 10:06:53 +0100 Subject: [PATCH 4/4] Included stream.cork() and stream.uncork() as part of writeToStream. --- README.md | 25 ++----------------------- benchmarks/writeToStream.js | 7 ------- writeToStream.js | 8 ++++++++ 3 files changed, 10 insertions(+), 30 deletions(-) diff --git a/README.md b/README.md index 6fc4063..1171a77 100644 --- a/README.md +++ b/README.md @@ -103,29 +103,8 @@ section. Throws an `Error` if a packet cannot be generated. Writes the mqtt packet defined by `object` to the given stream. The object must be one of the ones specified by the [packets](#packets) section. Emits an `Error` on the stream if a packet cannot be generated. - -This function is best used with `cork()` in the Streams3 API, as -follows: - -```js - -funciton send(packet, stream) { - stream.cork() - var res = mqtt.writeToStream({ - cmd: 'publish' - , topic: 'test' - , payload: buf - }, stream); - process.nextTick(uncork, stream); -} - -// this should be defined at the top -// level -function uncork (stream) { - stream.uncork() -} - -``` +On node >= 12, this function automatically calls `cork()` on your stream, +and then it calls `uncork()` on the next tick. ### mqtt.parser() diff --git a/benchmarks/writeToStream.js b/benchmarks/writeToStream.js index 4e34bca..d595299 100644 --- a/benchmarks/writeToStream.js +++ b/benchmarks/writeToStream.js @@ -35,7 +35,6 @@ function tickWait() { //var toSend = new Buffer(5) for (; i < max && res; i++) { - dest.cork() res = mqtt.writeToStream({ cmd: 'publish' , topic: 'test' @@ -43,8 +42,6 @@ function tickWait() { }, dest) //dest.write(toSend, 'buffer') //res = dest.write(buf, 'buffer') - - process.nextTick(uncork, dest); } if (i >= max) { @@ -52,7 +49,3 @@ function tickWait() { return; } } - -function uncork (stream) { - stream.uncork() -} diff --git a/writeToStream.js b/writeToStream.js index b5d60d9..a2c58c5 100644 --- a/writeToStream.js +++ b/writeToStream.js @@ -7,6 +7,10 @@ var protocol = require('./constants') , numCache = require('./numbers') function generate(packet, stream) { + if (stream.cork) { + stream.cork() + process.nextTick(uncork, stream) + } switch (packet.cmd) { case 'connect': @@ -37,6 +41,10 @@ function generate(packet, stream) { } } +function uncork(stream) { + stream.uncork(); +} + function connect(opts, stream) { var opts = opts || {} , protocolId = opts.protocolId || 'MQTT'