Skip to content

Commit

Permalink
stream: support some and every
Browse files Browse the repository at this point in the history
This continues on the iterator-helpers work by adding `.some` and
`.every` to readable streams.

Co-Authored-By: Robert Nagy <ronagy@icloud.com>
PR-URL: nodejs#41573
Reviewed-By: Robert Nagy <ronagy@icloud.com>
Reviewed-By: Matteo Collina <matteo.collina@gmail.com>
  • Loading branch information
2 people authored and danielleadams committed Apr 21, 2022
1 parent 8312972 commit 0be2321
Show file tree
Hide file tree
Showing 3 changed files with 235 additions and 1 deletion.
100 changes: 99 additions & 1 deletion doc/api/stream.md
Original file line number Diff line number Diff line change
Expand Up @@ -1863,7 +1863,7 @@ import { Resolver } from 'dns/promises';
await Readable.from([1, 2, 3, 4]).toArray(); // [1, 2, 3, 4]

// Make dns queries concurrently using .map and collect
// the results into an aray using toArray
// the results into an array using toArray
const dnsResults = await Readable.from([
'nodejs.org',
'openjsf.org',
Expand All @@ -1874,6 +1874,104 @@ const dnsResults = await Readable.from([
}, { concurrency: 2 }).toArray();
```

### `readable.some(fn[, options])`

<!-- YAML
added: REPLACEME
-->

> Stability: 1 - Experimental
* `fn` {Function|AsyncFunction} a function to call on each item of the stream.
* `data` {any} a chunk of data from the stream.
* `options` {Object}
* `signal` {AbortSignal} aborted if the stream is destroyed allowing to
abort the `fn` call early.
* `options` {Object}
* `concurrency` {number} the maximum concurrent invocation of `fn` to call
on the stream at once. **Default:** `1`.
* `signal` {AbortSignal} allows destroying the stream if the signal is
aborted.
* Returns: {Promise} a promise evaluating to `true` if `fn` returned a truthy
value for at least one of the chunks.

This method is similar to `Array.prototype.some` and calls `fn` on each chunk
in the stream until the awaited return value is `true` (or any truthy value).
Once an `fn` call on a chunk awaited return value is truthy, the stream is
destroyed and the promise is fulfilled with `true`. If none of the `fn`
calls on the chunks return a truthy value, the promise is fulfilled with
`false`.

```mjs
import { Readable } from 'stream';
import { stat } from 'fs/promises';

// With a synchronous predicate.
await Readable.from([1, 2, 3, 4]).some((x) => x > 2); // true
await Readable.from([1, 2, 3, 4]).some((x) => x < 0); // false

// With an asynchronous predicate, making at most 2 file checks at a time.
const anyBigFile = await Readable.from([
'file1',
'file2',
'file3',
]).some(async (fileName) => {
const stats = await stat(fileName);
return stat.size > 1024 * 1024;
}, { concurrency: 2 });
console.log(anyBigFile); // `true` if any file in the list is bigger than 1MB
console.log('done'); // Stream has finished
```

### `readable.every(fn[, options])`

<!-- YAML
added: REPLACEME
-->

> Stability: 1 - Experimental
* `fn` {Function|AsyncFunction} a function to call on each item of the stream.
* `data` {any} a chunk of data from the stream.
* `options` {Object}
* `signal` {AbortSignal} aborted if the stream is destroyed allowing to
abort the `fn` call early.
* `options` {Object}
* `concurrency` {number} the maximum concurrent invocation of `fn` to call
on the stream at once. **Default:** `1`.
* `signal` {AbortSignal} allows destroying the stream if the signal is
aborted.
* Returns: {Promise} a promise evaluating to `true` if `fn` returned a truthy
value for all of the chunks.

This method is similar to `Array.prototype.every` and calls `fn` on each chunk
in the stream to check if all awaited return values are truthy value for `fn`.
Once an `fn` call on a chunk awaited return value is falsy, the stream is
destroyed and the promise is fulfilled with `false`. If all of the `fn` calls
on the chunks return a truthy value, the promise is fulfilled with `true`.

```mjs
import { Readable } from 'stream';
import { stat } from 'fs/promises';

// With a synchronous predicate.
await Readable.from([1, 2, 3, 4]).every((x) => x > 2); // false
await Readable.from([1, 2, 3, 4]).every((x) => x > 0); // true

// With an asynchronous predicate, making at most 2 file checks at a time.
const allBigFiles = await Readable.from([
'file1',
'file2',
'file3',
]).every(async (fileName) => {
const stats = await stat(fileName);
return stat.size > 1024 * 1024;
}, { concurrency: 2 });
// `true` if all files in the list are bigger than 1MiB
console.log(allBigFiles);
console.log('done'); // Stream has finished
```

### Duplex and transform streams

#### Class: `stream.Duplex`
Expand Down
41 changes: 41 additions & 0 deletions lib/internal/streams/operators.js
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ const {
AbortError,
} = require('internal/errors');
const { validateInteger } = require('internal/validators');
const { kWeakHandler } = require('internal/event_target');

const {
ArrayPrototypePush,
Expand Down Expand Up @@ -47,6 +48,10 @@ async function * map(fn, options) {
const signalOpt = { signal };

const abort = () => ac.abort();
if (options?.signal?.aborted) {
abort();
}

options?.signal?.addEventListener('abort', abort);

let next;
Expand Down Expand Up @@ -150,6 +155,40 @@ async function * map(fn, options) {
}
}

async function some(fn, options) {
// https://tc39.es/proposal-iterator-helpers/#sec-iteratorprototype.some
// Note that some does short circuit but also closes the iterator if it does
const ac = new AbortController();
if (options?.signal) {
if (options.signal.aborted) {
ac.abort();
}
options.signal.addEventListener('abort', () => ac.abort(), {
[kWeakHandler]: this,
once: true,
});
}
const mapped = this.map(fn, { ...options, signal: ac.signal });
for await (const result of mapped) {
if (result) {
ac.abort();
return true;
}
}
return false;
}

async function every(fn, options) {
if (typeof fn !== 'function') {
throw new ERR_INVALID_ARG_TYPE(
'fn', ['Function', 'AsyncFunction'], fn);
}
// https://en.wikipedia.org/wiki/De_Morgan%27s_laws
return !(await some.call(this, async (x) => {
return !(await fn(x));
}, options));
}

async function forEach(fn, options) {
if (typeof fn !== 'function') {
throw new ERR_INVALID_ARG_TYPE(
Expand Down Expand Up @@ -196,6 +235,8 @@ module.exports.streamReturningOperators = {
};

module.exports.promiseReturningOperators = {
every,
forEach,
toArray,
some,
};
95 changes: 95 additions & 0 deletions test/parallel/test-stream-some-every.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
'use strict';

const common = require('../common');
const {
Readable,
} = require('stream');
const assert = require('assert');

function oneTo5() {
return Readable.from([1, 2, 3, 4, 5]);
}

function oneTo5Async() {
return oneTo5().map(async (x) => {
await Promise.resolve();
return x;
});
}
{
// Some and every work with a synchronous stream and predicate
(async () => {
assert.strictEqual(await oneTo5().some((x) => x > 3), true);
assert.strictEqual(await oneTo5().every((x) => x > 3), false);
assert.strictEqual(await oneTo5().some((x) => x > 6), false);
assert.strictEqual(await oneTo5().every((x) => x < 6), true);
assert.strictEqual(await Readable.from([]).some((x) => true), false);
assert.strictEqual(await Readable.from([]).every((x) => true), true);
})().then(common.mustCall());
}

{
// Some and every work with an asynchronous stream and synchronous predicate
(async () => {
assert.strictEqual(await oneTo5Async().some((x) => x > 3), true);
assert.strictEqual(await oneTo5Async().every((x) => x > 3), false);
assert.strictEqual(await oneTo5Async().some((x) => x > 6), false);
assert.strictEqual(await oneTo5Async().every((x) => x < 6), true);
})().then(common.mustCall());
}

{
// Some and every work on asynchronous streams with an asynchronous predicate
(async () => {
assert.strictEqual(await oneTo5().some(async (x) => x > 3), true);
assert.strictEqual(await oneTo5().every(async (x) => x > 3), false);
assert.strictEqual(await oneTo5().some(async (x) => x > 6), false);
assert.strictEqual(await oneTo5().every(async (x) => x < 6), true);
})().then(common.mustCall());
}

{
// Some and every short circuit
(async () => {
await oneTo5().some(common.mustCall((x) => x > 2, 3));
await oneTo5().every(common.mustCall((x) => x < 3, 3));
// When short circuit isn't possible the whole stream is iterated
await oneTo5().some(common.mustCall((x) => x > 6, 5));
// The stream is destroyed afterwards
const stream = oneTo5();
await stream.some(common.mustCall((x) => x > 2, 3));
assert.strictEqual(stream.destroyed, true);
})().then(common.mustCall());
}

{
// Support for AbortSignal
const ac = new AbortController();
assert.rejects(Readable.from([1, 2, 3]).some(
() => new Promise(() => {}),
{ signal: ac.signal }
), {
name: 'AbortError',
}).then(common.mustCall());
ac.abort();
}
{
// Support for pre-aborted AbortSignal
assert.rejects(Readable.from([1, 2, 3]).some(
() => new Promise(() => {}),
{ signal: AbortSignal.abort() }
), {
name: 'AbortError',
}).then(common.mustCall());
}
{
// Error cases
assert.rejects(async () => {
await Readable.from([1]).every(1);
}, /ERR_INVALID_ARG_TYPE/).then(common.mustCall());
assert.rejects(async () => {
await Readable.from([1]).every((x) => x, {
concurrency: 'Foo'
});
}, /ERR_OUT_OF_RANGE/).then(common.mustCall());
}

0 comments on commit 0be2321

Please sign in to comment.