diff --git a/doc/ws.md b/doc/ws.md index fc14e7e97..20facffbb 100644 --- a/doc/ws.md +++ b/doc/ws.md @@ -31,15 +31,18 @@ - [websocket.bufferedAmount](#websocketbufferedamount) - [websocket.close([code[, reason]])](#websocketclosecode-reason) - [websocket.extensions](#websocketextensions) + - [websocket.isPaused](#websocketispaused) - [websocket.onclose](#websocketonclose) - [websocket.onerror](#websocketonerror) - [websocket.onmessage](#websocketonmessage) - [websocket.onopen](#websocketonopen) + - [websocket.pause()](#websocketpause) - [websocket.ping([data[, mask]][, callback])](#websocketpingdata-mask-callback) - [websocket.pong([data[, mask]][, callback])](#websocketpongdata-mask-callback) - [websocket.protocol](#websocketprotocol) - [websocket.readyState](#websocketreadystate) - [websocket.removeEventListener(type, listener)](#websocketremoveeventlistenertype-listener) + - [websocket.resume()](#websocketresume) - [websocket.send(data[, options][, callback])](#websocketsenddata-options-callback) - [websocket.terminate()](#websocketterminate) - [websocket.url](#websocketurl) @@ -409,6 +412,12 @@ following ways: Initiate a closing handshake. +### websocket.isPaused + +- {Boolean} + +Indicates whether the websocket is paused. + ### websocket.extensions - {Object} @@ -443,6 +452,12 @@ listener receives a `MessageEvent` named "message". An event listener to be called when the connection is established. The listener receives an `OpenEvent` named "open". +### websocket.pause() + +Pause the websocket causing it to stop emitting events. Some events can still be +emitted after this is called, until all buffered data is consumed. This method +is a noop if the ready state is `CONNECTING` or `CLOSED`. + ### websocket.ping([data[, mask]][, callback]) - `data` {Array|Number|Object|String|ArrayBuffer|Buffer|DataView|TypedArray} The @@ -473,6 +488,11 @@ Send a pong. This method throws an error if the ready state is `CONNECTING`. The subprotocol selected by the server. +### websocket.resume() + +Make a paused socket resume emitting events. This method is a noop if the ready +state is `CONNECTING` or `CLOSED`. + ### websocket.readyState - {Number} diff --git a/lib/stream.js b/lib/stream.js index 9622a5ac6..230734b79 100644 --- a/lib/stream.js +++ b/lib/stream.js @@ -47,23 +47,8 @@ function duplexOnError(err) { * @public */ function createWebSocketStream(ws, options) { - let resumeOnReceiverDrain = true; let terminateOnDestroy = true; - function receiverOnDrain() { - if (resumeOnReceiverDrain) ws._socket.resume(); - } - - if (ws.readyState === ws.CONNECTING) { - ws.once('open', function open() { - ws._receiver.removeAllListeners('drain'); - ws._receiver.on('drain', receiverOnDrain); - }); - } else { - ws._receiver.removeAllListeners('drain'); - ws._receiver.on('drain', receiverOnDrain); - } - const duplex = new Duplex({ ...options, autoDestroy: false, @@ -76,10 +61,7 @@ function createWebSocketStream(ws, options) { const data = !isBinary && duplex._readableState.objectMode ? msg.toString() : msg; - if (!duplex.push(data)) { - resumeOnReceiverDrain = false; - ws._socket.pause(); - } + if (!duplex.push(data)) ws.pause(); }); ws.once('error', function error(err) { @@ -155,13 +137,7 @@ function createWebSocketStream(ws, options) { }; duplex._read = function () { - if ( - (ws.readyState === ws.OPEN || ws.readyState === ws.CLOSING) && - !resumeOnReceiverDrain - ) { - resumeOnReceiverDrain = true; - if (!ws._receiver._writableState.needDrain) ws._socket.resume(); - } + if (ws.isPaused) ws.resume(); }; duplex._write = function (chunk, encoding, callback) { diff --git a/lib/websocket.js b/lib/websocket.js index 53aac9a57..130b3dc58 100644 --- a/lib/websocket.js +++ b/lib/websocket.js @@ -58,6 +58,7 @@ class WebSocket extends EventEmitter { this._closeMessage = EMPTY_BUFFER; this._closeTimer = null; this._extensions = {}; + this._paused = false; this._protocol = ''; this._readyState = WebSocket.CONNECTING; this._receiver = null; @@ -124,6 +125,13 @@ class WebSocket extends EventEmitter { return Object.keys(this._extensions).join(); } + /** + * @type {Boolean} + */ + get isPaused() { + return this._paused; + } + /** * @type {Function} */ @@ -312,6 +320,23 @@ class WebSocket extends EventEmitter { ); } + /** + * Pause the socket. + * + * @public + */ + pause() { + if ( + this.readyState === WebSocket.CONNECTING || + this.readyState === WebSocket.CLOSED + ) { + return; + } + + this._paused = true; + this._socket.pause(); + } + /** * Send a ping. * @@ -376,6 +401,23 @@ class WebSocket extends EventEmitter { this._sender.pong(data || EMPTY_BUFFER, mask, cb); } + /** + * Resume the socket. + * + * @public + */ + resume() { + if ( + this.readyState === WebSocket.CONNECTING || + this.readyState === WebSocket.CLOSED + ) { + return; + } + + this._paused = false; + if (!this._receiver._writableState.needDrain) this._socket.resume(); + } + /** * Send a data message. * @@ -518,6 +560,7 @@ Object.defineProperty(WebSocket.prototype, 'CLOSED', { 'binaryType', 'bufferedAmount', 'extensions', + 'isPaused', 'protocol', 'readyState', 'url' @@ -1001,7 +1044,9 @@ function receiverOnConclude(code, reason) { * @private */ function receiverOnDrain() { - this[kWebSocket]._socket.resume(); + const websocket = this[kWebSocket]; + + if (!websocket.isPaused) websocket._socket.resume(); } /** diff --git a/test/websocket.test.js b/test/websocket.test.js index 4703debef..0d48887de 100644 --- a/test/websocket.test.js +++ b/test/websocket.test.js @@ -359,6 +359,39 @@ describe('WebSocket', () => { }); }); + describe('`isPaused`', () => { + it('is enumerable and configurable', () => { + const descriptor = Object.getOwnPropertyDescriptor( + WebSocket.prototype, + 'isPaused' + ); + + assert.strictEqual(descriptor.configurable, true); + assert.strictEqual(descriptor.enumerable, true); + assert.ok(descriptor.get !== undefined); + assert.ok(descriptor.set === undefined); + }); + + it('indicates whether the websocket is paused', (done) => { + const wss = new WebSocket.Server({ port: 0 }, () => { + const ws = new WebSocket(`ws://localhost:${wss.address().port}`); + + ws.on('open', () => { + ws.pause(); + assert.ok(ws.isPaused); + + ws.resume(); + assert.ok(!ws.isPaused); + + ws.close(); + wss.close(done); + }); + + assert.ok(!ws.isPaused); + }); + }); + }); + describe('`protocol`', () => { it('is enumerable and configurable', () => { const descriptor = Object.getOwnPropertyDescriptor( @@ -1109,6 +1142,51 @@ describe('WebSocket', () => { }); }); + describe('#pause', () => { + it('does nothing if `readyState` is `CONNECTING` or `CLOSED`', (done) => { + const wss = new WebSocket.Server({ port: 0 }, () => { + const ws = new WebSocket(`ws://localhost:${wss.address().port}`); + + assert.strictEqual(ws.readyState, WebSocket.CONNECTING); + assert.ok(!ws.isPaused); + + ws.pause(); + assert.ok(!ws.isPaused); + + ws.on('open', () => { + ws.on('close', () => { + assert.strictEqual(ws.readyState, WebSocket.CLOSED); + + ws.pause(); + assert.ok(!ws.isPaused); + + wss.close(done); + }); + + ws.close(); + }); + }); + }); + + it('pauses the socket', (done) => { + const wss = new WebSocket.Server({ port: 0 }, () => { + const ws = new WebSocket(`ws://localhost:${wss.address().port}`); + }); + + wss.on('connection', (ws) => { + assert.ok(!ws.isPaused); + assert.ok(!ws._socket.isPaused()); + + ws.pause(); + assert.ok(ws.isPaused); + assert.ok(ws._socket.isPaused()); + + ws.terminate(); + wss.close(done); + }); + }); + }); + describe('#ping', () => { it('throws an error if `readyState` is `CONNECTING`', () => { const ws = new WebSocket('ws://localhost', { @@ -1447,6 +1525,58 @@ describe('WebSocket', () => { }); }); + describe('#resume', () => { + it('does nothing if `readyState` is `CONNECTING` or `CLOSED`', (done) => { + const wss = new WebSocket.Server({ port: 0 }, () => { + const ws = new WebSocket(`ws://localhost:${wss.address().port}`); + + assert.strictEqual(ws.readyState, WebSocket.CONNECTING); + assert.ok(!ws.isPaused); + + // Verify that no exception is thrown. + ws.resume(); + + ws.on('open', () => { + ws.pause(); + assert.ok(ws.isPaused); + + ws.on('close', () => { + assert.strictEqual(ws.readyState, WebSocket.CLOSED); + + ws.resume(); + assert.ok(ws.isPaused); + + wss.close(done); + }); + + ws.terminate(); + }); + }); + }); + + it('resumes the socket', (done) => { + const wss = new WebSocket.Server({ port: 0 }, () => { + const ws = new WebSocket(`ws://localhost:${wss.address().port}`); + }); + + wss.on('connection', (ws) => { + assert.ok(!ws.isPaused); + assert.ok(!ws._socket.isPaused()); + + ws.pause(); + assert.ok(ws.isPaused); + assert.ok(ws._socket.isPaused()); + + ws.resume(); + assert.ok(!ws.isPaused); + assert.ok(!ws._socket.isPaused()); + + ws.close(); + wss.close(done); + }); + }); + }); + describe('#send', () => { it('throws an error if `readyState` is `CONNECTING`', () => { const ws = new WebSocket('ws://localhost', {