Skip to content

Commit

Permalink
Merge pull request #565 from kanaka/bug/dynamic-rq-resize
Browse files Browse the repository at this point in the history
Resize Receive Queue to Fit Incoming Messages
  • Loading branch information
DirectXMan12 committed Jan 5, 2016
2 parents b2a813d + 40037b6 commit 7e4475f
Show file tree
Hide file tree
Showing 2 changed files with 61 additions and 17 deletions.
64 changes: 47 additions & 17 deletions include/websock.js
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,12 @@ function Websock() {

(function () {
"use strict";
// this has performance issues in some versions Chromium, and
// doesn't gain a tremendous amount of performance increase in Firefox
// at the moment. It may be valuable to turn it on in the future.
var ENABLE_COPYWITHIN = false;

var MAX_RQ_GROW_SIZE = 40 * 1024 * 1024; // 40 MiB

var typedArrayToString = (function () {
// This is only for PhantomJS, which doesn't like apply-ing
Expand Down Expand Up @@ -340,9 +346,49 @@ function Websock() {
return new Uint8Array(this._sQ.buffer, 0, this._sQlen);
},

_expand_compact_rQ: function (min_fit) {
var resizeNeeded = min_fit || this._rQlen - this._rQi > this._rQbufferSize / 2;
if (resizeNeeded) {
if (!min_fit) {
// just double the size if we need to do compaction
this._rQbufferSize *= 2;
} else {
// otherwise, make sure we satisy rQlen - rQi + min_fit < rQbufferSize / 8
this._rQbufferSize = (this._rQlen - this._rQi + min_fit) * 8;
}
}

// we don't want to grow unboundedly
if (this._rQbufferSize > MAX_RQ_GROW_SIZE) {
this._rQbufferSize = MAX_RQ_GROW_SIZE;
if (this._rQbufferSize - this._rQlen - this._rQi < min_fit) {
throw new Exception("Receive Queue buffer exceeded " + MAX_RQ_GROW_SIZE + " bytes, and the new message could not fit");
}
}

if (resizeNeeded) {
var old_rQbuffer = this._rQ.buffer;
this._rQmax = this._rQbufferSize / 8;
this._rQ = new Uint8Array(this._rQbufferSize);
this._rQ.set(new Uint8Array(old_rQbuffer, this._rQi));
} else {
if (ENABLE_COPYWITHIN) {
this._rQ.copyWithin(0, this._rQi);
} else {
this._rQ.set(new Uint8Array(this._rQ.buffer, this._rQi));
}
}

this._rQlen = this._rQlen - this._rQi;
this._rQi = 0;
},

_decode_message: function (data) {
// push arraybuffer values onto the end
var u8 = new Uint8Array(data);
if (u8.length > this._rQbufferSize - this._rQlen) {
this._expand_compact_rQ(u8.length);
}
this._rQ.set(u8, this._rQlen);
this._rQlen += u8.length;
},
Expand All @@ -357,23 +403,7 @@ function Websock() {
this._rQlen = 0;
this._rQi = 0;
} else if (this._rQlen > this._rQmax) {
if (this._rQlen - this._rQi > 0.5 * this._rQbufferSize) {
var old_rQbuffer = this._rQ.buffer;
this._rQbufferSize *= 2;
this._rQmax = this._rQbufferSize / 8;
this._rQ = new Uint8Array(this._rQbufferSize);
this._rQ.set(new Uint8Array(old_rQbuffer, this._rQi));
} else {
if (this._rQ.copyWithin) {
// Firefox only, ATM
this._rQ.copyWithin(0, this._rQi);
} else {
this._rQ.set(new Uint8Array(this._rQ.buffer, this._rQi));
}
}

this._rQlen = this._rQlen - this._rQi;
this._rQi = 0;
this._expand_compact_rQ();
}
} else {
Util.Debug("Ignoring empty message");
Expand Down
14 changes: 14 additions & 0 deletions tests/test.websock.js
Original file line number Diff line number Diff line change
Expand Up @@ -404,6 +404,20 @@ describe('Websock', function() {
expect(sock.get_rQi()).to.equal(0);
});

it('should automatically resize the receive queue if the incoming message is too large', function () {
sock._rQ = new Uint8Array(20);
sock._rQlen = 0;
sock.set_rQi(0);
sock._rQbufferSize = 20;
sock._rQmax = 2;
var msg = { data: new Uint8Array(30).buffer };
sock._mode = 'binary';
sock._recv_message(msg);
expect(sock._rQlen).to.equal(30);
expect(sock.get_rQi()).to.equal(0);
expect(sock._rQ.length).to.equal(240); // keep the invariant that rQbufferSize / 8 >= rQlen
});

it('should call the error event handler on an exception', function () {
sock._eventHandlers.error = sinon.spy();
sock._eventHandlers.message = sinon.stub().throws();
Expand Down

0 comments on commit 7e4475f

Please sign in to comment.