Skip to content

Commit

Permalink
Request async iterator (#278)
Browse files Browse the repository at this point in the history
* Update eslintrc and fix linter error.

* Add async iterator support.
  • Loading branch information
hbgl authored Aug 14, 2023
1 parent 7d5aafb commit 9a565e0
Show file tree
Hide file tree
Showing 3 changed files with 337 additions and 2 deletions.
9 changes: 8 additions & 1 deletion .eslintrc
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
env:
node: true
es2017: true

parserOptions:
ecmaVersion: 2019

rules:
comma-dangle: 2
Expand Down Expand Up @@ -42,7 +46,10 @@ rules:
no-sync: 2
no-loop-func: 2
no-labels: 2
no-unused-vars: 1
no-unused-vars:
- 1
- argsIgnorePattern: ^_
varsIgnorePattern: ^_
no-script-url: 2
no-proto: 2
no-iterator: 2
Expand Down
105 changes: 104 additions & 1 deletion lib/mockRequest.js
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ function createRequest(options) {
mockRequest.originalUrl = options.originalUrl || mockRequest.url;
mockRequest.baseUrl = options.baseUrl || mockRequest.url;
mockRequest.path = options.path ||
((options.url ? url.parse(options.url).pathname : ''));
(options.url ? url.parse(options.url).pathname : '');
mockRequest.params = options.params ? options.params : {};
if (options.session) {
mockRequest.session = options.session;
Expand Down Expand Up @@ -540,6 +540,109 @@ function createRequest(options) {
return subdomains.slice(offset);
}());

/**
* Function: asyncIterator
*
* Buffers data, error, end, and close events and yields them in order.
* Unlike stream.Readable, this async iterator implementation will not exit
* early on error or close.
*/
mockRequest[Symbol.asyncIterator] = async function* asyncIterator() {
let ended = false;
let closed = false;
let error = null;
let chunks = [];
let resolvePromise = null;

const promiseExecutor = resolve => {
resolvePromise = resolve;
};

const promiseResolver = () => {
if (resolvePromise) {
resolvePromise();
resolvePromise = null;
}
};
const dataEventHandler = chunk => {
if (ended || closed || error) {
return;
}
chunks.push(chunk);
promiseResolver();
};
const endEventHandler = () => {
if (ended || closed || error) {
return;
}
ended = true;
promiseResolver();
};
const closeEventHandler = () => {
if (closed || error) {
return;
}
closed = true;
promiseResolver();
};
const errorEventHandler = err => {
if (closed || error) {
return;
}
error = err;
promiseResolver();
};

mockRequest.on('data', dataEventHandler);
mockRequest.on('end', endEventHandler);
mockRequest.on('close', closeEventHandler);
mockRequest.on('error', errorEventHandler);

// Emit custom event after entering the loop.
setTimeout(() => {
this.emit('async_iterator');
});

try {
for (;;) {
await new Promise(promiseExecutor);
let i = 0;
for (;;) {
if (error) {
throw error;
}
if (closed) {
return;
}

const hasChunks = i < chunks.length;
if (!hasChunks) {
if (ended) {
// End signaled. Bail.
return;
}
// Wait for next push.
break;
}

const chunk = chunks[i];
chunks[i] = undefined;
i += 1;
yield chunk;
}
chunks.length = 0;
}
} finally {
chunks.length = 0;
error = null;

mockRequest.off('data', dataEventHandler);
mockRequest.off('end', endEventHandler);
mockRequest.off('close', closeEventHandler);
mockRequest.off('error', errorEventHandler);
}
};

return mockRequest;
}

Expand Down
225 changes: 225 additions & 0 deletions test/lib/mockRequest.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -993,4 +993,229 @@ describe('mockRequest', function() {

});

describe('asyncIterator', function() {

async function collect(asyncIterable) {
const chunks = [];
for await (const chunk of asyncIterable) {
chunks.push(chunk);
}
return chunks;
}

it('should iterate when sending data', async function() {
const request = mockRequest.createRequest();

const chunksPromise = collect(request);
request.send('test data');

const data = Buffer.concat(await chunksPromise).toString();
expect(data).to.equal('test data');
});

it('should iterate synchronous pushes', async function() {
const request = mockRequest.createRequest();

const chunksPromise = collect(request);
request.emit('data', Buffer.from('foo'));
request.emit('data', Buffer.from('bar'));
request.emit('data', Buffer.from('baz'));
request.emit('end');

const data = Buffer.concat(await chunksPromise).toString();
expect(data).to.equal('foobarbaz');
});

it('should ignore push after end', async function() {
const request = mockRequest.createRequest();

const chunksPromise = collect(request);
request.emit('data', Buffer.from('foo'));
request.emit('end');
request.emit('data', Buffer.from('bar'));

const data = Buffer.concat(await chunksPromise).toString();
expect(data).to.equal('foo');
});

it('should iterate asynchronous pushes', async function() {
const request = mockRequest.createRequest();

const chunksPromise = collect(request);
request.emit('data', Buffer.from('foo'));
await new Promise(r => setTimeout(r));
request.emit('data', Buffer.from('bar'));
await new Promise(r => setTimeout(r));
request.emit('data', Buffer.from('baz'));
await new Promise(r => setTimeout(r));
request.emit('end');

const data = Buffer.concat(await chunksPromise).toString();
expect(data).to.equal('foobarbaz');
});

it('should support asynchronous pushes while iterating', async function() {
const request = mockRequest.createRequest();

const chunksPromise = (async () => {
const extraPushes = ['3', '2', '1'];
const chunks = [];
for await (const chunk of request) {
chunks.push(chunk);
if (extraPushes.length > 0) {
request.emit('data', Buffer.from(extraPushes.pop()));
await new Promise(r => setTimeout(r));
}
}
return chunks;
})();

request.emit('data', Buffer.from('foo'));
await new Promise(r => setTimeout(r));
request.emit('data', Buffer.from('bar'));
await new Promise(r => setTimeout(r));
request.emit('data', Buffer.from('baz'));
await new Promise(r => setTimeout(r));
request.emit('end');

const data = Buffer.concat(await chunksPromise).toString();
expect(data).to.equal('foo1bar2baz3');
});

it('supports error', async function() {
const request = mockRequest.createRequest();

/** @type {AsyncIterator} */
const iterator = request[Symbol.asyncIterator]();
const error = new Error('Test error');

const nextPromise = iterator.next();
request.emit('error', error);

try {
await nextPromise;
expect.fail();
} catch (e) {
expect(e).to.equal(error);
}
});

it('supports throw', async function() {
const request = mockRequest.createRequest();

/** @type {AsyncIterator} */
const iterator = request[Symbol.asyncIterator]();
const error = new Error('Test error');

const nextPromise = iterator.next();
request.emit('data', Buffer.from('foo'));
await nextPromise;

try {
await iterator.throw(error);
expect.fail();
} catch (e) {
expect(e).to.equal(error);
return;
}
});

it('first error wins', async function() {
const request = mockRequest.createRequest();

/** @type {AsyncIterator} */
const iterator = request[Symbol.asyncIterator]();
const error1 = new Error('Test error 1');
const error2 = new Error('Test error 2');

const nextPromise = iterator.next();
request.emit('error', error1);
request.emit('error', error2);

try {
await nextPromise;
expect.fail();
} catch (e) {
expect(e).to.equal(error1);
}
});

it('supports return', async function() {
const request = mockRequest.createRequest();

/** @type {AsyncIterator} */
const iterator = request[Symbol.asyncIterator]();

const result = await iterator.return();
expect(result.done).to.equal(true);
});

['close', 'error'].forEach(event => {
it(`discards buffer on ${event}`, async function () {
const request = mockRequest.createRequest();

const chunksPromise = (async () => {
const chunks = [];
try {
for await (const data of request) {
chunks.push(data);
}
} catch (e) {
// Ignore
}
return chunks;
})();

request.emit('data', Buffer.from('foo'));
await new Promise(r => setTimeout(r));
request.emit('data', Buffer.from('bar'));
request.emit(event, event === 'error' ? new Error('Test error') : undefined);
request.emit('data', Buffer.from('baz'));

const data = Buffer.concat(await chunksPromise).toString();
expect(data).to.equal('foo');
});
});

it('emits custom event after creation', async () => {
const request = mockRequest.createRequest();

request.on('async_iterator', () => {
request.emit('data', Buffer.from('foo'));
request.emit('data', Buffer.from('bar'));
request.emit('data', Buffer.from('baz'));
request.emit('end');
});

const data = Buffer.concat(await collect(request)).toString();
expect(data).to.equal('foobarbaz');
});

if (typeof global.Request === 'function') {
it('can be fed to a Fetch API Request body', async function () {
const request = mockRequest.createRequest();

// eslint-disable-next-line no-undef
const webRequest = new Request('http://example.com', {
method: 'POST',
headers: {
'Content-Type': 'application/json'
},
body: request,
duplex: 'half'
});

request.on('async_iterator', () => {
request.emit('data', Buffer.from('{ "foo": "b'));
request.emit('data', Buffer.from('ar" }'));
request.emit('end');
});

const webRequestJson = await webRequest.json();
expect(webRequestJson).to.deep.equal({ foo: 'bar' });
});
}

});

});

0 comments on commit 9a565e0

Please sign in to comment.