From e605c868243633e3c18f9a8453fe983d8b26ebb8 Mon Sep 17 00:00:00 2001 From: Robert Nagy <ronagy@icloud.com> Date: Fri, 30 Jul 2021 14:18:38 +0200 Subject: [PATCH 1/3] Revert "stream: add readableDidRead" This reverts commit 83060510015c2ad05b156af54938d05e9b2df5ae. --- lib/_http_incoming.js | 1 - lib/_http_server.js | 2 +- lib/internal/streams/readable.js | 14 ----------- test/parallel/test-stream-readable-didRead.js | 24 ------------------- 4 files changed, 1 insertion(+), 40 deletions(-) delete mode 100644 test/parallel/test-stream-readable-didRead.js diff --git a/lib/_http_incoming.js b/lib/_http_incoming.js index bb76d6eaefc34d..31b7db6f6c7c99 100644 --- a/lib/_http_incoming.js +++ b/lib/_http_incoming.js @@ -87,7 +87,6 @@ function IncomingMessage(socket) { this.statusMessage = null; this.client = socket; - // TODO: Deprecate and remove. this._consuming = false; // Flag for when we decide that this message cannot possibly be // read by the user, so there's no point continuing to handle it. diff --git a/lib/_http_server.js b/lib/_http_server.js index 8d9321d28b9919..64cd44c066cf8a 100644 --- a/lib/_http_server.js +++ b/lib/_http_server.js @@ -802,7 +802,7 @@ function resOnFinish(req, res, socket, state, server) { // If the user never called req.read(), and didn't pipe() or // .resume() or .on('data'), then we call req._dump() so that the // bytes will be pulled off the wire. - if (!req.readableDidRead) + if (!req._consuming && !req._readableState.resumeScheduled) req._dump(); // Make sure the requestTimeout is cleared before finishing. diff --git a/lib/internal/streams/readable.js b/lib/internal/streams/readable.js index e7dd60a6d78c13..5b5d2e8471bb32 100644 --- a/lib/internal/streams/readable.js +++ b/lib/internal/streams/readable.js @@ -171,8 +171,6 @@ function ReadableState(options, stream, isDuplex) { // If true, a maybeReadMore has been scheduled. this.readingMore = false; - this.didRead = false; - this.decoder = null; this.encoding = null; if (options && options.encoding) { @@ -544,8 +542,6 @@ Readable.prototype.read = function(n) { if (ret !== null) this.emit('data', ret); - state.didRead = true; - return ret; }; @@ -849,9 +845,7 @@ function pipeOnDrain(src, dest) { if ((!state.awaitDrainWriters || state.awaitDrainWriters.size === 0) && EE.listenerCount(src, 'data')) { - // TODO(ronag): Call resume() instead? state.flowing = true; - state.didRead = true; flow(src); } }; @@ -999,7 +993,6 @@ Readable.prototype.resume = function() { function resume(stream, state) { if (!state.resumeScheduled) { state.resumeScheduled = true; - state.didRead = true; process.nextTick(resume_, stream, state); } } @@ -1184,13 +1177,6 @@ ObjectDefineProperties(Readable.prototype, { } }, - readableDidRead: { - enumerable: false, - get: function() { - return this._readableState.didRead; - } - }, - readableHighWaterMark: { enumerable: false, get: function() { diff --git a/test/parallel/test-stream-readable-didRead.js b/test/parallel/test-stream-readable-didRead.js deleted file mode 100644 index 18e2da97e88e94..00000000000000 --- a/test/parallel/test-stream-readable-didRead.js +++ /dev/null @@ -1,24 +0,0 @@ -'use strict'; -require('../common'); -const assert = require('assert'); -const Readable = require('stream').Readable; - -{ - const readable = new Readable({ - read: () => {} - }); - - assert.strictEqual(readable.readableDidRead, false); - readable.read(); - assert.strictEqual(readable.readableDidRead, true); -} - -{ - const readable = new Readable({ - read: () => {} - }); - - assert.strictEqual(readable.readableDidRead, false); - readable.resume(); - assert.strictEqual(readable.readableDidRead, true); -} From b8f5fb94219e69baba3962ba9f1d7c29d1046026 Mon Sep 17 00:00:00 2001 From: Robert Nagy <ronagy@icloud.com> Date: Fri, 30 Jul 2021 14:17:25 +0200 Subject: [PATCH 2/3] stream: add readableDidRead Adds did read accessor used to determine whether a readable has been read from. Refs: https://github.com/nodejs/undici/pull/907 --- doc/api/stream.md | 11 ++ lib/internal/streams/readable.js | 19 ++- test/parallel/test-stream-readable-didRead.js | 109 ++++++++++++++++++ 3 files changed, 138 insertions(+), 1 deletion(-) create mode 100644 test/parallel/test-stream-readable-didRead.js diff --git a/doc/api/stream.md b/doc/api/stream.md index d616bb78bccfd0..dd46e8fa761e62 100644 --- a/doc/api/stream.md +++ b/doc/api/stream.md @@ -1222,6 +1222,17 @@ added: v11.4.0 Is `true` if it is safe to call [`readable.read()`][stream-read], which means the stream has not been destroyed or emitted `'error'` or `'end'`. +##### `readable.readableDidRead` +<!-- YAML +added: REPLACEME +--> + +* {boolean} + +Allows determining if the stream has been or is about to be read. +Returns true if `'data'`, `'end'`, `'error'` or `'close'` has been +emitted. + ##### `readable.readableEncoding` <!-- YAML added: v12.7.0 diff --git a/lib/internal/streams/readable.js b/lib/internal/streams/readable.js index 5b5d2e8471bb32..a6a1913961bb41 100644 --- a/lib/internal/streams/readable.js +++ b/lib/internal/streams/readable.js @@ -171,6 +171,8 @@ function ReadableState(options, stream, isDuplex) { // If true, a maybeReadMore has been scheduled. this.readingMore = false; + this.dataEmitted = false; + this.decoder = null; this.encoding = null; if (options && options.encoding) { @@ -314,6 +316,7 @@ function addChunk(stream, state, chunk, addToFront) { } else { state.awaitDrainWriters = null; } + state.dataEmitted = true; stream.emit('data', chunk); } else { // Update the buffer info. @@ -539,8 +542,10 @@ Readable.prototype.read = function(n) { endReadable(this); } - if (ret !== null) + if (ret !== null) { + state.dataEmitted = true; this.emit('data', ret); + } return ret; }; @@ -1177,6 +1182,18 @@ ObjectDefineProperties(Readable.prototype, { } }, + readableDidRead: { + enumerable: false, + get: function() { + return ( + this._readableState.dataEmitted || + this._readableState.endEmitted || + this._readableState.errorEmitted || + this._readableState.closeEmitted + ); + } + }, + readableHighWaterMark: { enumerable: false, get: function() { diff --git a/test/parallel/test-stream-readable-didRead.js b/test/parallel/test-stream-readable-didRead.js new file mode 100644 index 00000000000000..6b018bc3348fde --- /dev/null +++ b/test/parallel/test-stream-readable-didRead.js @@ -0,0 +1,109 @@ +'use strict'; +const common = require('../common'); +const assert = require('assert'); +const Readable = require('stream').Readable; + +function noop() {} + +function check(readable, data, fn) { + assert.strictEqual(readable.readableDidRead, false); + assert.strictEqual(readable.readableUsed, false); + if (data === -1) { + readable.on('error', common.mustCall()); + readable.on('data', common.mustNotCall()); + assert.strictEqual(readable.readableUsed, true); + readable.on('end', common.mustNotCall()); + } else { + readable.on('error', common.mustNotCall()); + if (data === -2) { + readable.on('end', common.mustNotCall()); + } else { + readable.on('end', common.mustCall()); + } + if (data > 0) { + readable.on('data', common.mustCallAtLeast(data)); + assert.strictEqual(readable.readableUsed, true); + } else { + readable.on('data', common.mustNotCall()); + assert.strictEqual(readable.readableUsed, true); + } + } + readable.on('close', common.mustCall()); + fn(); + setImmediate(() => { + assert.strictEqual(readable.readableDidRead, true); + assert.strictEqual(readable.readableUsed, true); + }); +} + +{ + const readable = new Readable({ + read() { + this.push(null); + } + }); + check(readable, 0, () => { + readable.read(); + }); +} + +{ + const readable = new Readable({ + read() { + this.push(null); + } + }); + check(readable, 0, () => { + readable.resume(); + }); +} + +{ + const readable = new Readable({ + read() { + this.push(null); + } + }); + check(readable, -2, () => { + readable.destroy(); + }); +} + +{ + const readable = new Readable({ + read() { + this.push(null); + } + }); + + check(readable, -1, () => { + readable.destroy(new Error()); + }); +} + +{ + const readable = new Readable({ + read() { + this.push('data'); + this.push(null); + } + }); + + check(readable, 1, () => { + readable.on('data', noop); + }); +} + +{ + const readable = new Readable({ + read() { + this.push('data'); + this.push(null); + } + }); + + check(readable, 1, () => { + readable.on('data', noop); + readable.off('data', noop); + }); +} From 5b7d80d68b86296d1e6d0030a4a5d537979d8411 Mon Sep 17 00:00:00 2001 From: Robert Nagy <ronagy@icloud.com> Date: Fri, 30 Jul 2021 18:21:12 +0200 Subject: [PATCH 3/3] fixup: rm readableUsed --- test/parallel/test-stream-readable-didRead.js | 5 ----- 1 file changed, 5 deletions(-) diff --git a/test/parallel/test-stream-readable-didRead.js b/test/parallel/test-stream-readable-didRead.js index 6b018bc3348fde..7508dd76899013 100644 --- a/test/parallel/test-stream-readable-didRead.js +++ b/test/parallel/test-stream-readable-didRead.js @@ -7,11 +7,9 @@ function noop() {} function check(readable, data, fn) { assert.strictEqual(readable.readableDidRead, false); - assert.strictEqual(readable.readableUsed, false); if (data === -1) { readable.on('error', common.mustCall()); readable.on('data', common.mustNotCall()); - assert.strictEqual(readable.readableUsed, true); readable.on('end', common.mustNotCall()); } else { readable.on('error', common.mustNotCall()); @@ -22,17 +20,14 @@ function check(readable, data, fn) { } if (data > 0) { readable.on('data', common.mustCallAtLeast(data)); - assert.strictEqual(readable.readableUsed, true); } else { readable.on('data', common.mustNotCall()); - assert.strictEqual(readable.readableUsed, true); } } readable.on('close', common.mustCall()); fn(); setImmediate(() => { assert.strictEqual(readable.readableDidRead, true); - assert.strictEqual(readable.readableUsed, true); }); }