From 56a3bca958466b43e98e3c5a19c61a985e3f48d6 Mon Sep 17 00:00:00 2001 From: harold Date: Wed, 25 Aug 2021 16:19:26 -0400 Subject: [PATCH 1/6] Implement #20 - Async Iterable - Expand capabilities to handle both async and sync iterables - There is a problem in the original functionality around the `stop-on-error` test that needs to be resolved - The test has a bug in that neither p-map nor the test mapper function, which is provided for that specific test and is not the common version, await the calling of the delay() function for item 2 - This means that item 2 will only ever return a Promise, never a value - When this is fixed to await the function call, it causes the returned results to change from `[1, 3]` to `[1, 3, 2]` - Fixing this test makes it clear that the `concurrency setup` loop does not wait for mappers, so the only way to make this test fail would be to have enough items that eventually the initial iteration could not reach the last items in the test array before the 100 ms delay on the exception expires - With `concurrency = 1` this test behaves as expected since there are not unlimited unawaited promises created - Let me know your thoughts on how to resolve this as it becomes more apparent with the behavior changes needed for asyncIterable (where the concurrency setup iteration of asyncIterable has to be awaited to prevent infinite runners from being created) - The test, in the state it's in in this PR, does not actually demonstrate that stop on error works as intended --- index.d.ts | 2 +- index.js | 56 +++++++++++++++-------- test.js | 132 +++++++++++++++++++++++++++++++++++++++++++++++++++-- 3 files changed, 165 insertions(+), 25 deletions(-) diff --git a/index.d.ts b/index.d.ts index ebcb3e8..5e22e00 100644 --- a/index.d.ts +++ b/index.d.ts @@ -55,7 +55,7 @@ console.log(result); ``` */ export default function pMap( - input: Iterable, + input: AsyncIterable | Iterable, mapper: Mapper, options?: Options ): Promise>>; diff --git a/index.js 
b/index.js index b32edb4..e565982 100644 --- a/index.js +++ b/index.js @@ -20,23 +20,33 @@ export default async function pMap( const result = []; const errors = []; const skippedIndexes = []; - const iterator = iterable[Symbol.iterator](); let isRejected = false; let isIterableDone = false; let resolvingCount = 0; let currentIndex = 0; + let asyncIterator = false; + let iterator; + + if (iterable[Symbol.iterator] === undefined) { + // We've got an async iterable + iterator = iterable[Symbol.asyncIterator](); + asyncIterator = true; + } else { + iterator = iterable[Symbol.iterator](); + } const reject = reason => { isRejected = true; reject_(reason); }; - const next = () => { + const next = async () => { if (isRejected) { return; } - const nextItem = iterator.next(); + const nextItem = asyncIterator ? await iterator.next() : iterator.next(); + const index = currentIndex; currentIndex++; @@ -60,6 +70,7 @@ export default async function pMap( resolvingCount++; + // Intentionally not awaited (async () => { try { const element = await nextItem.value; @@ -69,6 +80,7 @@ export default async function pMap( } const value = await mapper(element, index); + if (value === pMapSkip) { skippedIndexes.push(index); } else { @@ -76,7 +88,7 @@ export default async function pMap( } resolvingCount--; - next(); + await next(); } catch (error) { if (stopOnError) { reject(error); @@ -89,7 +101,7 @@ export default async function pMap( // If we continue calling next() indefinitely we will likely end up // in an infinite loop of failed iteration. try { - next(); + await next(); } catch (error) { reject(error); } @@ -98,23 +110,27 @@ export default async function pMap( })(); }; - for (let index = 0; index < concurrency; index++) { - // Catch errors from the iterable.next() call - // In that case we can't really continue regardless of stopOnError state - // since an iterable is likely to continue throwing after it throws once. 
- // If we continue calling next() indefinitely we will likely end up - // in an infinite loop of failed iteration. - try { - next(); - } catch (error) { - reject(error); - break; - } + // Create the concurrent runners in a detached (non-awaited) + // promise. We need this so we can await the next() calls + // to stop creating runners before hitting the concurrency limit + // if the iterable has already been marked as done. + // NOTE: We *must* do this for async iterators otherwise we'll spin up + // infinite next() calls by default and never start the event loop. + (async () => { + for (let index = 0; index < concurrency; index++) { + try { + // eslint-disable-next-line no-await-in-loop + await next(); + } catch (error) { + reject(error); + break; + } - if (isIterableDone || isRejected) { - break; + if (isIterableDone || isRejected) { + break; + } } - } + })(); }); } diff --git a/test.js b/test.js index fdcb7c5..687cb74 100644 --- a/test.js +++ b/test.js @@ -7,7 +7,9 @@ import AggregateError from 'aggregate-error'; import pMap, {pMapSkip} from './index.js'; const sharedInput = [ - Promise.resolve([10, 300]), + [async () => { + return 10; + }, 300], [20, 200], [30, 100] ]; @@ -15,7 +17,9 @@ const sharedInput = [ const errorInput1 = [ [20, 200], [30, 100], - [() => Promise.reject(new Error('foo')), 10], + [async () => { + throw new Error('foo'); + }, 10], [() => { throw new Error('bar'); }, 10] @@ -23,7 +27,9 @@ const errorInput1 = [ const errorInput2 = [ [20, 200], - [() => Promise.reject(new Error('bar')), 10], + [async () => { + throw new Error('bar'); + }, 10], [30, 100], [() => { throw new Error('foo'); @@ -152,10 +158,128 @@ test('pMapSkip', async t => { }); test('do not run mapping after stop-on-error happened', async t => { - const input = [1, delay(300, {value: 2}), 3]; + const input = [1, async () => delay(300, {value: 2}), 3]; const mappedValues = []; await t.throwsAsync( pMap(input, async value => { + value = typeof value === 'function' ? 
await value() : value; + mappedValues.push(value); + if (value === 1) { + await delay(100); + throw new Error('Oops!'); + } + }) + ); + await delay(500); + t.deepEqual(mappedValues, [1, 3, 2]); +}); + +class AsyncTestData { + constructor(data) { + this.data = data; + } + + async * [Symbol.asyncIterator]() { + for (let i = 0; i < this.data.length; i++) { + // Add a delay between each item iterated + // eslint-disable-next-line no-await-in-loop + await delay(10); + yield this.data[i]; + } + } +} + +// +// Async Iterator tests +// + +test('asyncIterator - main', async t => { + const end = timeSpan(); + t.deepEqual(await pMap(new AsyncTestData(sharedInput), mapper), [10, 20, 30]); + + // We give it some leeway on both sides of the expected 300ms as the exact value depends on the machine and workload. + t.true(inRange(end(), {start: 290, end: 430})); +}); + +test('asyncIterator - concurrency: 1', async t => { + const end = timeSpan(); + t.deepEqual(await pMap(new AsyncTestData(sharedInput), mapper, {concurrency: 1}), [10, 20, 30]); + t.true(inRange(end(), {start: 590, end: 760})); +}); + +test('asyncIterator - concurrency: 4', async t => { + const concurrency = 4; + let running = 0; + + await pMap(new AsyncTestData(Array.from({length: 100}).fill(0)), async () => { + running++; + t.true(running <= concurrency); + await delay(randomInt(30, 200)); + running--; + }, {concurrency}); +}); + +test('asyncIterator - handles empty iterable', async t => { + t.deepEqual(await pMap(new AsyncTestData([]), mapper), []); +}); + +test('asyncIterator - async with concurrency: 2 (random time sequence)', async t => { + const input = Array.from({length: 10}).map(() => randomInt(0, 100)); + const mapper = value => delay(value, {value}); + const result = await pMap(new AsyncTestData(input), mapper, {concurrency: 2}); + t.deepEqual(result, input); +}); + +test('asyncIterator - async with concurrency: 2 (problematic time sequence)', async t => { + const input = [100, 200, 10, 36, 13, 45]; + 
const mapper = value => delay(value, {value}); + const result = await pMap(new AsyncTestData(input), mapper, {concurrency: 2}); + t.deepEqual(result, input); +}); + +test('asyncIterator - async with concurrency: 2 (out of order time sequence)', async t => { + const input = [200, 100, 50]; + const mapper = value => delay(value, {value}); + const result = await pMap(new AsyncTestData(input), mapper, {concurrency: 2}); + t.deepEqual(result, input); +}); + +test('asyncIterator - enforce number in options.concurrency', async t => { + await t.throwsAsync(pMap(new AsyncTestData([]), () => {}, {concurrency: 0}), {instanceOf: TypeError}); + await t.throwsAsync(pMap(new AsyncTestData([]), () => {}, {concurrency: 1.5}), {instanceOf: TypeError}); + await t.notThrowsAsync(pMap(new AsyncTestData([]), () => {}, {concurrency: 1})); + await t.notThrowsAsync(pMap(new AsyncTestData([]), () => {}, {concurrency: 10})); + await t.notThrowsAsync(pMap(new AsyncTestData([]), () => {}, {concurrency: Number.POSITIVE_INFINITY})); +}); + +test('asyncIterator - immediately rejects when stopOnError is true', async t => { + await t.throwsAsync(pMap(new AsyncTestData(errorInput1), mapper, {concurrency: 1}), {message: 'foo'}); + await t.throwsAsync(pMap(new AsyncTestData(errorInput2), mapper, {concurrency: 1}), {message: 'bar'}); +}); + +test('asyncIterator - aggregate errors when stopOnError is false', async t => { + await t.notThrowsAsync(pMap(new AsyncTestData(sharedInput), mapper, {concurrency: 1, stopOnError: false})); + await t.throwsAsync(pMap(new AsyncTestData(errorInput1), mapper, {concurrency: 1, stopOnError: false}), {instanceOf: AggregateError, message: /foo(.|\n)*bar/}); + await t.throwsAsync(pMap(new AsyncTestData(errorInput2), mapper, {concurrency: 1, stopOnError: false}), {instanceOf: AggregateError, message: /bar(.|\n)*foo/}); +}); + +test('asyncIterator - pMapSkip', async t => { + t.deepEqual(await pMap(new AsyncTestData([ + 1, + pMapSkip, + 2 + ]), async value => value), [1, 2]); 
+}); + +test('asyncIterator - do not run mapping after stop-on-error happened', async t => { + const input = [1, async () => delay(300, {value: 2}), 3]; + const mappedValues = []; + await t.throwsAsync( + pMap(new AsyncTestData(input), async value => { + if (typeof value === 'function') { + value = await value(); + } + mappedValues.push(value); if (value === 1) { await delay(100); From 8477a7ab7b84a81824627d3696d389e6fcf1ca59 Mon Sep 17 00:00:00 2001 From: Harold Hunt Date: Tue, 7 Sep 2021 11:45:56 -0400 Subject: [PATCH 2/6] Update index.js Co-authored-by: Sindre Sorhus --- index.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/index.js b/index.js index e565982..12f5756 100644 --- a/index.js +++ b/index.js @@ -70,7 +70,7 @@ export default async function pMap( resolvingCount++; - // Intentionally not awaited + // Intentionally detached (async () => { try { const element = await nextItem.value; From d5a0b26c5d33cf597f7b4562376e1243f6d0bda7 Mon Sep 17 00:00:00 2001 From: harold Date: Sun, 12 Sep 2021 15:38:34 -0400 Subject: [PATCH 3/6] Prevent skip removal logic from being run twice --- index.js | 29 +++++++++++++++-------------- test.js | 31 +++++++++++++++++++++++++------ 2 files changed, 40 insertions(+), 20 deletions(-) diff --git a/index.js b/index.js index 12f5756..473589a 100644 --- a/index.js +++ b/index.js @@ -21,42 +21,43 @@ export default async function pMap( const errors = []; const skippedIndexes = []; let isRejected = false; + let isRejectedOrResolved = false; let isIterableDone = false; let resolvingCount = 0; let currentIndex = 0; - let asyncIterator = false; - let iterator; - - if (iterable[Symbol.iterator] === undefined) { - // We've got an async iterable - iterator = iterable[Symbol.asyncIterator](); - asyncIterator = true; - } else { - iterator = iterable[Symbol.iterator](); - } + const iterator = iterable[Symbol.iterator] === undefined ? 
iterable[Symbol.asyncIterator]() : iterable[Symbol.iterator](); const reject = reason => { isRejected = true; + isRejectedOrResolved = true; reject_(reason); }; const next = async () => { - if (isRejected) { + if (isRejectedOrResolved) { return; } - const nextItem = asyncIterator ? await iterator.next() : iterator.next(); + const nextItem = await iterator.next(); const index = currentIndex; currentIndex++; + // Note: iterator.next() can be called many times in parallel. + // This can cause multiple calls to this next() function to + // receive a `nextItem` with `done === true`. + // The shutdown logic that rejects/resolves must be protected + // so it runs only one time as the `skippedIndex` logic is + // non-idempotent. if (nextItem.done) { isIterableDone = true; - if (resolvingCount === 0) { + if (resolvingCount === 0 && !isRejectedOrResolved) { if (!stopOnError && errors.length > 0) { reject(new AggregateError(errors)); } else { + isRejectedOrResolved = true; + for (const skippedIndex of skippedIndexes) { result.splice(skippedIndex, 1); } @@ -75,7 +76,7 @@ export default async function pMap( try { const element = await nextItem.value; - if (isRejected) { + if (isRejectedOrResolved) { return; } diff --git a/test.js b/test.js index 687cb74..fe96714 100644 --- a/test.js +++ b/test.js @@ -157,7 +157,7 @@ test('pMapSkip', async t => { ], async value => value), [1, 2]); }); -test('do not run mapping after stop-on-error happened', async t => { +test('all mappers should run when concurrency is infinite, even after stop-on-error happened', async t => { const input = [1, async () => delay(300, {value: 2}), 3]; const mappedValues = []; await t.throwsAsync( @@ -271,7 +271,7 @@ test('asyncIterator - pMapSkip', async t => { ]), async value => value), [1, 2]); }); -test('asyncIterator - do not run mapping after stop-on-error happened', async t => { +test('asyncIterator - all mappers should run when concurrency is infinite, even after stop-on-error happened', async t => { const 
input = [1, async () => delay(300, {value: 2}), 3]; const mappedValues = []; await t.throwsAsync( @@ -283,13 +283,13 @@ test('asyncIterator - do not run mapping after stop-on-error happened', async t mappedValues.push(value); if (value === 1) { await delay(100); - throw new Error('Oops!'); + throw new Error(`Oops! ${value}`); } - }, - {concurrency: 1}) + }), + {message: 'Oops! 1'} ); await delay(500); - t.deepEqual(mappedValues, [1]); + t.deepEqual(mappedValues, [1, 3, 2]); }); test('catches exception from source iterator - 1st item', async t => { @@ -349,3 +349,22 @@ test('catches exception from source iterator - 2nd item after 1st item mapper th t.is(input.index, 2); t.deepEqual(mappedValues, [0]); }); + +test('asyncIterator - get the correct exception after stop-on-error', async t => { + const input = [1, async () => delay(200, {value: 2}), async () => delay(300, {value: 3})]; + const mappedValues = []; + + const task = pMap(new AsyncTestData(input), async value => { + if (typeof value === 'function') { + value = await value(); + } + + mappedValues.push(value); + // Throw for each item - all should fail and we should get only the first + await delay(100); + throw new Error(`Oops! ${value}`); + }); + await delay(500); + await t.throwsAsync(task, {message: 'Oops! 
1'}); + t.deepEqual(mappedValues, [1, 2, 3]); +}); From 6d4156dfbf012af9364efbd46ffde8d7cb5112b0 Mon Sep 17 00:00:00 2001 From: harold Date: Thu, 16 Sep 2021 13:42:35 -0400 Subject: [PATCH 4/6] Rename per request --- index.js | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/index.js b/index.js index 473589a..2365a76 100644 --- a/index.js +++ b/index.js @@ -21,7 +21,7 @@ export default async function pMap( const errors = []; const skippedIndexes = []; let isRejected = false; - let isRejectedOrResolved = false; + let isResolved = false; let isIterableDone = false; let resolvingCount = 0; let currentIndex = 0; @@ -29,12 +29,12 @@ export default async function pMap( const reject = reason => { isRejected = true; - isRejectedOrResolved = true; + isResolved = true; reject_(reason); }; const next = async () => { - if (isRejectedOrResolved) { + if (isResolved) { return; } @@ -52,11 +52,11 @@ export default async function pMap( if (nextItem.done) { isIterableDone = true; - if (resolvingCount === 0 && !isRejectedOrResolved) { + if (resolvingCount === 0 && !isResolved) { if (!stopOnError && errors.length > 0) { reject(new AggregateError(errors)); } else { - isRejectedOrResolved = true; + isResolved = true; for (const skippedIndex of skippedIndexes) { result.splice(skippedIndex, 1); @@ -76,7 +76,7 @@ export default async function pMap( try { const element = await nextItem.value; - if (isRejectedOrResolved) { + if (isResolved) { return; } From 8b6e6f009a5cb487819d4fa41e968def5216cc7b Mon Sep 17 00:00:00 2001 From: harold Date: Thu, 16 Sep 2021 14:00:19 -0400 Subject: [PATCH 5/6] Update readme to document async iterables - Also distinguish from items returning a promise - Update the TS types to indicate that iterated items can actually be a `Promise` --- index.d.ts | 2 +- readme.md | 6 ++++-- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/index.d.ts b/index.d.ts index 5e22e00..d918dc7 100644 --- a/index.d.ts +++ b/index.d.ts @@ -55,7 
+55,7 @@ console.log(result); ``` */ export default function pMap( - input: AsyncIterable | Iterable, + input: AsyncIterable> | Iterable>, mapper: Mapper, options?: Options ): Promise>>; diff --git a/readme.md b/readme.md index 1a08047..099d94a 100644 --- a/readme.md +++ b/readme.md @@ -43,9 +43,11 @@ Returns a `Promise` that is fulfilled when all promises in `input` and ones retu #### input -Type: `Iterable` +Type: `AsyncIterable | unknown> | Iterable | unknown>` -Iterated over concurrently in the `mapper` function. +Synchronous or asynchronous iterable that is iterated over concurrently, calling the `mapper` function for each element. Each iterated item is `await`'d before the `mapper` is invoked so the iterable may return a `Promise` that resolves to an item. + +Asynchoronous iterables (distinguishing from iterables that return `Promise` that resolves to an item) can be used when the next item may not be ready without waiting for an asynchronous process to complete and/or the end of the iterable may be reached after the asynchornous proces completes (e.g. reading from a remote queue when the queue has reached empty, or reading lines from a stream). #### mapper(element, index) From c9e71126c0a94ef36cb0d3ac24d656a4ad6ee71f Mon Sep 17 00:00:00 2001 From: harold Date: Thu, 23 Sep 2021 17:48:18 -0400 Subject: [PATCH 6/6] Attempt to address all requests --- index.d.ts | 2 +- index.js | 16 ++++++++++------ readme.md | 4 ++-- test.js | 22 ++++++++++++++++------ 4 files changed, 29 insertions(+), 15 deletions(-) diff --git a/index.d.ts b/index.d.ts index d918dc7..a5423ff 100644 --- a/index.d.ts +++ b/index.d.ts @@ -28,7 +28,7 @@ export type Mapper = ( ) => NewElement | Promise; /** -@param input - Iterated over concurrently in the `mapper` function. +@param input - Synchronous or asynchronous iterable that is iterated over concurrently, calling the `mapper` function for each element. 
Each iterated item is `await`'d before the `mapper` is invoked so the iterable may return a `Promise` that resolves to an item. Asynchronous iterables (different from synchronous iterables that return `Promise` that resolves to an item) can be used when the next item may not be ready without waiting for an asynchronous process to complete and/or the end of the iterable may be reached after the asynchronous process completes. For example, reading from a remote queue when the queue has reached empty, or reading lines from a stream. @param mapper - Function which is called for every item in `input`. Expected to return a `Promise` or value. @returns A `Promise` that is fulfilled when all promises in `input` and ones returned from `mapper` are fulfilled, or rejects if any of the promises reject. The fulfilled value is an `Array` of the fulfilled values returned from `mapper` in `input` order. diff --git a/index.js b/index.js index 2365a76..c7c9fba 100644 --- a/index.js +++ b/index.js @@ -9,6 +9,10 @@ export default async function pMap( } = {} ) { return new Promise((resolve, reject_) => { // eslint-disable-line promise/param-names + if (iterable[Symbol.iterator] === undefined && iterable[Symbol.asyncIterator] === undefined) { + throw new TypeError(`Expected \`input\` to be either an \`Iterable\` or \`AsyncIterable\`, got (${typeof iterable})`); + } + if (typeof mapper !== 'function') { throw new TypeError('Mapper function is required'); } @@ -43,8 +47,8 @@ export default async function pMap( const index = currentIndex; currentIndex++; - // Note: iterator.next() can be called many times in parallel. - // This can cause multiple calls to this next() function to + // Note: `iterator.next()` can be called many times in parallel. + // This can cause multiple calls to this `next()` function to // receive a `nextItem` with `done === true`. 
// The shutdown logic that rejects/resolves must be protected // so it runs only one time as the `skippedIndex` logic is @@ -97,9 +101,9 @@ export default async function pMap( errors.push(error); resolvingCount--; - // In that case we can't really continue regardless of stopOnError state + // In that case we can't really continue regardless of `stopOnError` state // since an iterable is likely to continue throwing after it throws once. - // If we continue calling next() indefinitely we will likely end up + // If we continue calling `next()` indefinitely we will likely end up // in an infinite loop of failed iteration. try { await next(); @@ -112,11 +116,11 @@ export default async function pMap( }; // Create the concurrent runners in a detached (non-awaited) - // promise. We need this so we can await the next() calls + // promise. We need this so we can await the `next()` calls // to stop creating runners before hitting the concurrency limit // if the iterable has already been marked as done. // NOTE: We *must* do this for async iterators otherwise we'll spin up - // infinite next() calls by default and never start the event loop. + // infinite `next()` calls by default and never start the event loop. (async () => { for (let index = 0; index < concurrency; index++) { try { diff --git a/readme.md b/readme.md index 099d94a..7c8ba3a 100644 --- a/readme.md +++ b/readme.md @@ -45,9 +45,9 @@ Returns a `Promise` that is fulfilled when all promises in `input` and ones retu Type: `AsyncIterable | unknown> | Iterable | unknown>` -Synchronous or asynchronous iterable that is iterated over concurrently, calling the `mapper` function for each element. Each iterated item is `await`'d before the `mapper` is invoked so the iterable may return a `Promise` that resolves to an item. +Synchronous or asynchronous iterable that is iterated over concurrently, calling the `mapper` function for each element. 
Each iterated item is `await`'d before the `mapper` is invoked so the iterable may return a `Promise` that resolves to an item. -Asynchoronous iterables (distinguishing from iterables that return `Promise` that resolves to an item) can be used when the next item may not be ready without waiting for an asynchronous process to complete and/or the end of the iterable may be reached after the asynchornous proces completes (e.g. reading from a remote queue when the queue has reached empty, or reading lines from a stream). +Asynchronous iterables (different from synchronous iterables that return `Promise` that resolves to an item) can be used when the next item may not be ready without waiting for an asynchronous process to complete and/or the end of the iterable may be reached after the asynchronous process completes. For example, reading from a remote queue when the queue has reached empty, or reading lines from a stream. #### mapper(element, index) diff --git a/test.js b/test.js index fe96714..2120258 100644 --- a/test.js +++ b/test.js @@ -7,9 +7,7 @@ import AggregateError from 'aggregate-error'; import pMap, {pMapSkip} from './index.js'; const sharedInput = [ - [async () => { - return 10; - }, 300], + [async () => 10, 300], [20, 200], [30, 100] ]; @@ -180,11 +178,11 @@ class AsyncTestData { } async * [Symbol.asyncIterator]() { - for (let i = 0; i < this.data.length; i++) { - // Add a delay between each item iterated + for (let index = 0; index < this.data.length; index++) { + // Add a delay between each iterated item // eslint-disable-next-line no-await-in-loop await delay(10); - yield this.data[i]; + yield this.data[index]; } } } @@ -368,3 +366,15 @@ test('asyncIterator - get the correct exception after stop-on-error', async t => await t.throwsAsync(task, {message: 'Oops! 
1'}); t.deepEqual(mappedValues, [1, 2, 3]); }); + +test('incorrect input type', async t => { + let mapperCalled = false; + + const task = pMap(123456, async () => { + mapperCalled = true; + await delay(100); + }); + await delay(500); + await t.throwsAsync(task, {message: 'Expected `input` to be either an `Iterable` or `AsyncIterable`, got (number)'}); + t.false(mapperCalled); +});