From 869c9892cd5f1f574fae3181231e462e1d4ab740 Mon Sep 17 00:00:00 2001 From: Luigi Pinca Date: Sat, 28 Aug 2021 16:42:49 +0200 Subject: [PATCH] [fix] Resume the socket in the next tick Ensure that `socket.resume()` is called after `socket.pause()`. Fixes #1940 --- lib/websocket.js | 22 +++++++++++++++-- test/websocket.test.js | 56 +++++++++++++++++++++++++++++++++++++++++- 2 files changed, 75 insertions(+), 3 deletions(-) diff --git a/lib/websocket.js b/lib/websocket.js index f0b6eb531..aac87296f 100644 --- a/lib/websocket.js +++ b/lib/websocket.js @@ -1,3 +1,5 @@ +/* eslint no-unused-vars: ["error", { "varsIgnorePattern": "^Readable$" }] */ + 'use strict'; const EventEmitter = require('events'); @@ -6,6 +8,7 @@ const http = require('http'); const net = require('net'); const tls = require('tls'); const { randomBytes, createHash } = require('crypto'); +const { Readable } = require('stream'); const { URL } = require('url'); const PerMessageDeflate = require('./permessage-deflate'); @@ -954,7 +957,7 @@ function receiverOnConclude(code, reason) { const websocket = this[kWebSocket]; websocket._socket.removeListener('data', socketOnData); - websocket._socket.resume(); + process.nextTick(resume, websocket._socket); websocket._closeFrameReceived = true; websocket._closeMessage = reason; @@ -983,7 +986,12 @@ function receiverOnError(err) { const websocket = this[kWebSocket]; websocket._socket.removeListener('data', socketOnData); - websocket._socket.resume(); + + // + // On Node.js < 14.0.0 the `'error'` event is emitted synchronously. See + // https://github.com/websockets/ws/issues/1940. + // + process.nextTick(resume, websocket._socket); websocket.close(err[kStatusCode]); websocket.emit('error', err); @@ -1032,6 +1040,16 @@ function receiverOnPong(data) { this[kWebSocket].emit('pong', data); } +/** + * Resume a readable stream + * + * @param {Readable} stream The readable stream + * @private + */ +function resume(stream) { + stream.resume(); +} + /** * The listener of the `net.Socket` `'close'` event. * diff --git a/test/websocket.test.js b/test/websocket.test.js index 888a57715..4b85c7948 100644 --- a/test/websocket.test.js +++ b/test/websocket.test.js @@ -3308,7 +3308,7 @@ describe('WebSocket', () => { }); }); - describe('Connection close edge cases', () => { + describe('Connection close', () => { it('closes cleanly after simultaneous errors (1/2)', (done) => { let clientCloseEventEmitted = false; let serverClientCloseEventEmitted = false; @@ -3420,5 +3420,59 @@ describe('WebSocket', () => { }); }); }); + + it('resumes the socket when an error occurs', (done) => { + const maxPayload = 16 * 1024; + const wss = new WebSocket.Server({ maxPayload, port: 0 }, () => { + const ws = new WebSocket(`ws://localhost:${wss.address().port}`); + }); + + wss.on('connection', (ws) => { + const list = [ + ...Sender.frame(Buffer.alloc(maxPayload + 1), { + fin: true, + opcode: 0x02, + mask: true, + readOnly: false + }) + ]; + + ws.on('error', (err) => { + assert.ok(err instanceof RangeError); + assert.strictEqual(err.code, 'WS_ERR_UNSUPPORTED_MESSAGE_LENGTH'); + assert.strictEqual(err.message, 'Max payload size exceeded'); + + ws.on('close', (code, reason) => { + assert.strictEqual(code, 1006); + assert.strictEqual(reason, EMPTY_BUFFER); + wss.close(done); + }); + }); + + ws._socket.push(Buffer.concat(list)); + }); + }); + + it('resumes the socket when the close frame is received', (done) => { + const wss = new WebSocket.Server({ port: 0 }, () => { + const ws = new WebSocket(`ws://localhost:${wss.address().port}`); + }); + + wss.on('connection', (ws) => { + const opts = { fin: true, mask: true, readOnly: false }; + const list = [ + ...Sender.frame(Buffer.alloc(16 * 1024), { opcode: 0x02, ...opts }), + ...Sender.frame(EMPTY_BUFFER, { opcode: 0x08, ...opts }) + ]; + + ws.on('close', (code, reason) => { + assert.strictEqual(code, 1005); + assert.strictEqual(reason, EMPTY_BUFFER); + wss.close(done); + }); + + ws._socket.push(Buffer.concat(list)); + }); + }); }); });