From 8272f2867f565400469e788b9b8325845bf88d7c Mon Sep 17 00:00:00 2001 From: Fedor Indutny <238531+indutny@users.noreply.github.com> Date: Tue, 6 Aug 2024 11:34:35 -0700 Subject: [PATCH] lib: fix unhandled errors in webstream adapters WebStream's Readable controller does not tolerate `.close()` being called after an `error`. However, when wrapping a Node's Readable stream it is possible that the sequence of events leads to `finished()`'s callback being invoked after such `error`. In order to handle this, in this change we call the `finished()` handler earlier when controller is canceled, and always handle this as an error case. Fix: https://github.com/nodejs/node/issues/54205 PR-URL: https://github.com/nodejs/node/pull/54206 Fixes: https://github.com/nodejs/node/issues/54205 Reviewed-By: Yagiz Nizipli Reviewed-By: Matteo Collina Reviewed-By: Mattias Buelens Reviewed-By: Benjamin Gruenbaum --- lib/internal/webstreams/adapters.js | 6 ++++++ .../test-stream-readable-from-web-termination.js | 15 +++++++++++++++ .../test-stream-readable-to-web-termination.js | 12 ++++++++++++ 3 files changed, 33 insertions(+) create mode 100644 test/parallel/test-stream-readable-from-web-termination.js create mode 100644 test/parallel/test-stream-readable-to-web-termination.js diff --git a/lib/internal/webstreams/adapters.js b/lib/internal/webstreams/adapters.js index 8993ced806d1ff..a57043921dc957 100644 --- a/lib/internal/webstreams/adapters.js +++ b/lib/internal/webstreams/adapters.js @@ -424,6 +424,7 @@ function newReadableStreamFromStreamReadable(streamReadable, options = kEmptyObj const strategy = evaluateStrategyOrFallback(options?.strategy); let controller; + let wasCanceled = false; function onData(chunk) { // Copy the Buffer to detach it from the pool. @@ -448,6 +449,10 @@ function newReadableStreamFromStreamReadable(streamReadable, options = kEmptyObj streamReadable.on('error', () => {}); if (error) return controller.error(error); + // Was already canceled + if (wasCanceled) { + return; + } controller.close(); }); @@ -459,6 +464,7 @@ function newReadableStreamFromStreamReadable(streamReadable, options = kEmptyObj pull() { streamReadable.resume(); }, cancel(reason) { + wasCanceled = true; destroy(streamReadable, reason); }, }, strategy); diff --git a/test/parallel/test-stream-readable-from-web-termination.js b/test/parallel/test-stream-readable-from-web-termination.js new file mode 100644 index 00000000000000..68ed7d69694089 --- /dev/null +++ b/test/parallel/test-stream-readable-from-web-termination.js @@ -0,0 +1,15 @@ +'use strict'; +require('../common'); +const { Readable } = require('stream'); + +{ + const r = Readable.from(['data']); + + const wrapper = Readable.fromWeb(Readable.toWeb(r)); + + wrapper.on('data', () => { + // Destroying wrapper while emitting data should not cause uncaught + // exceptions + wrapper.destroy(); + }); +} diff --git a/test/parallel/test-stream-readable-to-web-termination.js b/test/parallel/test-stream-readable-to-web-termination.js new file mode 100644 index 00000000000000..13fce9bc715e1e --- /dev/null +++ b/test/parallel/test-stream-readable-to-web-termination.js @@ -0,0 +1,12 @@ +'use strict'; +require('../common'); +const { Readable } = require('stream'); + +{ + const r = Readable.from([]); + // Cancelling reader while closing should not cause uncaught exceptions + r.on('close', () => reader.cancel()); + + const reader = Readable.toWeb(r).getReader(); + reader.read(); +}