Skip to content

Commit

Permalink
Merge pull request #29 from kirilknysh/feature/dynamic-num-cache
Browse files Browse the repository at this point in the history
Add option to enable/disable numbers cache
  • Loading branch information
mcollina authored Jun 21, 2017
2 parents b603b22 + 2a338d8 commit a0e3f1d
Show file tree
Hide file tree
Showing 5 changed files with 96 additions and 8 deletions.
3 changes: 3 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,9 @@ The object must be one of the ones specified by the [packets](#packets)
section. Emits an `Error` on the stream if a packet cannot be generated.
On node >= 0.12, this function automatically calls `cork()` on your stream,
and then it calls `uncork()` on the next tick.
By default cache for number buffers is enabled.
It creates a list of buffers for faster write. To disable cache set `mqtt.writeToStream.cacheNumbers = false`.
Should be set before any `writeToStream` calls.
<a name="parser">
Expand Down
11 changes: 10 additions & 1 deletion benchmarks/generate.js
Original file line number Diff line number Diff line change
@@ -1,10 +1,19 @@
'use strict'

var mqtt = require('../')
var max = 100000
var i
var buf = Buffer.from('test')

// initialize it
mqtt.generate({
cmd: 'publish',
topic: 'test',
payload: buf
})

var start = Date.now()
var time
var buf = Buffer.from('test')

for (i = 0; i < max; i++) {
mqtt.generate({
Expand Down
20 changes: 15 additions & 5 deletions numbers.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,23 @@
var Buffer = require('safe-buffer').Buffer
var max = 65536
var cache = {}
var buffer

for (var i = 0; i < max; i++) {
buffer = Buffer.allocUnsafe(2)
function generateBuffer (i) {
var buffer = Buffer.allocUnsafe(2)
buffer.writeUInt8(i >> 8, 0, true)
buffer.writeUInt8(i & 0x00FF, 0 + 1, true)
cache[i] = buffer

return buffer
}

function generateCache () {
for (var i = 0; i < max; i++) {
cache[i] = generateBuffer(i)
}
}

module.exports = cache
module.exports = {
cache: cache,
generateCache: generateCache,
generateNumber: generateBuffer
}
34 changes: 34 additions & 0 deletions test.js
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,40 @@ function testWriteToStreamError (expected, fixture) {
})
}

test('disabled numbers cache', function (t) {
var stream = WS()
var message = {
cmd: 'publish',
retain: false,
qos: 0,
dup: false,
length: 10,
topic: Buffer.from('test'),
payload: Buffer.from('test')
}
var expected = Buffer.from([
48, 10, // Header
0, 4, // Topic length
116, 101, 115, 116, // Topic (test)
116, 101, 115, 116 // Payload (test)
])
var written = Buffer.alloc(0)

stream.write = (chunk) => {
written = Buffer.concat([written, chunk])
}
mqtt.writeToStream.cacheNumbers = false

mqtt.writeToStream(message, stream)

t.deepEqual(written, expected, 'written buffer is expected')

mqtt.writeToStream.cacheNumbers = true

stream.end()
t.end()
})

testParseGenerate('minimal connect', {
cmd: 'connect',
retain: false,
Expand Down
36 changes: 34 additions & 2 deletions writeToStream.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,26 @@ var protocol = require('./constants')
var Buffer = require('safe-buffer').Buffer
var empty = Buffer.allocUnsafe(0)
var zeroBuf = Buffer.from([0])
var numCache = require('./numbers')
var numbers = require('./numbers')
var nextTick = require('process-nextick-args')

var numCache = numbers.cache
var generateNumber = numbers.generateNumber
var generateCache = numbers.generateCache
var writeNumber = writeNumberCached
var toGenerate = true

function generate (packet, stream) {
if (stream.cork) {
stream.cork()
nextTick(uncork, stream)
}

if (toGenerate) {
toGenerate = false
generateCache()
}

switch (packet.cmd) {
case 'connect':
return connect(packet, stream)
Expand Down Expand Up @@ -41,6 +52,24 @@ function generate (packet, stream) {
return false
}
}
/**
* Controls numbers cache.
* Set to "false" to allocate buffers on-the-flight instead of pre-generated cache
*/
Object.defineProperty(generate, 'cacheNumbers', {
get: function () {
return writeNumber === writeNumberCached
},
set: function (value) {
if (value) {
if (!numCache || Object.keys(numCache).length === 0) toGenerate = true
writeNumber = writeNumberCached
} else {
toGenerate = false
writeNumber = writeNumberGenerated
}
}
})

function uncork (stream) {
stream.uncork()
Expand Down Expand Up @@ -519,9 +548,12 @@ function writeString (stream, string) {
*
* @api private
*/
function writeNumber (stream, number) {
function writeNumberCached (stream, number) {
return stream.write(numCache[number])
}
function writeNumberGenerated (stream, number) {
return stream.write(generateNumber(number))
}

/**
* writeStringOrBuffer - write a String or Buffer with the its length prefix
Expand Down

0 comments on commit a0e3f1d

Please sign in to comment.