Skip to content

Commit

Permalink
Implicitly flush Websock if needed
Browse files Browse the repository at this point in the history
Callers shouldn't have to deal with the internal buffering limits of
Websock, so implicitly flush the buffer if more room is needed.
  • Loading branch information
CendioOssman committed Jun 4, 2023
1 parent f8b65f9 commit ccef89f
Show file tree
Hide file tree
Showing 3 changed files with 139 additions and 19 deletions.
19 changes: 2 additions & 17 deletions core/rfb.js
Original file line number Diff line number Diff line change
Expand Up @@ -3047,23 +3047,8 @@ RFB.messages = {
}

sock.sQpush32(length);

// We have to keep track of from where in the data we begin creating the
// buffer for the flush in the next iteration.
let dataOffset = 0;

let remaining = data.length;
while (remaining > 0) {

let flushSize = Math.min(remaining, (sock._sQbufferSize - sock._sQlen));

sock.sQpushBytes(data.subarray(dataOffset, dataOffset + flushSize));
sock.flush();

remaining -= flushSize;
dataOffset += flushSize;
}

sock.sQpushBytes(data);
sock.flush();
},

setDesktopSize(sock, width, height, id, flags) {
Expand Down
23 changes: 21 additions & 2 deletions core/websock.js
Original file line number Diff line number Diff line change
Expand Up @@ -176,15 +176,18 @@ export default class Websock {
// Send Queue

sQpush8(num) {
this._sQensureSpace(1);
this._sQ[this._sQlen++] = num;
}

sQpush16(num) {
this._sQensureSpace(2);
this._sQ[this._sQlen++] = (num >> 8) & 0xff;
this._sQ[this._sQlen++] = (num >> 0) & 0xff;
}

sQpush32(num) {
this._sQensureSpace(4);
this._sQ[this._sQlen++] = (num >> 24) & 0xff;
this._sQ[this._sQlen++] = (num >> 16) & 0xff;
this._sQ[this._sQlen++] = (num >> 8) & 0xff;
Expand All @@ -197,8 +200,18 @@ export default class Websock {
}

sQpushBytes(bytes) {
this._sQ.set(bytes, this._sQlen);
this._sQlen += bytes.length;
for (let offset = 0;offset < bytes.length;) {
this._sQensureSpace(1);

let chunkSize = this._sQbufferSize - this._sQlen;
if (chunkSize > bytes.length - offset) {
chunkSize = bytes.length - offset;
}

this._sQ.set(bytes.subarray(offset, chunkSize), this._sQlen);
this._sQlen += chunkSize;
offset += chunkSize;
}
}

flush() {
Expand All @@ -208,6 +221,12 @@ export default class Websock {
}
}

_sQensureSpace(bytes) {
if (this._sQbufferSize - this._sQlen < bytes) {
this.flush();
}
}

// Event Handlers
off(evt) {
this._eventHandlers[evt] = () => {};
Expand Down
116 changes: 116 additions & 0 deletions tests/test.websock.js
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,8 @@ describe('Websock', function () {
describe('Send queue methods', function () {
let sock;

const bufferSize = 10 * 1024;

beforeEach(function () {
let websock = new FakeWebSocket();
websock._open();
Expand All @@ -167,6 +169,18 @@ describe('Websock', function () {
sock.sQpush8(42);
expect(sock).to.have.sent(new Uint8Array([]));
});
it('should implicitly flush if the queue is full', function () {
for (let i = 0;i <= bufferSize;i++) {
sock.sQpush8(42);
}

let expected = [];
for (let i = 0;i < bufferSize;i++) {
expected.push(42);
}

expect(sock).to.have.sent(new Uint8Array(expected));
});
});

describe('sQpush16()', function () {
Expand All @@ -179,6 +193,19 @@ describe('Websock', function () {
sock.sQpush16(420);
expect(sock).to.have.sent(new Uint8Array([]));
});
it('should implicitly flush if the queue is full', function () {
for (let i = 0;i <= bufferSize/2;i++) {
sock.sQpush16(420);
}

let expected = [];
for (let i = 0;i < bufferSize/2;i++) {
expected.push(1);
expected.push(164);
}

expect(sock).to.have.sent(new Uint8Array(expected));
});
});

describe('sQpush32()', function () {
Expand All @@ -191,6 +218,21 @@ describe('Websock', function () {
sock.sQpush32(420420);
expect(sock).to.have.sent(new Uint8Array([]));
});
it('should implicitly flush if the queue is full', function () {
for (let i = 0;i <= bufferSize/4;i++) {
sock.sQpush32(420420);
}

let expected = [];
for (let i = 0;i < bufferSize/4;i++) {
expected.push(0);
expected.push(6);
expected.push(106);
expected.push(68);
}

expect(sock).to.have.sent(new Uint8Array(expected));
});
});

describe('sQpushString()', function () {
Expand All @@ -203,6 +245,41 @@ describe('Websock', function () {
sock.sQpushString('\x12\x34\x56\x78\x90');
expect(sock).to.have.sent(new Uint8Array([]));
});
it('should implicitly flush if the queue is full', function () {
for (let i = 0;i <= bufferSize/5;i++) {
sock.sQpushString('\x12\x34\x56\x78\x90');
}

let expected = [];
for (let i = 0;i < bufferSize/5;i++) {
expected.push(0x12);
expected.push(0x34);
expected.push(0x56);
expected.push(0x78);
expected.push(0x90);
}

expect(sock).to.have.sent(new Uint8Array(expected));
});
it('should implicitly split a large buffer', function () {
let str = '';
for (let i = 0;i <= bufferSize/5;i++) {
str += '\x12\x34\x56\x78\x90';
}

sock.sQpushString(str);

let expected = [];
for (let i = 0;i < bufferSize/5;i++) {
expected.push(0x12);
expected.push(0x34);
expected.push(0x56);
expected.push(0x78);
expected.push(0x90);
}

expect(sock).to.have.sent(new Uint8Array(expected));
});
});

describe('sQpushBytes()', function () {
Expand All @@ -215,6 +292,45 @@ describe('Websock', function () {
sock.sQpushBytes(new Uint8Array([0x12, 0x34, 0x56, 0x78, 0x90]));
expect(sock).to.have.sent(new Uint8Array([]));
});
it('should implicitly flush if the queue is full', function () {
for (let i = 0;i <= bufferSize/5;i++) {
sock.sQpushBytes(new Uint8Array([0x12, 0x34, 0x56, 0x78, 0x90]));
}

let expected = [];
for (let i = 0;i < bufferSize/5;i++) {
expected.push(0x12);
expected.push(0x34);
expected.push(0x56);
expected.push(0x78);
expected.push(0x90);
}

expect(sock).to.have.sent(new Uint8Array(expected));
});
it('should implicitly split a large buffer', function () {
let buffer = [];
for (let i = 0;i <= bufferSize/5;i++) {
buffer.push(0x12);
buffer.push(0x34);
buffer.push(0x56);
buffer.push(0x78);
buffer.push(0x90);
}

sock.sQpushBytes(new Uint8Array(buffer));

let expected = [];
for (let i = 0;i < bufferSize/5;i++) {
expected.push(0x12);
expected.push(0x34);
expected.push(0x56);
expected.push(0x78);
expected.push(0x90);
}

expect(sock).to.have.sent(new Uint8Array(expected));
});
});

describe('flush', function () {
Expand Down

0 comments on commit ccef89f

Please sign in to comment.