Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

use FinalizationRegistry to cancel the body if response is collected #3199

Merged
merged 26 commits into from
May 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 25 additions & 9 deletions lib/web/fetch/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -120,12 +120,16 @@ class Fetch extends EE {
}
}

function handleFetchDone (response) {
finalizeAndReportTiming(response, 'fetch')
}

// https://fetch.spec.whatwg.org/#fetch-method
function fetch (input, init = undefined) {
webidl.argumentLengthCheck(arguments, 1, 'globalThis.fetch')

// 1. Let p be a new promise.
const p = createDeferredPromise()
let p = createDeferredPromise()

// 2. Let requestObject be the result of invoking the initial value of
// Request as constructor with input and init as arguments. If this throws
Expand Down Expand Up @@ -185,16 +189,17 @@ function fetch (input, init = undefined) {
// 3. Abort controller with requestObject’s signal’s abort reason.
controller.abort(requestObject.signal.reason)

const realResponse = responseObject?.deref()

// 4. Abort the fetch() call with p, request, responseObject,
// and requestObject’s signal’s abort reason.
abortFetch(p, request, responseObject, requestObject.signal.reason)
abortFetch(p, request, realResponse, requestObject.signal.reason)
}
)

// 12. Let handleFetchDone given response response be to finalize and
// report timing with response, globalObject, and "fetch".
const handleFetchDone = (response) =>
finalizeAndReportTiming(response, 'fetch')
// see function handleFetchDone

// 13. Set controller to the result of calling fetch given request,
// with processResponseEndOfBody set to handleFetchDone, and processResponse
Expand Down Expand Up @@ -228,10 +233,11 @@ function fetch (input, init = undefined) {

// 4. Set responseObject to the result of creating a Response object,
// given response, "immutable", and relevantRealm.
responseObject = fromInnerResponse(response, 'immutable')
responseObject = new WeakRef(fromInnerResponse(response, 'immutable'))

// 5. Resolve p with responseObject.
p.resolve(responseObject)
p.resolve(responseObject.deref())
p = null
}

controller = fetching({
Expand Down Expand Up @@ -314,7 +320,10 @@ const markResourceTiming = performance.markResourceTiming
// https://fetch.spec.whatwg.org/#abort-fetch
function abortFetch (p, request, responseObject, error) {
// 1. Reject promise with error.
p.reject(error)
if (p) {
// We might have already resolved the promise at this stage
p.reject(error)
}

// 2. If request’s body is not null and is readable, then cancel request’s
// body with error.
Expand Down Expand Up @@ -1066,7 +1075,10 @@ function fetchFinale (fetchParams, response) {
// 4. If fetchParams’s process response is non-null, then queue a fetch task to run fetchParams’s
// process response given response, with fetchParams’s task destination.
if (fetchParams.processResponse != null) {
queueMicrotask(() => fetchParams.processResponse(response))
queueMicrotask(() => {
fetchParams.processResponse(response)
fetchParams.processResponse = null
})
}

// 5. Let internalResponse be response, if response is a network error; otherwise response’s internal response.
Expand Down Expand Up @@ -1884,7 +1896,11 @@ async function httpNetworkFetch (
// 12. Let cancelAlgorithm be an algorithm that aborts fetchParams’s
// controller with reason, given reason.
const cancelAlgorithm = (reason) => {
fetchParams.controller.abort(reason)
// If the aborted fetch was already terminated, then we do not
// need to do anything.
if (!isCancelled(fetchParams)) {
fetchParams.controller.abort(reason)
}
}

// 13. Let highWaterMark be a non-negative, non-NaN number, chosen by
Expand Down
19 changes: 19 additions & 0 deletions lib/web/fetch/response.js
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,23 @@ const { URLSerializer } = require('./data-url')
const { kHeadersList, kConstruct } = require('../../core/symbols')
const assert = require('node:assert')
const { types } = require('node:util')
const { isDisturbed, isErrored } = require('node:stream')

const textEncoder = new TextEncoder('utf-8')

const hasFinalizationRegistry = globalThis.FinalizationRegistry && process.version.indexOf('v18') !== 0
let registry

if (hasFinalizationRegistry) {
registry = new FinalizationRegistry((stream) => {
if (!stream.locked && !isDisturbed(stream) && !isErrored(stream)) {
stream.cancel('Response object has been garbage collected').catch(noop)
}
})
}

function noop () {}

// https://fetch.spec.whatwg.org/#response-class
class Response {
// Creates network error Response.
Expand Down Expand Up @@ -510,6 +524,11 @@ function fromInnerResponse (innerResponse, guard) {
response[kHeaders] = new Headers(kConstruct)
response[kHeaders][kHeadersList] = innerResponse.headersList
response[kHeaders][kGuard] = guard

if (hasFinalizationRegistry && innerResponse.body?.stream) {
registry.register(response, innerResponse.body.stream)
}

return response
}

Expand Down
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@
"test:eventsource:nobuild": "borp --expose-gc -p \"test/eventsource/*.js\"",
"test:fuzzing": "node test/fuzzing/fuzzing.test.js",
"test:fetch": "npm run build:node && npm run test:fetch:nobuild",
"test:fetch:nobuild": "borp --expose-gc -p \"test/fetch/*.js\" && npm run test:webidl && npm run test:busboy",
"test:fetch:nobuild": "borp --timeout 180000 --expose-gc --concurrency 1 -p \"test/fetch/*.js\" && npm run test:webidl && npm run test:busboy",
"test:interceptors": "borp -p \"test/interceptors/*.js\"",
"test:jest": "cross-env NODE_V8_COVERAGE= jest",
"test:unit": "borp --expose-gc -p \"test/*.js\"",
Expand Down
53 changes: 53 additions & 0 deletions test/fetch/fire-and-forget.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
'use strict'

const { randomFillSync } = require('node:crypto')
const { setTimeout: sleep } = require('timers/promises')
const { test } = require('node:test')
const { fetch, Agent, setGlobalDispatcher } = require('../..')
const { createServer } = require('node:http')
const { closeServerAsPromise } = require('../utils/node-http')

const blob = randomFillSync(new Uint8Array(1024 * 512))

// Enable when/if FinalizationRegistry in Node.js 18 becomes stable again
const isNode18 = process.version.startsWith('v18')

test('does not need the body to be consumed to continue', { timeout: 180_000, skip: isNode18 }, async (t) => {
const agent = new Agent({
keepAliveMaxTimeout: 10,
keepAliveTimeoutThreshold: 10
})
setGlobalDispatcher(agent)
const server = createServer((req, res) => {
res.writeHead(200)
res.end(blob)
})
t.after(closeServerAsPromise(server))

await new Promise((resolve) => {
server.listen(0, resolve)
})

const url = new URL(`http://127.0.0.1:${server.address().port}`)

const batch = 50
const delay = 0
let total = 0
while (total < 10000) {
// eslint-disable-next-line no-undef
gc(true)
const array = new Array(batch)
for (let i = 0; i < batch; i++) {
array[i] = fetch(url).catch(() => {})
}
await Promise.all(array)
await sleep(delay)

console.log(
'RSS',
(process.memoryUsage.rss() / 1024 / 1024) | 0,
'MB after',
(total += batch) + ' fetch() requests'
)
}
})
Loading