From 0be2321d53b1b0c55e327972998df3c97eda03ac Mon Sep 17 00:00:00 2001 From: Benjamin Gruenbaum Date: Mon, 17 Jan 2022 20:21:51 +0200 Subject: [PATCH] stream: support some and every This continues on the iterator-helpers work by adding `.some` and `.every` to readable streams. Co-Authored-By: Robert Nagy PR-URL: https://github.com/nodejs/node/pull/41573 Reviewed-By: Robert Nagy Reviewed-By: Matteo Collina --- doc/api/stream.md | 100 +++++++++++++++++++++++- lib/internal/streams/operators.js | 41 ++++++++++ test/parallel/test-stream-some-every.js | 95 ++++++++++++++++++++++ 3 files changed, 235 insertions(+), 1 deletion(-) create mode 100644 test/parallel/test-stream-some-every.js diff --git a/doc/api/stream.md b/doc/api/stream.md index b94ffbd3567e15..fccd138f15c57d 100644 --- a/doc/api/stream.md +++ b/doc/api/stream.md @@ -1863,7 +1863,7 @@ import { Resolver } from 'dns/promises'; await Readable.from([1, 2, 3, 4]).toArray(); // [1, 2, 3, 4] // Make dns queries concurrently using .map and collect -// the results into an aray using toArray +// the results into an array using toArray const dnsResults = await Readable.from([ 'nodejs.org', 'openjsf.org', @@ -1874,6 +1874,104 @@ const dnsResults = await Readable.from([ }, { concurrency: 2 }).toArray(); ``` +### `readable.some(fn[, options])` + + + +> Stability: 1 - Experimental + +* `fn` {Function|AsyncFunction} a function to call on each item of the stream. + * `data` {any} a chunk of data from the stream. + * `options` {Object} + * `signal` {AbortSignal} aborted if the stream is destroyed allowing to + abort the `fn` call early. +* `options` {Object} + * `concurrency` {number} the maximum concurrent invocation of `fn` to call + on the stream at once. **Default:** `1`. + * `signal` {AbortSignal} allows destroying the stream if the signal is + aborted. +* Returns: {Promise} a promise evaluating to `true` if `fn` returned a truthy + value for at least one of the chunks. + +This method is similar to `Array.prototype.some` and calls `fn` on each chunk +in the stream until the awaited return value is `true` (or any truthy value). +Once an `fn` call on a chunk awaited return value is truthy, the stream is +destroyed and the promise is fulfilled with `true`. If none of the `fn` +calls on the chunks return a truthy value, the promise is fulfilled with +`false`. + +```mjs +import { Readable } from 'stream'; +import { stat } from 'fs/promises'; + +// With a synchronous predicate. +await Readable.from([1, 2, 3, 4]).some((x) => x > 2); // true +await Readable.from([1, 2, 3, 4]).some((x) => x < 0); // false + +// With an asynchronous predicate, making at most 2 file checks at a time. +const anyBigFile = await Readable.from([ + 'file1', + 'file2', + 'file3', +]).some(async (fileName) => { + const stats = await stat(fileName); + return stat.size > 1024 * 1024; +}, { concurrency: 2 }); +console.log(anyBigFile); // `true` if any file in the list is bigger than 1MB +console.log('done'); // Stream has finished +``` + +### `readable.every(fn[, options])` + + + +> Stability: 1 - Experimental + +* `fn` {Function|AsyncFunction} a function to call on each item of the stream. + * `data` {any} a chunk of data from the stream. + * `options` {Object} + * `signal` {AbortSignal} aborted if the stream is destroyed allowing to + abort the `fn` call early. +* `options` {Object} + * `concurrency` {number} the maximum concurrent invocation of `fn` to call + on the stream at once. **Default:** `1`. + * `signal` {AbortSignal} allows destroying the stream if the signal is + aborted. +* Returns: {Promise} a promise evaluating to `true` if `fn` returned a truthy + value for all of the chunks. + +This method is similar to `Array.prototype.every` and calls `fn` on each chunk +in the stream to check if all awaited return values are truthy value for `fn`. +Once an `fn` call on a chunk awaited return value is falsy, the stream is +destroyed and the promise is fulfilled with `false`. If all of the `fn` calls +on the chunks return a truthy value, the promise is fulfilled with `true`. + +```mjs +import { Readable } from 'stream'; +import { stat } from 'fs/promises'; + +// With a synchronous predicate. +await Readable.from([1, 2, 3, 4]).every((x) => x > 2); // false +await Readable.from([1, 2, 3, 4]).every((x) => x > 0); // true + +// With an asynchronous predicate, making at most 2 file checks at a time. +const allBigFiles = await Readable.from([ + 'file1', + 'file2', + 'file3', +]).every(async (fileName) => { + const stats = await stat(fileName); + return stat.size > 1024 * 1024; +}, { concurrency: 2 }); +// `true` if all files in the list are bigger than 1MiB +console.log(allBigFiles); +console.log('done'); // Stream has finished +``` + ### Duplex and transform streams #### Class: `stream.Duplex` diff --git a/lib/internal/streams/operators.js b/lib/internal/streams/operators.js index 2649966fd403ac..9c50865f3da3be 100644 --- a/lib/internal/streams/operators.js +++ b/lib/internal/streams/operators.js @@ -10,6 +10,7 @@ const { AbortError, } = require('internal/errors'); const { validateInteger } = require('internal/validators'); +const { kWeakHandler } = require('internal/event_target'); const { ArrayPrototypePush, @@ -47,6 +48,10 @@ async function * map(fn, options) { const signalOpt = { signal }; const abort = () => ac.abort(); + if (options?.signal?.aborted) { + abort(); + } + options?.signal?.addEventListener('abort', abort); let next; @@ -150,6 +155,40 @@ async function * map(fn, options) { } } +async function some(fn, options) { + // https://tc39.es/proposal-iterator-helpers/#sec-iteratorprototype.some + // Note that some does short circuit but also closes the iterator if it does + const ac = new AbortController(); + if (options?.signal) { + if (options.signal.aborted) { + ac.abort(); + } + options.signal.addEventListener('abort', () => ac.abort(), { + [kWeakHandler]: this, + once: true, + }); + } + const mapped = this.map(fn, { ...options, signal: ac.signal }); + for await (const result of mapped) { + if (result) { + ac.abort(); + return true; + } + } + return false; +} + +async function every(fn, options) { + if (typeof fn !== 'function') { + throw new ERR_INVALID_ARG_TYPE( + 'fn', ['Function', 'AsyncFunction'], fn); + } + // https://en.wikipedia.org/wiki/De_Morgan%27s_laws + return !(await some.call(this, async (x) => { + return !(await fn(x)); + }, options)); +} + async function forEach(fn, options) { if (typeof fn !== 'function') { throw new ERR_INVALID_ARG_TYPE( @@ -196,6 +235,8 @@ module.exports.streamReturningOperators = { }; module.exports.promiseReturningOperators = { + every, forEach, toArray, + some, }; diff --git a/test/parallel/test-stream-some-every.js b/test/parallel/test-stream-some-every.js new file mode 100644 index 00000000000000..c2be5ea955bcd2 --- /dev/null +++ b/test/parallel/test-stream-some-every.js @@ -0,0 +1,95 @@ +'use strict'; + +const common = require('../common'); +const { + Readable, +} = require('stream'); +const assert = require('assert'); + +function oneTo5() { + return Readable.from([1, 2, 3, 4, 5]); +} + +function oneTo5Async() { + return oneTo5().map(async (x) => { + await Promise.resolve(); + return x; + }); +} +{ + // Some and every work with a synchronous stream and predicate + (async () => { + assert.strictEqual(await oneTo5().some((x) => x > 3), true); + assert.strictEqual(await oneTo5().every((x) => x > 3), false); + assert.strictEqual(await oneTo5().some((x) => x > 6), false); + assert.strictEqual(await oneTo5().every((x) => x < 6), true); + assert.strictEqual(await Readable.from([]).some((x) => true), false); + assert.strictEqual(await Readable.from([]).every((x) => true), true); + })().then(common.mustCall()); +} + +{ + // Some and every work with an asynchronous stream and synchronous predicate + (async () => { + assert.strictEqual(await oneTo5Async().some((x) => x > 3), true); + assert.strictEqual(await oneTo5Async().every((x) => x > 3), false); + assert.strictEqual(await oneTo5Async().some((x) => x > 6), false); + assert.strictEqual(await oneTo5Async().every((x) => x < 6), true); + })().then(common.mustCall()); +} + +{ + // Some and every work on asynchronous streams with an asynchronous predicate + (async () => { + assert.strictEqual(await oneTo5().some(async (x) => x > 3), true); + assert.strictEqual(await oneTo5().every(async (x) => x > 3), false); + assert.strictEqual(await oneTo5().some(async (x) => x > 6), false); + assert.strictEqual(await oneTo5().every(async (x) => x < 6), true); + })().then(common.mustCall()); +} + +{ + // Some and every short circuit + (async () => { + await oneTo5().some(common.mustCall((x) => x > 2, 3)); + await oneTo5().every(common.mustCall((x) => x < 3, 3)); + // When short circuit isn't possible the whole stream is iterated + await oneTo5().some(common.mustCall((x) => x > 6, 5)); + // The stream is destroyed afterwards + const stream = oneTo5(); + await stream.some(common.mustCall((x) => x > 2, 3)); + assert.strictEqual(stream.destroyed, true); + })().then(common.mustCall()); +} + +{ + // Support for AbortSignal + const ac = new AbortController(); + assert.rejects(Readable.from([1, 2, 3]).some( + () => new Promise(() => {}), + { signal: ac.signal } + ), { + name: 'AbortError', + }).then(common.mustCall()); + ac.abort(); +} +{ + // Support for pre-aborted AbortSignal + assert.rejects(Readable.from([1, 2, 3]).some( + () => new Promise(() => {}), + { signal: AbortSignal.abort() } + ), { + name: 'AbortError', + }).then(common.mustCall()); +} +{ + // Error cases + assert.rejects(async () => { + await Readable.from([1]).every(1); + }, /ERR_INVALID_ARG_TYPE/).then(common.mustCall()); + assert.rejects(async () => { + await Readable.from([1]).every((x) => x, { + concurrency: 'Foo' + }); + }, /ERR_OUT_OF_RANGE/).then(common.mustCall()); +}