Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add option to enable/disable numbers cache #29

Merged
merged 7 commits into from
Jun 21, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,9 @@ The object must be one of the ones specified by the [packets](#packets)
section. Emits an `Error` on the stream if a packet cannot be generated.
On node >= 0.12, this function automatically calls `cork()` on your stream,
and then it calls `uncork()` on the next tick.
By default cache for number buffers is enabled.
It creates a list of buffers for faster write. To disable cache set `mqtt.writeToStream.cacheNumbers = false`.
Should be set before any `writeToStream` calls.

<a name="parser">

Expand Down
11 changes: 10 additions & 1 deletion benchmarks/generate.js
Original file line number Diff line number Diff line change
@@ -1,10 +1,19 @@
'use strict'

var mqtt = require('../')
var max = 100000
var i
var buf = Buffer.from('test')

// initialize it
mqtt.generate({
cmd: 'publish',
topic: 'test',
payload: buf
})

var start = Date.now()
var time
var buf = Buffer.from('test')

for (i = 0; i < max; i++) {
mqtt.generate({
Expand Down
20 changes: 15 additions & 5 deletions numbers.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,23 @@
var Buffer = require('safe-buffer').Buffer
var max = 65536
var cache = {}
var buffer

for (var i = 0; i < max; i++) {
buffer = Buffer.allocUnsafe(2)
function generateBuffer (i) {
var buffer = Buffer.allocUnsafe(2)
buffer.writeUInt8(i >> 8, 0, true)
buffer.writeUInt8(i & 0x00FF, 0 + 1, true)
cache[i] = buffer

return buffer
}

function generateCache () {
for (var i = 0; i < max; i++) {
cache[i] = generateBuffer(i)
}
}

module.exports = cache
module.exports = {
cache: cache,
generateCache: generateCache,
generateNumber: generateBuffer
}
34 changes: 34 additions & 0 deletions test.js
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,40 @@ function testWriteToStreamError (expected, fixture) {
})
}

test('disabled numbers cache', function (t) {
var stream = WS()
var message = {
cmd: 'publish',
retain: false,
qos: 0,
dup: false,
length: 10,
topic: Buffer.from('test'),
payload: Buffer.from('test')
}
var expected = Buffer.from([
48, 10, // Header
0, 4, // Topic length
116, 101, 115, 116, // Topic (test)
116, 101, 115, 116 // Payload (test)
])
var written = Buffer.alloc(0)

stream.write = (chunk) => {
written = Buffer.concat([written, chunk])
}
mqtt.writeToStream.cacheNumbers = false

mqtt.writeToStream(message, stream)

t.deepEqual(written, expected, 'written buffer is expected')

mqtt.writeToStream.cacheNumbers = true

stream.end()
t.end()
})

testParseGenerate('minimal connect', {
cmd: 'connect',
retain: false,
Expand Down
36 changes: 34 additions & 2 deletions writeToStream.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,26 @@ var protocol = require('./constants')
var Buffer = require('safe-buffer').Buffer
var empty = Buffer.allocUnsafe(0)
var zeroBuf = Buffer.from([0])
var numCache = require('./numbers')
var numbers = require('./numbers')
var nextTick = require('process-nextick-args')

var numCache = numbers.cache
var generateNumber = numbers.generateNumber
var generateCache = numbers.generateCache
var writeNumber = writeNumberCached
var toGenerate = true

function generate (packet, stream) {
if (stream.cork) {
stream.cork()
nextTick(uncork, stream)
}

if (toGenerate) {
toGenerate = false
generateCache()
}

switch (packet.cmd) {
case 'connect':
return connect(packet, stream)
Expand Down Expand Up @@ -41,6 +52,24 @@ function generate (packet, stream) {
return false
}
}
/**
* Controls numbers cache.
* Set to "false" to allocate buffers on-the-flight instead of pre-generated cache
*/
Object.defineProperty(generate, 'cacheNumbers', {
get: function () {
return writeNumber === writeNumberCached
},
set: function (value) {
if (value) {
if (!numCache || Object.keys(numCache).length === 0) toGenerate = true
writeNumber = writeNumberCached
} else {
toGenerate = false
writeNumber = writeNumberGenerated
}
}
})

function uncork (stream) {
stream.uncork()
Expand Down Expand Up @@ -519,9 +548,12 @@ function writeString (stream, string) {
*
* @api private
*/
function writeNumber (stream, number) {
function writeNumberCached (stream, number) {
return stream.write(numCache[number])
}
function writeNumberGenerated (stream, number) {
return stream.write(generateNumber(number))
}

/**
* writeStringOrBuffer - write a String or Buffer with the its length prefix
Expand Down