diff --git a/lib/websocket.js b/lib/websocket.js index b37d24fa1..6fc579747 100644 --- a/lib/websocket.js +++ b/lib/websocket.js @@ -1,3 +1,5 @@ +/* eslint no-unused-vars: ["error", { "varsIgnorePattern": "^Readable$" }] */ + 'use strict'; const EventEmitter = require('events'); @@ -6,6 +8,7 @@ const http = require('http'); const net = require('net'); const tls = require('tls'); const { randomBytes, createHash } = require('crypto'); +const { Readable } = require('stream'); const { URL } = require('url'); const PerMessageDeflate = require('./permessage-deflate'); @@ -916,7 +919,7 @@ function receiverOnConclude(code, reason) { const websocket = this[kWebSocket]; websocket._socket.removeListener('data', socketOnData); - websocket._socket.resume(); + process.nextTick(resume, websocket._socket); websocket._closeFrameReceived = true; websocket._closeMessage = reason; @@ -945,7 +948,12 @@ function receiverOnError(err) { const websocket = this[kWebSocket]; websocket._socket.removeListener('data', socketOnData); - websocket._socket.resume(); + + // + // On Node.js < 14.0.0 the `'error'` event is emitted synchronously. See + // https://github.com/websockets/ws/issues/1940. + // + process.nextTick(resume, websocket._socket); websocket.close(err[kStatusCode]); websocket.emit('error', err); @@ -993,6 +1001,16 @@ function receiverOnPong(data) { this[kWebSocket].emit('pong', data); } +/** + * Resume a readable stream + * + * @param {Readable} stream The readable stream + * @private + */ +function resume(stream) { + stream.resume(); +} + /** * The listener of the `net.Socket` `'close'` event. * diff --git a/test/websocket.test.js b/test/websocket.test.js index 994debe19..6c24feded 100644 --- a/test/websocket.test.js +++ b/test/websocket.test.js @@ -12,7 +12,7 @@ const { URL } = require('url'); const Sender = require('../lib/sender'); const WebSocket = require('..'); -const { GUID, NOOP } = require('../lib/constants'); +const { EMPTY_BUFFER, GUID, NOOP } = require('../lib/constants'); class CustomAgent extends http.Agent { addRequest() {} @@ -3053,7 +3053,7 @@ describe('WebSocket', () => { }); }); - describe('Connection close edge cases', () => { + describe('Connection close', () => { it('closes cleanly after simultaneous errors (1/2)', (done) => { let clientCloseEventEmitted = false; let serverClientCloseEventEmitted = false; @@ -3165,5 +3165,59 @@ describe('WebSocket', () => { }); }); }); + + it('resumes the socket when an error occurs', (done) => { + const maxPayload = 16 * 1024; + const wss = new WebSocket.Server({ maxPayload, port: 0 }, () => { + const ws = new WebSocket(`ws://localhost:${wss.address().port}`); + }); + + wss.on('connection', (ws) => { + const list = [ + ...Sender.frame(Buffer.alloc(maxPayload + 1), { + fin: true, + opcode: 0x02, + mask: true, + readOnly: false + }) + ]; + + ws.on('error', (err) => { + assert.ok(err instanceof RangeError); + assert.strictEqual(err.code, 'WS_ERR_UNSUPPORTED_MESSAGE_LENGTH'); + assert.strictEqual(err.message, 'Max payload size exceeded'); + + ws.on('close', (code, reason) => { + assert.strictEqual(code, 1006); + assert.strictEqual(reason, ''); + wss.close(done); + }); + }); + + ws._socket.push(Buffer.concat(list)); + }); + }); + + it('resumes the socket when the close frame is received', (done) => { + const wss = new WebSocket.Server({ port: 0 }, () => { + const ws = new WebSocket(`ws://localhost:${wss.address().port}`); + }); + + wss.on('connection', (ws) => { + const opts = { fin: true, mask: true, readOnly: false }; + const list = [ + ...Sender.frame(Buffer.alloc(16 * 1024), { opcode: 0x02, ...opts }), + ...Sender.frame(EMPTY_BUFFER, { opcode: 0x08, ...opts }) + ]; + + ws.on('close', (code, reason) => { + assert.strictEqual(code, 1005); + assert.strictEqual(reason, ''); + wss.close(done); + }); + + ws._socket.push(Buffer.concat(list)); + }); + }); }); });