Skip to content

Commit

Permalink
Fix concurrency control in pMapIterable
Browse files Browse the repository at this point in the history
  • Loading branch information
dmitri-gb committed Aug 11, 2024
1 parent a38d5a7 commit 088f1a5
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 5 deletions.
10 changes: 5 additions & 5 deletions index.js
Original file line number Diff line number Diff line change
Expand Up @@ -191,31 +191,31 @@ export function pMapIterable(
const iterator = iterable[Symbol.asyncIterator] === undefined ? iterable[Symbol.iterator]() : iterable[Symbol.asyncIterator]();

const promises = [];
let runningMappersCount = 0;
let pendingPromisesCount = 0;
let isDone = false;
let index = 0;

function trySpawn() {
if (isDone || !(runningMappersCount < concurrency && promises.length < backpressure)) {
if (isDone || !(pendingPromisesCount < concurrency && promises.length < backpressure)) {
return;
}

const promise = (async () => {
pendingPromisesCount++;
const {done, value} = await iterator.next();

if (done) {
pendingPromisesCount--;
return {done: true};
}

runningMappersCount++;

// Spawn if still below concurrency and backpressure limit
trySpawn();

try {
const returnValue = await mapper(await value, index++);

runningMappersCount--;
pendingPromisesCount--;

if (returnValue === pMapSkip) {
const index = promises.indexOf(promise);
Expand Down
17 changes: 17 additions & 0 deletions test.js
Original file line number Diff line number Diff line change
Expand Up @@ -637,6 +637,23 @@ test('pMapIterable - backpressure', async t => {
t.is(currentValue, 40);
});

test('pMapIterable - async input, backpressure > concurrency', async t => {
async function * source() {
yield 1;
yield 2;
yield 3;
}

const log = [];
await collectAsyncIterable(pMapIterable(source(), async n => {
log.push(n);
await delay(100);
log.push(n);
}, {concurrency: 1, backpressure: 2}));

t.deepEqual(log, [1, 1, 2, 2, 3, 3]);
});

test('pMapIterable - pMapSkip', async t => {
t.deepEqual(await collectAsyncIterable(pMapIterable([
1,
Expand Down

0 comments on commit 088f1a5

Please sign in to comment.