Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Write directly to the Stream #6

Merged
merged 4 commits into from
Oct 21, 2015
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 31 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ API
---

* <a href="#generate"><code>mqtt#<b>generate()</b></code></a>
* <a href="#writeToStream"><code>mqtt#<b>writeToStream()</b></code></a>
* <a href="#parser"><code>mqtt#<b>parser()</b></code></a>

<a name="generate">
Expand All @@ -96,6 +97,36 @@ Generates a `Buffer` containing an MQTT packet.
The object must be one of the ones specified by the [packets](#packets)
section. Throws an `Error` if a packet cannot be generated.

<a name="writeToStream">
### mqtt.writeToStream(object, stream)

Writes the mqtt packet defined by `object` to the given stream.
The object must be one of the ones specified by the [packets](#packets)
section. Emits an `Error` on the stream if a packet cannot be generated.

This function is best used with `cork()` in the Streams3 API, as
follows:

```js

funciton send(packet, stream) {
stream.cork()
var res = mqtt.writeToStream({
cmd: 'publish'
, topic: 'test'
, payload: buf
}, stream);
process.nextTick(uncork, stream);
}

// this should be defined at the top
// level
function uncork (stream) {
stream.uncork()
}

```

<a name="parser">
### mqtt.parser()

Expand Down
2 changes: 1 addition & 1 deletion benchmarks/generate.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@

var mqtt = require('../')
, max = 10000000
, max = 100000
, i
, start = Date.now()
, time
Expand Down
53 changes: 53 additions & 0 deletions benchmarks/generateTick.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@

var mqtt = require('../')
, max = 1000000
, i = 0
, start = Date.now()
, time
, buf = new Buffer(10)
, net = require('net')
, server = net.createServer(handle)
, dest

buf.fill('test')

function handle(sock) {
sock.resume();
}

server.listen(0, function() {
dest = net.connect(server.address());

dest.on('connect', tickWait);
dest.on('drain', tickWait);

dest.on('finish', function () {
time = Date.now() - start;
console.log('Total time', time);
console.log('Total packets', max);
console.log('Packet/s', max / time * 1000);
server.close();
});
});

function tickWait () {
//console.log('tickWait', i)
var res = true
//var toSend = new Buffer(5 + buf.length)

for (; i < max && res; i++) {
res = dest.write(mqtt.generate({
cmd: 'publish'
, topic: 'test'
, payload: buf
}))
//buf.copy(toSend, 5)
//res = dest.write(toSend, 'buffer')
//console.log(res)
}

if (i >= max) {
dest.end();
return;
}
}
58 changes: 58 additions & 0 deletions benchmarks/writeToStream.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@

var mqtt = require('../')
, max = 1000000
, i = 0
, start = Date.now()
, time
, buf = new Buffer(10)
, net = require('net')
, server = net.createServer(handle)
, dest

function handle(sock) {
sock.resume();
}

buf.fill('test')

server.listen(0, function() {
dest = net.connect(server.address());

dest.on('connect', tickWait);
dest.on('drain', tickWait);

dest.on('finish', function () {
time = Date.now() - start;
console.log('Total time', time);
console.log('Total packets', max);
console.log('Packet/s', max / time * 1000);
server.close();
});
});

function tickWait() {
var res = true
//var toSend = new Buffer(5)

for (; i < max && res; i++) {
dest.cork()
res = mqtt.writeToStream({
cmd: 'publish'
, topic: 'test'
, payload: buf
}, dest)
//dest.write(toSend, 'buffer')
//res = dest.write(buf, 'buffer')

process.nextTick(uncork, dest);
}

if (i >= max) {
dest.end();
return;
}
}

function uncork (stream) {
stream.uncork()
}
97 changes: 76 additions & 21 deletions constants.js
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
/* Protocol - protocol constants */
var protocol = module.exports;

/* Command code => mnemonic */
module.exports.types = {
protocol.types = {
0: 'reserved',
1: 'connect',
2: 'connack',
Expand All @@ -21,32 +22,86 @@ module.exports.types = {
};

/* Mnemonic => Command code */
module.exports.codes = {}
for(var k in module.exports.types) {
var v = module.exports.types[k];
module.exports.codes[v] = k;
protocol.codes = {}
for(var k in protocol.types) {
var v = protocol.types[k];
protocol.codes[v] = k;
}

/* Header */
module.exports.CMD_SHIFT = 4;
module.exports.CMD_MASK = 0xF0;
module.exports.DUP_MASK = 0x08;
module.exports.QOS_MASK = 0x03;
module.exports.QOS_SHIFT = 1;
module.exports.RETAIN_MASK = 0x01;
protocol.CMD_SHIFT = 4;
protocol.CMD_MASK = 0xF0;
protocol.DUP_MASK = 0x08;
protocol.QOS_MASK = 0x03;
protocol.QOS_SHIFT = 1;
protocol.RETAIN_MASK = 0x01;

/* Length */
module.exports.LENGTH_MASK = 0x7F;
module.exports.LENGTH_FIN_MASK = 0x80;
protocol.LENGTH_MASK = 0x7F;
protocol.LENGTH_FIN_MASK = 0x80;

/* Connack */
module.exports.SESSIONPRESENT_MASK = 0x01;
protocol.SESSIONPRESENT_MASK = 0x01;
protocol.SESSIONPRESENT_HEADER = new Buffer([protocol.SESSIONPRESENT_MASK]);
protocol.CONNACK_HEADER = new Buffer([protocol.codes['connack'] << protocol.CMD_SHIFT])

/* Connect */
module.exports.USERNAME_MASK = 0x80;
module.exports.PASSWORD_MASK = 0x40;
module.exports.WILL_RETAIN_MASK = 0x20;
module.exports.WILL_QOS_MASK = 0x18;
module.exports.WILL_QOS_SHIFT = 3;
module.exports.WILL_FLAG_MASK = 0x04;
module.exports.CLEAN_SESSION_MASK = 0x02;
protocol.USERNAME_MASK = 0x80;
protocol.PASSWORD_MASK = 0x40;
protocol.WILL_RETAIN_MASK = 0x20;
protocol.WILL_QOS_MASK = 0x18;
protocol.WILL_QOS_SHIFT = 3;
protocol.WILL_FLAG_MASK = 0x04;
protocol.CLEAN_SESSION_MASK = 0x02;
protocol.CONNECT_HEADER = new Buffer([protocol.codes['connect'] << protocol.CMD_SHIFT])

function genHeader (type) {
return [0, 1, 2].map(function(qos) {
return [0, 1].map(function(dup) {
return [0, 1].map(function(retain) {
var buf = new Buffer(1)
buf.writeUInt8(
protocol.codes[type] << protocol.CMD_SHIFT |
(dup ? protocol.DUP_MASK : 0 ) |
qos << protocol.QOS_SHIFT | retain, 0, true)
return buf
});
});
});
}

/* Publish */
protocol.PUBLISH_HEADER = genHeader('publish');

/* SUBSCRIBE */
protocol.SUBSCRIBE_HEADER = genHeader('subscribe');

/* UNSUBSCRIBE */
protocol.UNSUBSCRIBE_HEADER = genHeader('unsubscribe');

/* Confirmations */
protocol.ACKS = {
unsuback: genHeader('unsuback'),
puback: genHeader('puback'),
pubcomp: genHeader('pubcomp'),
pubrel: genHeader('pubrel'),
pubrec: genHeader('pubrec')
};

protocol.SUBACK_HEADER = new Buffer([protocol.codes['suback'] << protocol.CMD_SHIFT]);

/* Protocol versions */
protocol.VERSION3 = new Buffer([3])
protocol.VERSION4 = new Buffer([4])

/* QOS */
protocol.QOS = [0, 1, 2].map(function(qos) {
return new Buffer([qos])
})

/* empty packets */
protocol.EMPTY = {
pingreq: new Buffer([protocol.codes['pingreq'] << 4, 0]),
pingresp: new Buffer([protocol.codes['pingresp'] << 4, 0]),
disconnect: new Buffer([protocol.codes['disconnect'] << 4, 0])
};
Loading