Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

async_hooks: don't reuse resource in HttpAgent when queued #34439

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 26 additions & 9 deletions lib/_http_agent.js
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ const EventEmitter = require('events');
let debug = require('internal/util/debuglog').debuglog('http', (fn) => {
debug = fn;
});
const { AsyncResource } = require('async_hooks');
const { async_id_symbol } = require('internal/async_hooks').symbols;
const {
codes: {
Expand All @@ -47,6 +48,7 @@ const { validateNumber } = require('internal/validators');

const kOnKeylog = Symbol('onkeylog');
const kRequestOptions = Symbol('requestOptions');
const kRequestAsyncResource = Symbol('requestAsyncResource');
// New Agent code.

// The largest departure from the previous implementation is that
Expand Down Expand Up @@ -127,7 +129,17 @@ function Agent(options) {
const requests = this.requests[name];
if (requests && requests.length) {
const req = requests.shift();
setRequestSocket(this, req, socket);
Flarna marked this conversation as resolved.
Show resolved Hide resolved
const reqAsyncRes = req[kRequestAsyncResource];
if (reqAsyncRes) {
puzpuzpuz marked this conversation as resolved.
Show resolved Hide resolved
// Run request within the original async context.
reqAsyncRes.runInAsyncScope(() => {
asyncResetHandle(socket);
setRequestSocket(this, req, socket);
});
req[kRequestAsyncResource] = null;
} else {
setRequestSocket(this, req, socket);
}
if (requests.length === 0) {
Flarna marked this conversation as resolved.
Show resolved Hide resolved
delete this.requests[name];
}
Expand Down Expand Up @@ -253,14 +265,7 @@ Agent.prototype.addRequest = function addRequest(req, options, port/* legacy */,
const sockLen = freeLen + this.sockets[name].length;

if (socket) {
// Guard against an uninitialized or user supplied Socket.
const handle = socket._handle;
if (handle && typeof handle.asyncReset === 'function') {
// Assign the handle a new asyncId and run any destroy()/init() hooks.
handle.asyncReset(new ReusedHandle(handle.getProviderType(), handle));
socket[async_id_symbol] = handle.getAsyncId();
}

asyncResetHandle(socket);
this.reuseSocket(socket, req);
setRequestSocket(this, req, socket);
this.sockets[name].push(socket);
Expand All @@ -284,6 +289,8 @@ Agent.prototype.addRequest = function addRequest(req, options, port/* legacy */,

// Used to create sockets for pending requests from different origin
req[kRequestOptions] = options;
// Used to capture the original async context.
req[kRequestAsyncResource] = new AsyncResource('QueuedRequest');

this.requests[name].push(req);
}
Expand Down Expand Up @@ -493,6 +500,16 @@ function setRequestSocket(agent, req, socket) {
socket.setTimeout(req.timeout);
}

function asyncResetHandle(socket) {
// Guard against an uninitialized or user supplied Socket.
const handle = socket._handle;
if (handle && typeof handle.asyncReset === 'function') {
// Assign the handle a new asyncId and run any destroy()/init() hooks.
handle.asyncReset(new ReusedHandle(handle.getProviderType(), handle));
socket[async_id_symbol] = handle.getAsyncId();
}
}

module.exports = {
Agent,
globalAgent: new Agent()
Expand Down
35 changes: 35 additions & 0 deletions test/async-hooks/test-async-local-storage-http-agent.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
'use strict';
const common = require('../common');
const assert = require('assert');
const { AsyncLocalStorage } = require('async_hooks');
const http = require('http');

const asyncLocalStorage = new AsyncLocalStorage();

const agent = new http.Agent({
maxSockets: 1
});

const N = 3;
let responses = 0;

const server = http.createServer(common.mustCall((req, res) => {
res.end('ok');
}, N));

server.listen(0, common.mustCall(() => {
const port = server.address().port;

for (let i = 0; i < N; i++) {
asyncLocalStorage.run(i, () => {
http.get({ agent, port }, common.mustCall((res) => {
assert.strictEqual(asyncLocalStorage.getStore(), i);
if (++responses === N) {
server.close();
agent.destroy();
}
res.resume();
}));
});
}
}));
92 changes: 92 additions & 0 deletions test/async-hooks/test-http-agent-handle-reuse-parallel.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
'use strict';
// Flags: --expose-internals
const common = require('../common');
const initHooks = require('./init-hooks');
const { checkInvocations } = require('./hook-checks');
const assert = require('assert');
const { async_id_symbol } = require('internal/async_hooks').symbols;
const http = require('http');

// Checks that the async resource used in init in case of a reused handle
// is not reused. Test is based on parallel\test-async-hooks-http-agent.js.

const hooks = initHooks();
hooks.enable();

const reqAsyncIds = [];
let socket;
let responses = 0;

// Make sure a single socket is transparently reused for 2 requests.
const agent = new http.Agent({
keepAlive: true,
keepAliveMsecs: Infinity,
maxSockets: 1
});

const verifyRequest = (idx) => (res) => {
reqAsyncIds[idx] = res.socket[async_id_symbol];
assert.ok(reqAsyncIds[idx] > 0, `${reqAsyncIds[idx]} > 0`);
if (socket) {
// Check that both requests share their socket.
assert.strictEqual(res.socket, socket);
} else {
socket = res.socket;
}

res.on('data', common.mustCallAtLeast(() => {}));
res.on('end', common.mustCall(() => {
if (++responses === 2) {
// Clean up to let the event loop stop.
server.close();
agent.destroy();
}
}));
};

const server = http.createServer(common.mustCall((req, res) => {
req.once('data', common.mustCallAtLeast(() => {
res.writeHead(200, { 'Content-Type': 'text/plain' });
res.write('foo');
}));
req.on('end', common.mustCall(() => {
res.end('bar');
}));
}, 2)).listen(0, common.mustCall(() => {
const port = server.address().port;
const payload = 'hello world';

// First request.
const r1 = http.request({
agent, port, method: 'POST'
}, common.mustCall(verifyRequest(0)));
r1.end(payload);

// Second request. Sent in parallel with the first one.
const r2 = http.request({
agent, port, method: 'POST'
}, common.mustCall(verifyRequest(1)));
r2.end(payload);
}));


process.on('exit', onExit);

function onExit() {
hooks.disable();
hooks.sanityCheck();
const activities = hooks.activities;

// Verify both invocations
const first = activities.filter((x) => x.uid === reqAsyncIds[0])[0];
checkInvocations(first, { init: 1, destroy: 1 }, 'when process exits');

const second = activities.filter((x) => x.uid === reqAsyncIds[1])[0];
checkInvocations(second, { init: 1, destroy: 1 }, 'when process exits');

// Verify reuse handle has been wrapped
assert.strictEqual(first.type, second.type);
assert.ok(first.handle !== second.handle, 'Resource reused');
assert.ok(first.handle === second.handle.handle,
'Resource not wrapped correctly');
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ const assert = require('assert');
const { async_id_symbol } = require('internal/async_hooks').symbols;
const http = require('http');

// Checks that the async resource used in init in case of a resused handle
// Checks that the async resource used in init in case of a reused handle
// is not reused. Test is based on parallel\test-async-hooks-http-agent.js.

const hooks = initHooks();
Expand Down