From 088f1a5035c3667ce77ec5dd90a97ffe608255a6 Mon Sep 17 00:00:00 2001 From: Dmitri Date: Mon, 12 Aug 2024 02:48:29 +0300 Subject: [PATCH] Fix concurrency control in pMapIterable Fixes sindresorhus/p-map#76 --- index.js | 10 +++++----- test.js | 17 +++++++++++++++++ 2 files changed, 22 insertions(+), 5 deletions(-) diff --git a/index.js b/index.js index 2f7d91c..f7e210e 100644 --- a/index.js +++ b/index.js @@ -191,31 +191,31 @@ export function pMapIterable( const iterator = iterable[Symbol.asyncIterator] === undefined ? iterable[Symbol.iterator]() : iterable[Symbol.asyncIterator](); const promises = []; - let runningMappersCount = 0; + let pendingPromisesCount = 0; let isDone = false; let index = 0; function trySpawn() { - if (isDone || !(runningMappersCount < concurrency && promises.length < backpressure)) { + if (isDone || !(pendingPromisesCount < concurrency && promises.length < backpressure)) { return; } const promise = (async () => { + pendingPromisesCount++; const {done, value} = await iterator.next(); if (done) { + pendingPromisesCount--; return {done: true}; } - runningMappersCount++; - // Spawn if still below concurrency and backpressure limit trySpawn(); try { const returnValue = await mapper(await value, index++); - runningMappersCount--; + pendingPromisesCount--; if (returnValue === pMapSkip) { const index = promises.indexOf(promise); diff --git a/test.js b/test.js index 9db3013..484bfeb 100644 --- a/test.js +++ b/test.js @@ -637,6 +637,23 @@ test('pMapIterable - backpressure', async t => { t.is(currentValue, 40); }); +test('pMapIterable - async input, backpressure > concurrency', async t => { + async function * source() { + yield 1; + yield 2; + yield 3; + } + + const log = []; + await collectAsyncIterable(pMapIterable(source(), async n => { + log.push(n); + await delay(100); + log.push(n); + }, {concurrency: 1, backpressure: 2})); + + t.deepEqual(log, [1, 1, 2, 2, 3, 3]); +}); + test('pMapIterable - pMapSkip', async t => { t.deepEqual(await collectAsyncIterable(pMapIterable([ 1,