Skip to content

Commit

Permalink
1. add promiseEmitter to preserveOrder: false Promise.race to address…
Browse files Browse the repository at this point in the history
… case of: infinite concurrency + async iterable producing >1 element

2. use `!isSyncIterator` as shortcut for `isPromiseLike(next)` (`next` is promise iff iterator is async)
3. add `trySpawn` to the `returnValue === pMapSkip && preserveOrder && (promise mapping next input iterable element is pending` branch
4. add tests for changes (1) and (3)
5. tests `rangeAround` helper
6. extra `pMapSkip` tests
7. test for sindresorhus#76
  • Loading branch information
tgfisher4 committed Aug 25, 2024
1 parent ba50244 commit 4b8f367
Show file tree
Hide file tree
Showing 2 changed files with 158 additions and 26 deletions.
63 changes: 51 additions & 12 deletions index.js
Original file line number Diff line number Diff line change
Expand Up @@ -193,28 +193,61 @@ export function pMapIterable(

return {
async * [Symbol.asyncIterator]() {
const iterator = iterable[Symbol.asyncIterator] === undefined ? iterable[Symbol.iterator]() : iterable[Symbol.asyncIterator]();
const isSyncIterator = iterable[Symbol.asyncIterator] === undefined;
const iterator = isSyncIterator ? iterable[Symbol.iterator]() : iterable[Symbol.asyncIterator]();

const promises = [];
const promisesIndexFromInputIndex = {};
const inputIndexFromPromisesIndex = [];
let runningMappersCount = 0;
let isDone = false;
let inputIndex = 0;
let outputIndex = 0; // Only used when `preserveOrder: false`
let outputIndex = 0; // Only used when `preserveOrder: true`

// This event emitter prevents the race conditions that arises when:
// - `preserveOrder: false`
// - `promises` are added after `Promise.race` is invoked, since `Promise.race` only races the promises that existed in its input array at call time
// More specifically, this occurs when (in addition to `preserveOrder: false`):
// - `concurrency === Number.PositiveInfinity && Number.PositiveInfinity === backpressure`
// - this forces us to forgo eagerly filling the `promises` pool to avoid infinite recursion
// - IMO this is the root of this problem, and a problem in and of itself: we should consider requiring a finite concurrency & backpressure
// - given the inability to eagerly filing the `promises` pool with infinite concurrency & backpressure, there are some situations in which specifying
// a finite concurrency & backpressure will be faster than specifying the otherwise faster-sounding infinite concurrency & backpressure
// - an async iterator input iterable
// - `mapNext` can't `trySpawn` until it `await`s its `next`, since the input iterable might be done
// - the initial `trySpawn` thus ends when the execution of `mapNext` is suspended to `await next`
// - the input iterable produces more than one element
// - the (single) running `mapNext`'s `trySpawn` _will necessarily_ (since concurrency and backpressure are infinite)
// start another `mapNext` promise that `trySpawn` adds to `promises`
// - this additional promise does not partake in the already-running `nextPromise`, because its underlying `Promise.race` began without it,
// when the initial `trySpawn` returned and `nextPromise` was invoked from the main loop
const promiseEmitter = new EventTarget(); // Only used when `preserveOrder: false`
const promiseEmitterEvent = 'promiseFulfilled';

const nextPromise = preserveOrder
// Treat `promises` as a queue
? () => {
// May be undefined bc of `pMapSkip`s
// May be `undefined` bc of `pMapSkip`s
while (promisesIndexFromInputIndex[outputIndex] === undefined) {
outputIndex += 1;
}

return promises[promisesIndexFromInputIndex[outputIndex++]];
}
// Treat `promises` as a pool (order doesn't matter)
: () => Promise.race(promises);
: () => Promise.race([
// Ensures correctness in the case that mappers resolve between the time that one `await nextPromise()` resolves and the next `nextPromise` call is made
// (these promises would otherwise be lost if an event emitter is not listening - the `promises` pool buffers resolved promises to be processed)
// (I wonder if it may be actually be possible to convert the `preserveOrder: false` case to _exclusively_ event-based,
// but such a solution may get messy since we'd want to `yield` from a callback, likely requiring a resolved promises buffer anyway...)
Promise.race(promises),
// Ensures correctness in the case that more promises are added to `promises` after the initial `nextPromise` call is made
// (these additional promises are not be included in the above `Promise.race`)
// (see comment above `promiseEmitter` declaration for details on when this can occur)
new Promise(resolve => {
promiseEmitter.addEventListener(promiseEmitterEvent, r => resolve(r.detail), {once: true});
}),
]);

function popPromise(inputIndex) {
// Swap the fulfilled promise with the last element to avoid an O(n) shift to the `promises` array
Expand All @@ -239,7 +272,7 @@ export function pMapIterable(
let next;
try {
next = iterator.next();
if (isPromiseLike(next)) {
if (!isSyncIterator) { // `!isSyncIterator` iff `isPromiseLike(next)`, but former is already computed
// Optimization: if our concurrency and/or backpressure is bounded (so that we won't infinitely recurse),
// and we need to `await` the next `iterator` element, we first eagerly spawn more `mapNext` promises,
// so that these promises can begin `await`ing their respective `iterator` elements (if needed) and `mapper` results in parallel.
Expand All @@ -250,6 +283,7 @@ export function pMapIterable(
// However, the time needed to `await` and ignore these `done` promises is presumed to be small relative to the time needed to perform common
// `async` operations like disk reads, network requests, etc.
// Overall, this can reduce the total time taken to process all elements.
// Potential TODO: in the `concurrency === Number.POSITIVE_INFINITY` case, we could potentially still optimize here by eagerly spawning some # of promises.
if (backpressure !== Number.POSITIVE_INFINITY) {
// Spawn if still below concurrency and backpressure limit
trySpawn();
Expand Down Expand Up @@ -291,12 +325,15 @@ export function pMapIterable(
if (returnValue === pMapSkip) {
// If `preserveOrder: true`, resolve to the next inputIndex's promise, in case we are already being `await`ed
// NOTE: no chance that `myInputIndex + 1`-spawning code is waiting to be executed in another part of the event loop,
// but currently `promisesIndexFromInputIndex[myInputIndex + 1] === undefined` (so that we incorrectly `mapNext` and
// this potentially-currently-awaited promise resolves to the result of mapping a later element than a different member of
// `promises`, i.e. `promises` resolve out of order), because all `trySpawn`/`mapNext` calls execute the bookkeeping synchronously,
// before any `await`s.
// but currently `promisesIndexFromInputIndex[myInputIndex + 1] === undefined` (so that we incorrectly skip this `if` condition and
// instead call `mapNext`, causing this potentially-currently-awaited promise to resolve to the result of mapping an element
// of the input iterable that was produced later `myInputIndex + 1`, i.e., no chance `promises` resolve out of order, because:
// all `trySpawn`/`mapNext` calls execute their bookkeeping synchronously, before any `await`s, so we cannot observe an intermediate
// state in which input the promise mapping iterable element `myInputIndex + 1` has not been recorded in the `promisesIndexFromInputIndex` ledger.
if (preserveOrder && promisesIndexFromInputIndex[myInputIndex + 1] !== undefined) {
popPromise(myInputIndex);
// Spawn if still below backpressure limit and just dropped below concurrency limit
trySpawn();
return promises[promisesIndexFromInputIndex[myInputIndex + 1]];
}

Expand All @@ -321,24 +358,26 @@ export function pMapIterable(
// Reserve index in `promises` array: we don't actually have the promise to save yet,
// but we don't want recursive `trySpawn` calls to use this same index.
// This is safe (i.e., the empty slot won't be `await`ed) because we replace the value immediately,
// without yielding to the event loop, so no consumers (namely `getAndRemoveFromPoolNextPromise`)
// without yielding to the event loop, so no consumers (namely `nextPromise`)
// can observe the intermediate state.
const promisesIndex = promises.length++;
promises[promisesIndex] = mapNext(promisesIndex);
promises[promisesIndex].then(p => promiseEmitter.dispatchEvent(new CustomEvent(promiseEmitterEvent, {detail: p})));
}

// Bootstrap `promises`
trySpawn();

while (promises.length > 0) {
const {result: {error, done, value}, inputIndex} = await nextPromise();// eslint-disable-line no-await-in-loop
const {result: {error, done, value}, inputIndex} = await nextPromise(); // eslint-disable-line no-await-in-loop
popPromise(inputIndex);

if (error) {
throw error;
}

if (done) {
// When `preserveOrder: false`, ignore to consume any remaining pending promises in the pool
// When `preserveOrder: false`, `continue` to consume any remaining pending promises in the pool
if (!preserveOrder) {
continue;
}
Expand Down
121 changes: 107 additions & 14 deletions test.js
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,10 @@ class ThrowingIterator {
}
}

function rangeAround(expected) {
return {start: expected - 5, end: expected + 50};
}

test('main', async t => {
const end = timeSpan();
t.deepEqual(await pMap(sharedInput, mapper), [10, 20, 30]);
Expand Down Expand Up @@ -622,9 +626,9 @@ test('pMapIterable - concurrency: 2', async t => {

assertInRange(t, times.get(10), {start: 0, end: 50});
assertInRange(t, times.get(20), {start: 0, end: 50});
assertInRange(t, times.get(30), {start: 195, end: 250});
assertInRange(t, times.get(40), {start: 295, end: 350});
assertInRange(t, times.get(50), {start: 295, end: 350});
assertInRange(t, times.get(30), rangeAround(200));
assertInRange(t, times.get(40), rangeAround(300));
assertInRange(t, times.get(50), rangeAround(300));
});

test('pMapIterable - backpressure', async t => {
Expand Down Expand Up @@ -716,6 +720,18 @@ test('pMapIterable - complex pMapSkip pattern - concurrency 2 - preserveOrder: f
t.assert(result.length === 8);
});

test('pMapIterable - pMapSkip + preserveOrder: true + next input mapping promise pending - eagerly spawns next promise', async t => {
const end = timeSpan();
const testData = [
[pMapSkip, 100],
[2, 200],
[3, 100], // Ensure 3 is spawned when pMapSkip ends (otherwise, overall runtime will be 300 ms)
];
const result = await collectAsyncIterable(pMapIterable(testData, mapper, {preserveOrder: true, concurrency: 2}));
assertInRange(t, end(), rangeAround(200));
t.deepEqual(result, [2, 3]);
});

test('pMapIterable - async iterable input', async t => {
const result = await collectAsyncIterable(pMapIterable(new AsyncTestData(sharedInput), mapper));
t.deepEqual(result, [10, 20, 30]);
Expand Down Expand Up @@ -759,26 +775,47 @@ function * promiseGenerator() {
})();
}

// Differs from `AsyncTestData` because it is not a generator:
// each `next` yields a promise that does not wait for previous `next` promises to finish.
const asyncIterableDoingWorkOnEachNext = (start, stop) => {
let i = start;
return {
[Symbol.asyncIterator]() {
return {
async next() {
const me = i++;
if (me > stop) {
return {done: true};
}

await delay(100);
return {done: false, value: me};
},
};
},
};
};

test('pMapIterable - eager spawn when input iterable returns promise', async t => {
const end = timeSpan();
await collectAsyncIterable(pMapIterable(promiseGenerator(), value => delay(100, {value}), {concurrency: 3}));
assertInRange(t, end(), {start: 195, end: 250});
await collectAsyncIterable(pMapIterable(asyncIterableDoingWorkOnEachNext(1, 3), value => value, /* value => delay(100, {value}), */{concurrency: 5}));
assertInRange(t, end(), rangeAround(100));
});

test('pMapIterable - eager spawn when input iterable returns promise incurs little overhead', async t => {
const end = timeSpan();
await collectAsyncIterable(pMapIterable(promiseGenerator(), value => delay(100, {value}), {concurrency: 100}));
assertInRange(t, end(), {start: 195, end: 250});
assertInRange(t, end(), rangeAround(200));
});

test('pMapIterable - preserveOrder: false - yields mappings as they resolve', async t => {
const end = timeSpan();
const result = await collectAsyncIterable(pMapIterable(sharedInput, mapper, {preserveOrder: false}));
t.deepEqual(result, [30, 20, 10]);
assertInRange(t, end(), {start: 295, end: 350});
assertInRange(t, end(), rangeAround(300));
});

test('pMapIterable - preserveOrder: false - more complex example', async t => {
test('pMapIterable - preserveOrder: false - more complex example - sync iterable and bounded concurrency', async t => {
t.deepEqual(await collectAsyncIterable(pMapIterable([
[1, 200],
[2, 100],
Expand All @@ -789,6 +826,38 @@ test('pMapIterable - preserveOrder: false - more complex example', async t => {
], mapper, {concurrency: 3, preserveOrder: false})), [2, 3, 1, 5, 6, 4]);
});

test('pMapIterable - preserveOrder: false - more complex example - async iterable and unbounded concurrency', async t => {
const testData = [
[1, 200],
[2, 125],
[3, 150],
[4, 200],
[5, 100],
[6, 75],
];
async function * asyncIterable() {
yield * testData;
}

t.deepEqual(await collectAsyncIterable(pMapIterable(asyncIterable(), mapper, {concurrency: Number.POSITIVE_INFINITY, preserveOrder: false})), testData.toSorted(([_aId, aMs], [_bId, bMs]) => aMs - bMs).map(([id, _ms]) => id));
});

test('pMapIterable - preserveOrder: false - more complex example - sync promise-returning iterable and unbounded concurrency', async t => {
const testData = [
[1, 200],
[2, 125],
[3, 150],
[4, 225],
[5, 100],
[6, 75],
];
function * syncPromiseReturningIterable() {
yield * testData.map(d => Promise.resolve(d));
}

t.deepEqual(await collectAsyncIterable(pMapIterable(syncPromiseReturningIterable(), mapper, {concurrency: Number.POSITIVE_INFINITY, preserveOrder: false})), testData.toSorted(([_aId, aMs], [_bId, bMs]) => aMs - bMs).map(([id, _ms]) => id));
});

test('pMapIterable - preserveOrder: false - concurrency: 2', async t => {
const input = [100, 200, 10, 36, 13, 45];
const times = new Map();
Expand All @@ -799,12 +868,12 @@ test('pMapIterable - preserveOrder: false - concurrency: 2', async t => {
return delay(value, {value});
}, {concurrency: 2, backpressure: Number.POSITIVE_INFINITY, preserveOrder: false})), [100, 10, 36, 13, 200, 45]);

assertInRange(t, times.get(100), {start: 0, end: 50});
assertInRange(t, times.get(200), {start: 0, end: 50});
assertInRange(t, times.get(10), {start: times.get(100) + 100 - 5, end: times.get(100) + 100 + 50});
assertInRange(t, times.get(36), {start: times.get(10) + 10 - 5, end: times.get(10) + 10 + 50});
assertInRange(t, times.get(13), {start: times.get(36) + 36 - 5, end: times.get(36) + 36 + 50});
assertInRange(t, times.get(45), {start: times.get(13) + 13 - 5, end: times.get(13) + 13 + 50});
assertInRange(t, times.get(100), rangeAround(0));
assertInRange(t, times.get(200), rangeAround(0));
assertInRange(t, times.get(10), rangeAround(times.get(100) + 100));
assertInRange(t, times.get(36), rangeAround(times.get(10) + 10));
assertInRange(t, times.get(13), rangeAround(times.get(36) + 36));
assertInRange(t, times.get(45), rangeAround(times.get(13) + 13));
});

test('pMapIterable - preserveOrder: false - backpressure', async t => {
Expand Down Expand Up @@ -846,3 +915,27 @@ test('pMapIterable - preserveOrder: false - throws first error to settle', async
}, 10],
], mapper, {preserveOrder: false, concurrency: 2})), {message: 'bar'});
});

test('pMapIterable - {concurrency: 1, backpressure: 2} => no concurrent mappers (#76)', async t => {
const theLog = [];
const log = message => theLog.push(message);
const startLog = n => `${n}: mapper start`;
const endLog = n => `${n}: mapper end`;

async function * source() {
yield 1;
yield 2;
yield 3;
}

await collectAsyncIterable(pMapIterable(source(), async n => {
log(startLog(n));
await delay(100);
log(endLog(n));
}, {
concurrency: 1,
backpressure: 2,
}));
t.deepEqual(theLog, [startLog(1), endLog(1), startLog(2), endLog(2), startLog(3), endLog(3)]);
});

0 comments on commit 4b8f367

Please sign in to comment.