From 7f4e1a75afbcee162cff0d44000b4fda82008d05 Mon Sep 17 00:00:00 2001 From: Luigi Pinca Date: Tue, 19 Sep 2023 15:33:39 +0200 Subject: [PATCH] [fix] Add missing rejection handler Use `queueMicrotask()` when available and add a rejection handler to the shim for it. --- lib/receiver.js | 41 ++++++++++++++++++++++++++++++++++++----- test/receiver.test.js | 30 ++++++++++++++++++++++++++++++ 2 files changed, 66 insertions(+), 5 deletions(-) diff --git a/lib/receiver.js b/lib/receiver.js index b5e9a8bca..1d425ead0 100644 --- a/lib/receiver.js +++ b/lib/receiver.js @@ -15,6 +15,12 @@ const { isValidStatusCode, isValidUTF8 } = require('./validation'); const FastBuffer = Buffer[Symbol.species]; const promise = Promise.resolve(); +// +// `queueMicrotask()` is not available in Node.js < 11. +// +const queueTask = + typeof queueMicrotask === 'function' ? queueMicrotask : queueMicrotaskShim; + const GET_INFO = 0; const GET_PAYLOAD_LENGTH_16 = 1; const GET_PAYLOAD_LENGTH_64 = 2; @@ -169,11 +175,7 @@ class Receiver extends Writable { // this._loop = false; - // - // `queueMicrotask()` is not available in Node.js < 11 and is no - // better anyway. - // - promise.then(() => { + queueTask(() => { this._state = GET_INFO; this.startLoop(cb); }); @@ -646,3 +648,32 @@ function error(ErrorCtor, message, prefix, statusCode, errorCode) { err[kStatusCode] = statusCode; return err; } + +/** + * A shim for `queueMicrotask()`. + * + * @param {Function} cb Callback + */ +function queueMicrotaskShim(cb) { + promise.then(cb).catch(throwErrorNextTick); +} + +/** + * Throws an error. + * + * @param {Error} err The error to throw + * @private + */ +function throwError(err) { + throw err; +} + +/** + * Throws an error in the next tick. + * + * @param {Error} err The error to throw + * @private + */ +function throwErrorNextTick(err) { + process.nextTick(throwError, err); +} diff --git a/test/receiver.test.js b/test/receiver.test.js index a4e1bb5ad..40e0565ad 100644 --- a/test/receiver.test.js +++ b/test/receiver.test.js @@ -2,6 +2,7 @@ const assert = require('assert'); const crypto = require('crypto'); +const EventEmitter = require('events'); const PerMessageDeflate = require('../lib/permessage-deflate'); const Receiver = require('../lib/receiver'); @@ -1175,4 +1176,33 @@ describe('Receiver', () => { receiver.write(Buffer.from('8A01318A01328A0133', 'hex')); }); + + it('does not swallow errors thrown from event handlers', (done) => { + const receiver = new Receiver(); + let count = 0; + + receiver.on('message', function () { + if (++count === 2) { + throw new Error('Oops'); + } + }); + + assert.strictEqual( + process.listenerCount('uncaughtException'), + EventEmitter.usingDomains ? 2 : 1 + ); + + const listener = process.listeners('uncaughtException').pop(); + + process.removeListener('uncaughtException', listener); + process.once('uncaughtException', (err) => { + assert.ok(err instanceof Error); + assert.strictEqual(err.message, 'Oops'); + + process.on('uncaughtException', listener); + done(); + }); + + receiver.write(Buffer.from('82008200', 'hex')); + }); });