Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

use FinalizationRegistry to cancel the body if response is collected #3199

Merged
merged 26 commits into from
May 7, 2024
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 20 additions & 8 deletions lib/web/fetch/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -120,12 +120,16 @@ class Fetch extends EE {
}
}

function handleFetchDone (response) {
finalizeAndReportTiming(response, 'fetch')
}

// https://fetch.spec.whatwg.org/#fetch-method
function fetch (input, init = undefined) {
webidl.argumentLengthCheck(arguments, 1, 'globalThis.fetch')

// 1. Let p be a new promise.
const p = createDeferredPromise()
let p = createDeferredPromise()

// 2. Let requestObject be the result of invoking the initial value of
// Request as constructor with input and init as arguments. If this throws
Expand Down Expand Up @@ -185,16 +189,17 @@ function fetch (input, init = undefined) {
// 3. Abort controller with requestObject’s signal’s abort reason.
controller.abort(requestObject.signal.reason)

const realResponse = responseObject?.deref()

// 4. Abort the fetch() call with p, request, responseObject,
// and requestObject’s signal’s abort reason.
abortFetch(p, request, responseObject, requestObject.signal.reason)
abortFetch(p, request, realResponse, requestObject.signal.reason)
}
)

// 12. Let handleFetchDone given response response be to finalize and
// report timing with response, globalObject, and "fetch".
const handleFetchDone = (response) =>
finalizeAndReportTiming(response, 'fetch')
// see function handleFetchDone

// 13. Set controller to the result of calling fetch given request,
// with processResponseEndOfBody set to handleFetchDone, and processResponse
Expand Down Expand Up @@ -228,10 +233,11 @@ function fetch (input, init = undefined) {

// 4. Set responseObject to the result of creating a Response object,
// given response, "immutable", and relevantRealm.
responseObject = fromInnerResponse(response, 'immutable')
responseObject = new WeakRef(fromInnerResponse(response, 'immutable'))

// 5. Resolve p with responseObject.
p.resolve(responseObject)
p.resolve(responseObject.deref())
p = null
}

controller = fetching({
Expand Down Expand Up @@ -314,7 +320,10 @@ const markResourceTiming = performance.markResourceTiming
// https://fetch.spec.whatwg.org/#abort-fetch
function abortFetch (p, request, responseObject, error) {
// 1. Reject promise with error.
p.reject(error)
if (p) {
// We might have already resolved the promise at this stage
p.reject(error)
}

// 2. If request’s body is not null and is readable, then cancel request’s
// body with error.
Expand Down Expand Up @@ -1066,7 +1075,10 @@ function fetchFinale (fetchParams, response) {
// 4. If fetchParams’s process response is non-null, then queue a fetch task to run fetchParams’s
// process response given response, with fetchParams’s task destination.
if (fetchParams.processResponse != null) {
queueMicrotask(() => fetchParams.processResponse(response))
queueMicrotask(() => {
fetchParams.processResponse(response)
fetchParams.processResponse = null
})
}

// 5. Let internalResponse be response, if response is a network error; otherwise response’s internal response.
Expand Down
17 changes: 17 additions & 0 deletions lib/web/fetch/response.js
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,21 @@ const { URLSerializer } = require('./data-url')
const { kHeadersList, kConstruct } = require('../../core/symbols')
const assert = require('node:assert')
const { types } = require('node:util')
const { isDisturbed } = require('node:stream')

const textEncoder = new TextEncoder('utf-8')

const hasFinalizationRegistry = globalThis.FinalizationRegistry && process.version.indexOf('v18') !== 0
let registry

if (hasFinalizationRegistry) {
registry = new FinalizationRegistry((stream) => {
if (!stream.locked && !isDisturbed(stream)) {
stream.cancel('Response object has been garbage collected')
tsctx marked this conversation as resolved.
Show resolved Hide resolved
}
})
}

// https://fetch.spec.whatwg.org/#response-class
class Response {
// Creates network error Response.
Expand Down Expand Up @@ -510,6 +522,11 @@ function fromInnerResponse (innerResponse, guard) {
response[kHeaders] = new Headers(kConstruct)
response[kHeaders][kHeadersList] = innerResponse.headersList
response[kHeaders][kGuard] = guard

if (hasFinalizationRegistry && innerResponse.body?.stream) {
registry.register(response, innerResponse.body.stream)
}

return response
}

Expand Down
49 changes: 49 additions & 0 deletions test/fetch/fire-and-forget.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
'use strict'

Check failure on line 1 in test/fetch/fire-and-forget.js

View workflow job for this annotation

GitHub Actions / Test with Node.js 20 compiled --without-intl

test/fetch/fire-and-forget.js

[Error: test failed] { code: 'ERR_TEST_FAILURE', failureType: 'testCodeFailure', cause: 'test failed', exitCode: 1, signal: null }

Check failure on line 1 in test/fetch/fire-and-forget.js

View workflow job for this annotation

GitHub Actions / test (20, macos-latest) / Test with Node.js 20 on macos-latest

test/fetch/fire-and-forget.js

[Error [ERR_TEST_FAILURE]: test timed out after 30000ms] { code: 'ERR_TEST_FAILURE', failureType: 'testTimeoutFailure', cause: 'test timed out after 30000ms' }

Check failure on line 1 in test/fetch/fire-and-forget.js

View workflow job for this annotation

GitHub Actions / test (21, macos-latest) / Test with Node.js 21 on macos-latest

test/fetch/fire-and-forget.js

[Error [ERR_TEST_FAILURE]: test timed out after 30000ms] { code: 'ERR_TEST_FAILURE', failureType: 'testTimeoutFailure', cause: 'test timed out after 30000ms' }

Check failure on line 1 in test/fetch/fire-and-forget.js

View workflow job for this annotation

GitHub Actions / test (18, macos-latest) / Test with Node.js 18 on macos-latest

test/fetch/fire-and-forget.js

[Error [ERR_TEST_FAILURE]: test timed out after 30000ms] { failureType: 'testTimeoutFailure', cause: 'test timed out after 30000ms', code: 'ERR_TEST_FAILURE' }

const { randomFillSync } = require('node:crypto')
const { setTimeout: sleep } = require('timers/promises')
const { test } = require('node:test')
const { fetch, Agent, setGlobalDispatcher } = require('../..')
const { createServer } = require('node:http')
const { closeServerAsPromise } = require('../utils/node-http')

const blob = randomFillSync(new Uint8Array(1024 * 512))
const fmt = new Intl.NumberFormat()

test('does not need the body to be consumed to continue', async (t) => {
const agent = new Agent({
keepAliveMaxTimeout: 10,
keepAliveTimeoutThreshold: 10
})
setGlobalDispatcher(agent)
const server = createServer((req, res) => {
res.writeHead(200)
res.end(blob)
})
t.after(closeServerAsPromise(server))

await new Promise((resolve) => {
server.listen(0, resolve)
})

const url = new URL(`http://127.0.0.1:${server.address().port}`)

const batch = 50
const delay = 0
let total = 0
while (total < 10000) {
const array = new Array(batch)
for (let i = 0; i < batch; i++) {
array[i] = fetch(url).catch(() => {})
}
await Promise.all(array)
await sleep(delay)

console.log(
'RSS',
(process.memoryUsage.rss() / 1024 / 1024) | 0,
'MB after',
fmt.format((total += batch)) + ' fetch() requests'
)
}
})
Loading