From a13a80bc2543462d19d154dfd5833783de897ebf Mon Sep 17 00:00:00 2001 From: Carlos Fuentes Date: Sat, 12 Oct 2024 07:04:02 +0200 Subject: [PATCH] chore(H2): onboard H2 into Undici queueing system (#3707) (cherry picked from commit d6c44f3b49949dbdd66d9c56e729d503e6b09ddc) --- lib/dispatcher/client-h2.js | 75 +++++-- package.json | 3 + test/http2.js | 322 +++++++++++++++--------------- test/node-test/client-dispatch.js | 5 +- 4 files changed, 224 insertions(+), 181 deletions(-) diff --git a/lib/dispatcher/client-h2.js b/lib/dispatcher/client-h2.js index 6c5155717d1..0448fa00736 100644 --- a/lib/dispatcher/client-h2.js +++ b/lib/dispatcher/client-h2.js @@ -24,7 +24,9 @@ const { kOnError, kMaxConcurrentStreams, kHTTP2Session, - kResume + kResume, + kSize, + kHTTPContext } = require('../core/symbols.js') const kOpenStreams = Symbol('open streams') @@ -160,11 +162,10 @@ async function connectH2 (client, socket) { version: 'h2', defaultPipelining: Infinity, write (...args) { - // TODO (fix): return - writeH2(client, ...args) + return writeH2(client, ...args) }, resume () { - + resumeH2(client) }, destroy (err, callback) { if (closed) { @@ -183,6 +184,20 @@ async function connectH2 (client, socket) { } } +function resumeH2 (client) { + const socket = client[kSocket] + + if (socket?.destroyed === false) { + if (client[kSize] === 0 && client[kMaxConcurrentStreams] === 0) { + socket.unref() + client[kHTTP2Session].unref() + } else { + socket.ref() + client[kHTTP2Session].ref() + } + } +} + function onHttp2SessionError (err) { assert(err.code !== 'ERR_TLS_CERT_ALTNAME_INVALID') @@ -210,17 +225,32 @@ function onHttp2SessionEnd () { * along with the socket right away */ function onHTTP2GoAway (code) { - const err = new RequestAbortedError(`HTTP/2: "GOAWAY" frame received with code ${code}`) + // We cannot recover, so best to close the session and the socket + const err = this[kError] || new SocketError(`HTTP/2: "GOAWAY" frame received with code ${code}`, util.getSocketInfo(this)) + const client = this[kClient] - // We need to trigger the close cycle right away - // We need to destroy the session and the socket - // Requests should be failed with the error after the current one is handled - this[kSocket][kError] = err - this[kClient][kOnError](err) + client[kSocket] = null + client[kHTTPContext] = null - this.unref() + if (this[kHTTP2Session] != null) { + this[kHTTP2Session].destroy(err) + this[kHTTP2Session] = null + } util.destroy(this[kSocket], err) + + // Fail head of pipeline. + const request = client[kQueue][client[kRunningIdx]] + client[kQueue][client[kRunningIdx]++] = null + util.errorRequest(client, request, err) + + client[kPendingIdx] = client[kRunningIdx] + + assert(client[kRunning] === 0) + + client.emit('disconnect', client[kUrl], [client], err) + + client[kResume]() } // https://www.rfc-editor.org/rfc/rfc7230#section-3.3.2 @@ -237,10 +267,6 @@ function writeH2 (client, request) { return false } - if (request.aborted) { - return false - } - const headers = {} for (let n = 0; n < reqHeaders.length; n += 2) { const key = reqHeaders[n + 0] @@ -283,6 +309,8 @@ function writeH2 (client, request) { // We do not destroy the socket as we can continue using the session // the stream get's destroyed and the session remains to create new streams util.destroy(body, err) + client[kQueue][client[kRunningIdx]++] = null + client[kResume]() } try { @@ -293,6 +321,10 @@ function writeH2 (client, request) { util.errorRequest(client, request, err) } + if (request.aborted) { + return false + } + if (method === 'CONNECT') { session.ref() // We are already connected, streams are pending, first request @@ -304,10 +336,12 @@ function writeH2 (client, request) { if (stream.id && !stream.pending) { request.onUpgrade(null, null, stream) ++session[kOpenStreams] + client[kQueue][client[kRunningIdx]++] = null } else { stream.once('ready', () => { request.onUpgrade(null, null, stream) ++session[kOpenStreams] + client[kQueue][client[kRunningIdx]++] = null }) } @@ -428,17 +462,20 @@ function writeH2 (client, request) { // Present specially when using pipeline or stream if (stream.state?.state == null || stream.state.state < 6) { request.onComplete([]) - return } - // Stream is closed or half-closed-remote (6), decrement counter and cleanup - // It does not have sense to continue working with the stream as we do not - // have yet RST_STREAM support on client-side if (session[kOpenStreams] === 0) { + // Stream is closed or half-closed-remote (6), decrement counter and cleanup + // It does not have sense to continue working with the stream as we do not + // have yet RST_STREAM support on client-side + session.unref() } abort(new InformationalError('HTTP/2: stream half-closed (remote)')) + client[kQueue][client[kRunningIdx]++] = null + client[kPendingIdx] = client[kRunningIdx] + client[kResume]() }) stream.once('close', () => { diff --git a/package.json b/package.json index 80b5b95e3f1..7170c4e8320 100644 --- a/package.json +++ b/package.json @@ -78,6 +78,9 @@ "test:fuzzing": "node test/fuzzing/fuzzing.test.js", "test:fetch": "npm run build:node && npm run test:fetch:nobuild", "test:fetch:nobuild": "borp --timeout 180000 --expose-gc --concurrency 1 -p \"test/fetch/*.js\" && npm run test:webidl && npm run test:busboy", + "test:h2": "npm run test:h2:core && npm run test:h2:fetch", + "test:h2:core": "borp -p \"test/http2*.js\"", + "test:h2:fetch": "npm run build:node && borp -p \"test/fetch/http2*.js\"", "test:interceptors": "borp -p \"test/interceptors/*.js\"", "test:jest": "cross-env NODE_V8_COVERAGE= jest", "test:unit": "borp --expose-gc -p \"test/*.js\"", diff --git a/test/http2.js b/test/http2.js index a43700574b8..d6840a1bd15 100644 --- a/test/http2.js +++ b/test/http2.js @@ -217,66 +217,6 @@ test('Should support H2 connection(POST Buffer)', async t => { t.strictEqual(Buffer.concat(body).toString('utf8'), 'hello h2!') }) -test('Should support H2 GOAWAY (server-side)', async t => { - const body = [] - const server = createSecureServer(pem) - - server.on('stream', (stream, headers) => { - t.strictEqual(headers['x-my-header'], 'foo') - t.strictEqual(headers[':method'], 'GET') - stream.respond({ - 'content-type': 'text/plain; charset=utf-8', - 'x-custom-h2': 'hello', - ':status': 200 - }) - stream.end('hello h2!') - }) - - server.on('session', session => { - setTimeout(() => { - session.goaway(0) - }, 1000) - }) - - server.listen(0) - await once(server, 'listening') - - const client = new Client(`https://localhost:${server.address().port}`, { - connect: { - rejectUnauthorized: false - }, - allowH2: true - }) - - t = tspl(t, { plan: 9 }) - after(() => server.close()) - after(() => client.close()) - - const response = await client.request({ - path: '/', - method: 'GET', - headers: { - 'x-my-header': 'foo' - } - }) - - response.body.on('data', chunk => { - body.push(chunk) - }) - - await once(response.body, 'end') - t.strictEqual(response.statusCode, 200) - t.strictEqual(response.headers['content-type'], 'text/plain; charset=utf-8') - t.strictEqual(response.headers['x-custom-h2'], 'hello') - t.strictEqual(Buffer.concat(body).toString('utf8'), 'hello h2!') - - const [url, disconnectClient, err] = await once(client, 'disconnect') - - t.ok(url instanceof URL) - t.deepStrictEqual(disconnectClient, [client]) - t.strictEqual(err.message, 'HTTP/2: "GOAWAY" frame received with code 0') -}) - test('Should throw if bad allowH2 has been passed', async t => { t = tspl(t, { plan: 1 }) @@ -852,7 +792,10 @@ test('Should handle h2 request with body (string or buffer) - dispatch', async t onHeaders (statusCode, headers) { t.strictEqual(statusCode, 200) t.strictEqual(headers[0].toString('utf-8'), 'content-type') - t.strictEqual(headers[1].toString('utf-8'), 'text/plain; charset=utf-8') + t.strictEqual( + headers[1].toString('utf-8'), + 'text/plain; charset=utf-8' + ) t.strictEqual(headers[2].toString('utf-8'), 'x-custom-h2') t.strictEqual(headers[3].toString('utf-8'), 'foo') }, @@ -1183,56 +1126,53 @@ test('Agent should support H2 connection', async t => { t.strictEqual(Buffer.concat(body).toString('utf8'), 'hello h2!') }) -test( - 'Should provide pseudo-headers in proper order', - async t => { - t = tspl(t, { plan: 2 }) +test('Should provide pseudo-headers in proper order', async t => { + t = tspl(t, { plan: 2 }) - const server = createSecureServer(pem) - server.on('stream', (stream, _headers, _flags, rawHeaders) => { - t.deepStrictEqual(rawHeaders, [ - ':authority', - `localhost:${server.address().port}`, - ':method', - 'GET', - ':path', - '/', - ':scheme', - 'https' - ]) + const server = createSecureServer(pem) + server.on('stream', (stream, _headers, _flags, rawHeaders) => { + t.deepStrictEqual(rawHeaders, [ + ':authority', + `localhost:${server.address().port}`, + ':method', + 'GET', + ':path', + '/', + ':scheme', + 'https' + ]) - stream.respond({ - 'content-type': 'text/plain; charset=utf-8', - ':status': 200 - }) - stream.end() + stream.respond({ + 'content-type': 'text/plain; charset=utf-8', + ':status': 200 }) + stream.end() + }) - server.listen(0) - await once(server, 'listening') + server.listen(0) + await once(server, 'listening') - const client = new Client(`https://localhost:${server.address().port}`, { - connect: { - rejectUnauthorized: false - }, - allowH2: true - }) + const client = new Client(`https://localhost:${server.address().port}`, { + connect: { + rejectUnauthorized: false + }, + allowH2: true + }) - after(() => server.close()) - after(() => client.close()) + after(() => server.close()) + after(() => client.close()) - const response = await client.request({ - path: '/', - method: 'GET' - }) + const response = await client.request({ + path: '/', + method: 'GET' + }) - t.strictEqual(response.statusCode, 200) + t.strictEqual(response.statusCode, 200) - await response.body.dump() + await response.body.dump() - await t.complete - } -) + await t.complete +}) test('The h2 pseudo-headers is not included in the headers', async t => { const server = createSecureServer(pem) @@ -1287,16 +1227,20 @@ test('Should throw informational error on half-closed streams (remote)', async t }) t = tspl(t, { plan: 2 }) - after(() => server.close()) - after(() => client.close()) - - await client.request({ - path: '/', - method: 'GET' - }).catch(err => { - t.strictEqual(err.message, 'HTTP/2: stream half-closed (remote)') - t.strictEqual(err.code, 'UND_ERR_INFO') + after(async () => { + server.close() + await client.close() }) + + await client + .request({ + path: '/', + method: 'GET' + }) + .catch(err => { + t.strictEqual(err.message, 'HTTP/2: stream half-closed (remote)') + t.strictEqual(err.code, 'UND_ERR_INFO') + }) }) test('#2364 - Concurrent aborts', async t => { @@ -1325,62 +1269,76 @@ test('#2364 - Concurrent aborts', async t => { allowH2: true }) - t = tspl(t, { plan: 18 }) + t = tspl(t, { plan: 14 }) after(() => server.close()) after(() => client.close()) - const controller = new AbortController() + const signal = AbortSignal.timeout(50) - client.request({ - path: '/1', - method: 'GET', - headers: { - 'x-my-header': 'foo' + client.request( + { + path: '/1', + method: 'GET', + headers: { + 'x-my-header': 'foo' + } + }, + (err, response) => { + t.ifError(err) + t.strictEqual( + response.headers['content-type'], + 'text/plain; charset=utf-8' + ) + t.strictEqual(response.headers['x-custom-h2'], 'hello') + t.strictEqual(response.statusCode, 200) } - }, (err, response) => { - t.ifError(err) - t.strictEqual(response.headers['content-type'], 'text/plain; charset=utf-8') - t.strictEqual(response.headers['x-custom-h2'], 'hello') - t.strictEqual(response.statusCode, 200) - response.body.dump() - }) + ) - client.request({ - path: '/2', - method: 'GET', - headers: { - 'x-my-header': 'foo' + client.request( + { + path: '/2', + method: 'GET', + headers: { + 'x-my-header': 'foo' + }, + signal }, - signal: controller.signal - }, (err, response) => { - t.strictEqual(err.name, 'AbortError') - }) - - client.request({ - path: '/3', - method: 'GET', - headers: { - 'x-my-header': 'foo' + (err, response) => { + t.strictEqual(err.name, 'TimeoutError') } - }, (err, response) => { - t.ifError(err) - t.strictEqual(response.headers['content-type'], 'text/plain; charset=utf-8') - t.strictEqual(response.headers['x-custom-h2'], 'hello') - t.strictEqual(response.statusCode, 200) - response.body.dump() - }) + ) - client.request({ - path: '/4', - method: 'GET', - headers: { - 'x-my-header': 'foo' + client.request( + { + path: '/3', + method: 'GET', + headers: { + 'x-my-header': 'foo' + } }, - signal: controller.signal - }, (err, response) => { - t.strictEqual(err.name, 'AbortError') - }) + (err, response) => { + t.ifError(err) + t.strictEqual( + response.headers['content-type'], + 'text/plain; charset=utf-8' + ) + t.strictEqual(response.headers['x-custom-h2'], 'hello') + t.strictEqual(response.statusCode, 200) + } + ) - controller.abort() + client.request( + { + path: '/4', + method: 'GET', + headers: { + 'x-my-header': 'foo' + }, + signal + }, + (err, response) => { + t.strictEqual(err.name, 'TimeoutError') + } + ) await t.completed }) @@ -1418,8 +1376,8 @@ test('#3046 - GOAWAY Frame', async t => { }) t = tspl(t, { plan: 7 }) - after(() => server.close()) after(() => client.close()) + after(() => server.close()) client.on('disconnect', (url, disconnectClient, err) => { t.ok(url instanceof URL) @@ -1439,10 +1397,56 @@ test('#3046 - GOAWAY Frame', async t => { t.strictEqual(response.headers['x-custom-h2'], 'hello') t.strictEqual(response.statusCode, 200) - t.rejects(response.body.text.bind(response.body), { + t.rejects(response.body.text(), { message: 'HTTP/2: "GOAWAY" frame received with code 0', - code: 'UND_ERR_ABORTED' + code: 'UND_ERR_SOCKET' + }) + + await t.completed +}) + +test('#3671 - Graceful close', async (t) => { + const server = createSecureServer(pem) + + server.on('stream', (stream, headers) => { + setTimeout(() => { + stream.respond({ + 'content-type': 'text/plain; charset=utf-8', + 'x-custom-h2': 'hello', + ':status': 200 + }) + stream.end('Hello World') + }, 200) }) + server.listen(0) + await once(server, 'listening') + + const client = new Client(`https://localhost:${server.address().port}`, { + connect: { + rejectUnauthorized: false + }, + allowH2: true + }) + + t = tspl(t, { plan: 5 }) + after(() => server.close()) + + client.request({ + path: '/', + method: 'GET', + headers: { + 'x-my-header': 'foo' + } + }, async (err, response) => { + t.ifError(err) + t.strictEqual(response.headers['content-type'], 'text/plain; charset=utf-8') + t.strictEqual(response.headers['x-custom-h2'], 'hello') + t.strictEqual(response.statusCode, 200) + t.equal(await response.body.text(), 'Hello World') + }) + + await client.close() + await t.completed }) diff --git a/test/node-test/client-dispatch.js b/test/node-test/client-dispatch.js index f9ed888d44b..296e3b8d075 100644 --- a/test/node-test/client-dispatch.js +++ b/test/node-test/client-dispatch.js @@ -1051,7 +1051,7 @@ test('Issue#3065 - fix bad destroy handling', async (t) => { test('Issue#3065 - fix bad destroy handling (h2)', async (t) => { // Due to we handle the session, the request for h2 will fail on servername change - const p = tspl(t, { plan: 5 }) + const p = tspl(t, { plan: 4 }) const server = createSecureServer(pem) server.on('stream', (stream) => { stream.respond({ @@ -1105,8 +1105,7 @@ test('Issue#3065 - fix bad destroy handling (h2)', async (t) => { p.deepStrictEqual(dispatches, ['onConnect', 'onBodySent', 'onResponseStarted', 'onHeaders1', 'onData', 'onComplete']) }, onError (err) { - p.strictEqual(err.code, 'UND_ERR_INFO') - p.strictEqual(err.message, 'servername changed') + p.ifError(err) } })