diff --git a/index.js b/index.js index 741f088..3c6c41f 100644 --- a/index.js +++ b/index.js @@ -193,7 +193,8 @@ export function pMapIterable( return { async * [Symbol.asyncIterator]() { - const iterator = iterable[Symbol.asyncIterator] === undefined ? iterable[Symbol.iterator]() : iterable[Symbol.asyncIterator](); + const isSyncIterator = iterable[Symbol.asyncIterator] === undefined; + const iterator = isSyncIterator ? iterable[Symbol.iterator]() : iterable[Symbol.asyncIterator](); const promises = []; const promisesIndexFromInputIndex = {}; @@ -201,12 +202,32 @@ export function pMapIterable( let runningMappersCount = 0; let isDone = false; let inputIndex = 0; - let outputIndex = 0; // Only used when `preserveOrder: false` + let outputIndex = 0; // Only used when `preserveOrder: true` + + // This event emitter prevents the race condition that arises when: + // - `preserveOrder: false` + // - `promises` are added after `Promise.race` is invoked, since `Promise.race` only races the promises that existed in its input array at call time + // More specifically, this occurs when (in addition to `preserveOrder: false`): + // - `concurrency === Number.POSITIVE_INFINITY && Number.POSITIVE_INFINITY === backpressure` + // - this forces us to forgo eagerly filling the `promises` pool to avoid infinite recursion + // - IMO this is the root of this problem, and a problem in and of itself: we should consider requiring a finite concurrency & backpressure + // - given the inability to eagerly fill the `promises` pool with infinite concurrency & backpressure, there are some situations in which specifying + // a finite concurrency & backpressure will be faster than specifying the otherwise faster-sounding infinite concurrency & backpressure + // - an async iterator input iterable + // - `mapNext` can't `trySpawn` until it `await`s its `next`, since the input iterable might be done + // - the initial `trySpawn` thus ends when the execution of `mapNext` is suspended to `await next` + 
// - the input iterable produces more than one element + // - the (single) running `mapNext`'s `trySpawn` _will necessarily_ (since concurrency and backpressure are infinite) + // start another `mapNext` promise that `trySpawn` adds to `promises` + // - this additional promise does not partake in the already-running `nextPromise`, because its underlying `Promise.race` began without it, + // when the initial `trySpawn` returned and `nextPromise` was invoked from the main loop + const promiseEmitter = new EventTarget(); // Only used when `preserveOrder: false` + const promiseEmitterEvent = 'promiseFulfilled'; const nextPromise = preserveOrder // Treat `promises` as a queue ? () => { - // May be undefined bc of `pMapSkip`s + // May be `undefined` bc of `pMapSkip`s while (promisesIndexFromInputIndex[outputIndex] === undefined) { outputIndex += 1; } @@ -214,7 +235,19 @@ export function pMapIterable( return promises[promisesIndexFromInputIndex[outputIndex++]]; } // Treat `promises` as a pool (order doesn't matter) - : () => Promise.race(promises); + : () => Promise.race([ + // Ensures correctness in the case that mappers resolve between the time that one `await nextPromise()` resolves and the next `nextPromise` call is made + // (these promises would otherwise be lost if an event emitter is not listening - the `promises` pool buffers resolved promises to be processed) + // (I wonder if it may actually be possible to convert the `preserveOrder: false` case to _exclusively_ event-based, + // but such a solution may get messy since we'd want to `yield` from a callback, likely requiring a resolved promises buffer anyway...) 
+ Promise.race(promises), + // Ensures correctness in the case that more promises are added to `promises` after the initial `nextPromise` call is made + // (these additional promises are not included in the above `Promise.race`) + // (see comment above `promiseEmitter` declaration for details on when this can occur) + new Promise(resolve => { + promiseEmitter.addEventListener(promiseEmitterEvent, r => resolve(r.detail), {once: true}); + }), + ]); function popPromise(inputIndex) { // Swap the fulfilled promise with the last element to avoid an O(n) shift to the `promises` array @@ -239,7 +272,7 @@ export function pMapIterable( let next; try { next = iterator.next(); - if (isPromiseLike(next)) { + if (!isSyncIterator) { // NOTE(review): assumes spec-conforming iterators, for which `!isSyncIterator` iff `isPromiseLike(next)`; the former is already computed // Optimization: if our concurrency and/or backpressure is bounded (so that we won't infinitely recurse), // and we need to `await` the next `iterator` element, we first eagerly spawn more `mapNext` promises, // so that these promises can begin `await`ing their respective `iterator` elements (if needed) and `mapper` results in parallel. @@ -250,6 +283,7 @@ export function pMapIterable( // However, the time needed to `await` and ignore these `done` promises is presumed to be small relative to the time needed to perform common // `async` operations like disk reads, network requests, etc. // Overall, this can reduce the total time taken to process all elements. + // Potential TODO: in the `concurrency === Number.POSITIVE_INFINITY` case, we could potentially still optimize here by eagerly spawning some # of promises. 
if (backpressure !== Number.POSITIVE_INFINITY) { // Spawn if still below concurrency and backpressure limit trySpawn(); @@ -291,12 +325,15 @@ export function pMapIterable( if (returnValue === pMapSkip) { // If `preserveOrder: true`, resolve to the next inputIndex's promise, in case we are already being `await`ed // NOTE: no chance that `myInputIndex + 1`-spawning code is waiting to be executed in another part of the event loop, - // but currently `promisesIndexFromInputIndex[myInputIndex + 1] === undefined` (so that we incorrectly `mapNext` and - // this potentially-currently-awaited promise resolves to the result of mapping a later element than a different member of - // `promises`, i.e. `promises` resolve out of order), because all `trySpawn`/`mapNext` calls execute the bookkeeping synchronously, - // before any `await`s. + // but currently `promisesIndexFromInputIndex[myInputIndex + 1] === undefined` (so that we incorrectly skip this `if` condition and + // instead call `mapNext`, causing this potentially-currently-awaited promise to resolve to the result of mapping an element + // of the input iterable that was produced later than `myInputIndex + 1`), i.e., no chance `promises` resolve out of order, because: + // all `trySpawn`/`mapNext` calls execute their bookkeeping synchronously, before any `await`s, so we cannot observe an intermediate + // state in which the promise mapping input iterable element `myInputIndex + 1` has not been recorded in the `promisesIndexFromInputIndex` ledger. if (preserveOrder && promisesIndexFromInputIndex[myInputIndex + 1] !== undefined) { popPromise(myInputIndex); + // Spawn if still below backpressure limit and just dropped below concurrency limit + trySpawn(); return promises[promisesIndexFromInputIndex[myInputIndex + 1]]; } @@ -321,16 +358,18 @@ export function pMapIterable( // Reserve index in `promises` array: we don't actually have the promise to save yet, // but we don't want recursive `trySpawn` calls to use this same index. 
// This is safe (i.e., the empty slot won't be `await`ed) because we replace the value immediately, - // without yielding to the event loop, so no consumers (namely `getAndRemoveFromPoolNextPromise`) + // without yielding to the event loop, so no consumers (namely `nextPromise`) // can observe the intermediate state. const promisesIndex = promises.length++; promises[promisesIndex] = mapNext(promisesIndex); + promises[promisesIndex].then(p => promiseEmitter.dispatchEvent(new CustomEvent(promiseEmitterEvent, {detail: p}))); } + // Bootstrap `promises` trySpawn(); while (promises.length > 0) { - const {result: {error, done, value}, inputIndex} = await nextPromise();// eslint-disable-line no-await-in-loop + const {result: {error, done, value}, inputIndex} = await nextPromise(); // eslint-disable-line no-await-in-loop popPromise(inputIndex); if (error) { @@ -338,7 +377,7 @@ export function pMapIterable( } if (done) { - // When `preserveOrder: false`, ignore to consume any remaining pending promises in the pool + // When `preserveOrder: false`, `continue` to consume any remaining pending promises in the pool if (!preserveOrder) { continue; } diff --git a/test.js b/test.js index 40e7fb2..7285d5d 100644 --- a/test.js +++ b/test.js @@ -102,6 +102,10 @@ class ThrowingIterator { } } +function rangeAround(expected) { + return {start: expected - 5, end: expected + 50}; +} + test('main', async t => { const end = timeSpan(); t.deepEqual(await pMap(sharedInput, mapper), [10, 20, 30]); @@ -622,9 +626,9 @@ test('pMapIterable - concurrency: 2', async t => { assertInRange(t, times.get(10), {start: 0, end: 50}); assertInRange(t, times.get(20), {start: 0, end: 50}); - assertInRange(t, times.get(30), {start: 195, end: 250}); - assertInRange(t, times.get(40), {start: 295, end: 350}); - assertInRange(t, times.get(50), {start: 295, end: 350}); + assertInRange(t, times.get(30), rangeAround(200)); + assertInRange(t, times.get(40), rangeAround(300)); + assertInRange(t, times.get(50), 
rangeAround(300)); }); test('pMapIterable - backpressure', async t => { @@ -716,6 +720,18 @@ test('pMapIterable - complex pMapSkip pattern - concurrency 2 - preserveOrder: f t.assert(result.length === 8); }); +test('pMapIterable - pMapSkip + preserveOrder: true + next input mapping promise pending - eagerly spawns next promise', async t => { + const end = timeSpan(); + const testData = [ + [pMapSkip, 100], + [2, 200], + [3, 100], // Ensure 3 is spawned when pMapSkip ends (otherwise, overall runtime will be 300 ms) + ]; + const result = await collectAsyncIterable(pMapIterable(testData, mapper, {preserveOrder: true, concurrency: 2})); + assertInRange(t, end(), rangeAround(200)); + t.deepEqual(result, [2, 3]); +}); + test('pMapIterable - async iterable input', async t => { const result = await collectAsyncIterable(pMapIterable(new AsyncTestData(sharedInput), mapper)); t.deepEqual(result, [10, 20, 30]); @@ -759,26 +775,47 @@ function * promiseGenerator() { })(); } +// Differs from `AsyncTestData` because it is not a generator: +// each `next` yields a promise that does not wait for previous `next` promises to finish. 
+const asyncIterableDoingWorkOnEachNext = (start, stop) => { + let i = start; + return { + [Symbol.asyncIterator]() { + return { + async next() { + const me = i++; + if (me > stop) { + return {done: true}; + } + + await delay(100); + return {done: false, value: me}; + }, + }; + }, + }; +}; + test('pMapIterable - eager spawn when input iterable returns promise', async t => { const end = timeSpan(); - await collectAsyncIterable(pMapIterable(promiseGenerator(), value => delay(100, {value}), {concurrency: 3})); - assertInRange(t, end(), {start: 195, end: 250}); + await collectAsyncIterable(pMapIterable(asyncIterableDoingWorkOnEachNext(1, 3), value => value, {concurrency: 5})); + assertInRange(t, end(), rangeAround(100)); }); test('pMapIterable - eager spawn when input iterable returns promise incurs little overhead', async t => { const end = timeSpan(); await collectAsyncIterable(pMapIterable(promiseGenerator(), value => delay(100, {value}), {concurrency: 100})); - assertInRange(t, end(), {start: 195, end: 250}); + assertInRange(t, end(), rangeAround(200)); }); test('pMapIterable - preserveOrder: false - yields mappings as they resolve', async t => { const end = timeSpan(); const result = await collectAsyncIterable(pMapIterable(sharedInput, mapper, {preserveOrder: false})); t.deepEqual(result, [30, 20, 10]); - assertInRange(t, end(), {start: 295, end: 350}); + assertInRange(t, end(), rangeAround(300)); }); -test('pMapIterable - preserveOrder: false - more complex example', async t => { +test('pMapIterable - preserveOrder: false - more complex example - sync iterable and bounded concurrency', async t => { t.deepEqual(await collectAsyncIterable(pMapIterable([ [1, 200], [2, 100], @@ -789,6 +826,38 @@ test('pMapIterable - preserveOrder: false - more complex example', async t => { ], mapper, {concurrency: 3, preserveOrder: false})), [2, 3, 1, 5, 6, 4]); }); +test('pMapIterable - preserveOrder: false - more complex example - async iterable 
and unbounded concurrency', async t => { + const testData = [ + [1, 200], + [2, 125], + [3, 150], + [4, 200], + [5, 100], + [6, 75], + ]; + async function * asyncIterable() { + yield * testData; + } + + t.deepEqual(await collectAsyncIterable(pMapIterable(asyncIterable(), mapper, {concurrency: Number.POSITIVE_INFINITY, preserveOrder: false})), testData.toSorted(([_aId, aMs], [_bId, bMs]) => aMs - bMs).map(([id, _ms]) => id)); +}); + +test('pMapIterable - preserveOrder: false - more complex example - sync promise-returning iterable and unbounded concurrency', async t => { + const testData = [ + [1, 200], + [2, 125], + [3, 150], + [4, 225], + [5, 100], + [6, 75], + ]; + function * syncPromiseReturningIterable() { + yield * testData.map(d => Promise.resolve(d)); + } + + t.deepEqual(await collectAsyncIterable(pMapIterable(syncPromiseReturningIterable(), mapper, {concurrency: Number.POSITIVE_INFINITY, preserveOrder: false})), testData.toSorted(([_aId, aMs], [_bId, bMs]) => aMs - bMs).map(([id, _ms]) => id)); +}); + test('pMapIterable - preserveOrder: false - concurrency: 2', async t => { const input = [100, 200, 10, 36, 13, 45]; const times = new Map(); @@ -799,12 +868,12 @@ test('pMapIterable - preserveOrder: false - concurrency: 2', async t => { return delay(value, {value}); }, {concurrency: 2, backpressure: Number.POSITIVE_INFINITY, preserveOrder: false})), [100, 10, 36, 13, 200, 45]); - assertInRange(t, times.get(100), {start: 0, end: 50}); - assertInRange(t, times.get(200), {start: 0, end: 50}); - assertInRange(t, times.get(10), {start: times.get(100) + 100 - 5, end: times.get(100) + 100 + 50}); - assertInRange(t, times.get(36), {start: times.get(10) + 10 - 5, end: times.get(10) + 10 + 50}); - assertInRange(t, times.get(13), {start: times.get(36) + 36 - 5, end: times.get(36) + 36 + 50}); - assertInRange(t, times.get(45), {start: times.get(13) + 13 - 5, end: times.get(13) + 13 + 50}); + assertInRange(t, times.get(100), rangeAround(0)); + assertInRange(t, 
times.get(200), rangeAround(0)); + assertInRange(t, times.get(10), rangeAround(times.get(100) + 100)); + assertInRange(t, times.get(36), rangeAround(times.get(10) + 10)); + assertInRange(t, times.get(13), rangeAround(times.get(36) + 36)); + assertInRange(t, times.get(45), rangeAround(times.get(13) + 13)); }); test('pMapIterable - preserveOrder: false - backpressure', async t => { @@ -846,3 +915,27 @@ test('pMapIterable - preserveOrder: false - throws first error to settle', async }, 10], ], mapper, {preserveOrder: false, concurrency: 2})), {message: 'bar'}); }); + +test('pMapIterable - {concurrency: 1, backpressure: 2} => no concurrent mappers (#76)', async t => { + const theLog = []; + const log = message => theLog.push(message); + const startLog = n => `${n}: mapper start`; + const endLog = n => `${n}: mapper end`; + + async function * source() { + yield 1; + yield 2; + yield 3; + } + + await collectAsyncIterable(pMapIterable(source(), async n => { + log(startLog(n)); + await delay(100); + log(endLog(n)); + }, { + concurrency: 1, + backpressure: 2, + })); + t.deepEqual(theLog, [startLog(1), endLog(1), startLog(2), endLog(2), startLog(3), endLog(3)]); +}); +