diff --git a/lib/permessage-deflate.js b/lib/permessage-deflate.js index b9c5ef499..7bb7c98d7 100644 --- a/lib/permessage-deflate.js +++ b/lib/permessage-deflate.js @@ -131,12 +131,18 @@ class PerMessageDeflate { } if (this._deflate) { - if (this._deflate[kCallback]) { - this._deflate[kCallback](); - } + const callback = this._deflate[kCallback]; this._deflate.close(); this._deflate = null; + + if (callback) { + callback( + new Error( + 'The deflate stream was closed while data was being processed' + ) + ); + } } } @@ -314,9 +320,7 @@ class PerMessageDeflate { zlibLimiter.add((done) => { this._compress(data, fin, (err, result) => { done(); - if (err || result) { - callback(err, result); - } + callback(err, result); }); }); } diff --git a/lib/sender.js b/lib/sender.js index a9bcef201..f23042453 100644 --- a/lib/sender.js +++ b/lib/sender.js @@ -306,6 +306,22 @@ class Sender { this._deflating = true; perMessageDeflate.compress(data, options.fin, (_, buf) => { + if (this._socket.destroyed) { + const err = new Error( + 'The socket was closed while data was being compressed' + ); + + if (typeof cb === 'function') cb(err); + + for (let i = 0; i < this._queue.length; i++) { + const callback = this._queue[i][4]; + + if (typeof callback === 'function') callback(err); + } + + return; + } + this._deflating = false; options.readOnly = false; this.sendFrame(Sender.frame(buf, options), cb); diff --git a/lib/websocket.js b/lib/websocket.js index c157cfd2c..ca72ad76d 100644 --- a/lib/websocket.js +++ b/lib/websocket.js @@ -179,9 +179,8 @@ class WebSocket extends EventEmitter { * @private */ emitClose() { - this.readyState = WebSocket.CLOSED; - if (!this._socket) { + this.readyState = WebSocket.CLOSED; this.emit('close', this._closeCode, this._closeMessage); return; } @@ -191,6 +190,7 @@ class WebSocket extends EventEmitter { } this._receiver.removeAllListeners(); + this.readyState = WebSocket.CLOSED; this.emit('close', this._closeCode, this._closeMessage); } diff --git a/test/permessage-deflate.test.js b/test/permessage-deflate.test.js index 11f8555af..09681d96f 100644 --- a/test/permessage-deflate.test.js +++ b/test/permessage-deflate.test.js @@ -615,15 +615,19 @@ describe('PerMessageDeflate', () => { }); }); - it("doesn't call the callback if the deflate stream is closed prematurely", (done) => { + it('calls the callback if the deflate stream is closed prematurely', (done) => { const perMessageDeflate = new PerMessageDeflate({ threshold: 0 }); const buf = Buffer.from('A'.repeat(50)); perMessageDeflate.accept([{}]); - perMessageDeflate.compress(buf, true, () => { - done(new Error('Unexpected callback invocation')); + perMessageDeflate.compress(buf, true, (err) => { + assert.ok(err instanceof Error); + assert.strictEqual( + err.message, + 'The deflate stream was closed while data was being processed' + ); + done(); }); - perMessageDeflate._deflate.on('close', done); process.nextTick(() => perMessageDeflate.cleanup()); }); diff --git a/test/websocket.test.js b/test/websocket.test.js index 229b78715..b9275035d 100644 --- a/test/websocket.test.js +++ b/test/websocket.test.js @@ -2341,6 +2341,52 @@ describe('WebSocket', () => { ws.on('message', (message) => ws.send(message, { compress: true })); }); }); + + it('calls the callback if the socket is closed prematurely', (done) => { + const wss = new WebSocket.Server( + { perMessageDeflate: true, port: 0 }, + () => { + const called = []; + const ws = new WebSocket(`ws://localhost:${wss.address().port}`, { + perMessageDeflate: { threshold: 0 } + }); + + ws.on('open', () => { + ws.send('foo'); + ws.send('bar', (err) => { + called.push(1); + + assert.strictEqual(ws.readyState, WebSocket.CLOSING); + assert.ok(err instanceof Error); + assert.strictEqual( + err.message, + 'The socket was closed while data was being compressed' + ); + }); + ws.send('baz'); + ws.send('qux', (err) => { + called.push(2); + + assert.strictEqual(ws.readyState, WebSocket.CLOSING); + assert.ok(err instanceof Error); + assert.strictEqual( + err.message, + 'The socket was closed while data was being compressed' + ); + }); + }); + + ws.on('close', () => { + assert.deepStrictEqual(called, [1, 2]); + wss.close(done); + }); + } + ); + + wss.on('connection', (ws) => { + ws._socket.end(); + }); + }); }); describe('#terminate', () => { @@ -2356,19 +2402,22 @@ describe('WebSocket', () => { }); ws.on('open', () => { - ws.send('hi', () => - done(new Error('Unexpected callback invocation')) - ); + ws.send('hi', (err) => { + assert.strictEqual(ws.readyState, WebSocket.CLOSING); + assert.ok(err instanceof Error); + assert.strictEqual( + err.message, + 'The socket was closed while data was being compressed' + ); + + ws.on('close', () => { + wss.close(done); + }); + }); ws.terminate(); }); } ); - - wss.on('connection', (ws) => { - ws.on('close', () => { - wss.close(done); - }); - }); }); it('can be used while data is being decompressed', (done) => {