Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added diagnostics channels on fetch #2210

Closed
wants to merge 8 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions benchmarks/benchmark.js
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ const parallelRequests = parseInt(process.env.PARALLEL, 10) || 100
const headersTimeout = parseInt(process.env.HEADERS_TIMEOUT, 10) || 0
const bodyTimeout = parseInt(process.env.BODY_TIMEOUT, 10) || 0
const dest = {}
const diagnosticsChannel = require('diagnostics_channel')

if (process.env.PORT) {
dest.port = process.env.PORT
Expand Down Expand Up @@ -264,6 +265,15 @@ if (process.env.PORT) {
}).catch(console.log)
})
}

experiments['undici - fetch (with diagnostics channel)'] = () => {
diagnosticsChannel.channel('undici:fetch:asyncEnd').subscribe(() => {})
return makeParallelRequests(resolve => {
fetch(dest.url).then(res => {
res.body.pipeTo(new WritableStream({ write () {}, close () { resolve() } }))
}).catch(console.log)
})
}
}

async function main () {
Expand Down
73 changes: 73 additions & 0 deletions docs/api/DiagnosticsChannel.md
Original file line number Diff line number Diff line change
Expand Up @@ -202,3 +202,76 @@ diagnosticsChannel.channel('undici:websocket:pong').subscribe(({ payload }) => {
console.log(payload)
})
```

The below channels collectively act as [`tracingChannel.tracePromise`](https://nodejs.org/api/diagnostics_channel.html#tracingchanneltracepromisefn-context-thisarg-args) on `fetch`. So all of them will publish the arguments passed to `fetch`.

## `undici:fetch:start`

This message is published when `fetch` is called, and will publish the arguments passed to `fetch`.

```js
import diagnosticsChannel from 'diagnostics_channel'

diagnosticsChannel.channel('undici:fetch:start').subscribe(({ input, init }) => {
console.log('input', input)
console.log('init', init)
})
```

## `undici:fetch:end`

This message is published at the end of `fetch`'s execution, and will publish any `error` from the synchronous part of `fetch`. Since `fetch` is asynchronous, this should be empty. This channel will publish the same values as `undici:fetch:start`, but we are including it to track when `fetch` finishes execution and to be consistent with [`TracingChannel`](https://nodejs.org/api/diagnostics_channel.html#class-tracingchannel).

```js
import diagnosticsChannel from 'diagnostics_channel'

diagnosticsChannel.channel('undici:fetch:end').subscribe(({ input, init, error }) => {
console.log('input', input)
console.log('init', init)
console.log('error', error) // should be empty
})
```

## `undici:fetch:asyncStart`

This message is published after `fetch` resolves or rejects. If `fetch` resolves, it publishes the response in `result`. If it rejects, it publishes the error in `error`.

```js
import diagnosticsChannel from 'diagnostics_channel'

diagnosticsChannel.channel('undici:fetch:asyncStart').subscribe(({ input, init, result, error }) => {
console.log('input', input)
console.log('init', init)
console.log('response', result)
console.log('error', error)
})
```

## `undici:fetch:asyncEnd`

This channel gets published the same values as and at the same time as `undici:fetch:asyncStart` in the case of [`tracingChannel.tracePromise`](https://nodejs.org/api/diagnostics_channel.html#tracingchanneltracepromisefn-context-thisarg-args)

```js
import diagnosticsChannel from 'diagnostics_channel'

diagnosticsChannel.channel('undici:fetch:asyncEnd').subscribe(({ input, init, result, error }) => {
console.log('input', input)
console.log('init', init)
console.log('response', result)
console.log('error', error)
})
```

## `undici:fetch:error`

This message is published when an error is thrown or promise rejects while calling `fetch`.

```js
import diagnosticsChannel from 'diagnostics_channel'

diagnosticsChannel.channel('undici:fetch:error').subscribe(({ input, init, error }) => {
console.log('input', input)
console.log('init', init)
console.log('error', error)
})
```
58 changes: 51 additions & 7 deletions lib/fetch/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ const {
crossOriginResourcePolicyCheck,
determineRequestsReferrer,
coarsenedSharedCurrentTime,
createDeferredPromise,
isBlobLike,
sameOrigin,
isCancelled,
Expand Down Expand Up @@ -120,12 +119,51 @@ class Fetch extends EE {
}
}

const channels = {}

try {
const diagnosticsChannel = require('diagnostics_channel')
channels.start = diagnosticsChannel.channel('undici:fetch:start')
channels.end = diagnosticsChannel.channel('undici:fetch:end')
channels.asyncStart = diagnosticsChannel.channel('undici:fetch:asyncStart')
channels.asyncEnd = diagnosticsChannel.channel('undici:fetch:asyncEnd')
channels.error = diagnosticsChannel.channel('undici:fetch:error')
} catch {
channels.start = { hasSubscribers: false }
channels.end = { hasSubscribers: false }
channels.asyncStart = { hasSubscribers: false }
channels.asyncEnd = { hasSubscribers: false }
channels.error = { hasSubscribers: false }
}

function createDeferredPromise (context) {
let res
let rej
const promise = new Promise((resolve, reject) => {
res = (result) => {
context.result = result
if (channels.asyncStart.hasSubscribers) channels.asyncStart.publish(context)
if (channels.asyncEnd.hasSubscribers) channels.asyncEnd.publish(context)
delete context.result
resolve(result)
}
rej = (error) => {
context.error = error
if (channels.error.hasSubscribers) channels.error.publish(context)
if (channels.asyncStart.hasSubscribers) channels.asyncStart.publish(context)
if (channels.asyncEnd.hasSubscribers) channels.asyncEnd.publish(context)
delete context.error
reject(error)
}
})
return { promise, resolve: res, reject: rej }
}

// https://fetch.spec.whatwg.org/#fetch-method
async function fetch (input, init = {}) {
async function innerFetch (input, init = {}, context) {
webidl.argumentLengthCheck(arguments, 1, { header: 'globalThis.fetch' })

// 1. Let p be a new promise.
const p = createDeferredPromise()
const p = createDeferredPromise(context)

// 2. Let requestObject be the result of invoking the initial value of
// Request as constructor with input and init as arguments. If this throws
Expand Down Expand Up @@ -223,9 +261,7 @@ async function fetch (input, init = {}) {
// 3. If response is a network error, then reject p with a TypeError
// and terminate these substeps.
if (response.type === 'error') {
p.reject(
Object.assign(new TypeError('fetch failed'), { cause: response.error })
)
p.reject(Object.assign(new TypeError('fetch failed'), { cause: response.error }))
return
}

Expand Down Expand Up @@ -253,6 +289,14 @@ async function fetch (input, init = {}) {
return p.promise
}

function fetch (input, init = {}) {
const context = { input, init }
if (channels.start.hasSubscribers) channels.start.publish(context)
const promise = innerFetch(input, init, context)
if (channels.end.hasSubscribers) channels.end.publish(context)
return promise
}

// https://fetch.spec.whatwg.org/#finalize-and-report-timing
function finalizeAndReportTiming (response, initiatorType = 'other') {
// 1. If response is an aborted network error, then return.
Expand Down
114 changes: 114 additions & 0 deletions test/diagnostics-channel/fetch.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
'use strict'

const t = require('tap')
const fetch = require('../..').fetch

let diagnosticsChannel

try {
diagnosticsChannel = require('diagnostics_channel')
} catch {
t.skip('missing diagnostics_channel')
process.exit(0)
}

const { createServer } = require('http')

t.plan(37)

const server = createServer((req, res) => {
res.setHeader('Content-Type', 'text/plain')
res.setHeader('trailer', 'foo')
res.write('hello')
res.addTrailers({
foo: 'oof'
})
res.end()
})
t.teardown(server.close.bind(server))

let startCalled = 0
diagnosticsChannel.channel('undici:fetch:start').subscribe(({ input, init }) => {
startCalled += 1
if (init.redirect) {
t.equal(input, 'badrequest')
t.same(init, { redirect: 'error' })
} else {
t.equal(input, `http://localhost:${server.address().port}`)
t.same(init, {})
}
})

let endCalled = 0
diagnosticsChannel.channel('undici:fetch:end').subscribe(({ input, init, result, error }) => {
endCalled += 1
if (init.redirect) {
t.equal(input, 'badrequest')
t.same(init, { redirect: 'error' })
} else {
t.equal(input, `http://localhost:${server.address().port}`)
t.same(init, {})
}
t.notOk(result)
t.notOk(error)
})

let asyncStartCalled = 0
diagnosticsChannel.channel('undici:fetch:asyncStart').subscribe(({ input, init, result }) => {
asyncStartCalled += 1
if (init.redirect) {
t.equal(input, 'badrequest')
t.same(init, { redirect: 'error' })
t.notOk(result)
} else {
t.equal(input, `http://localhost:${server.address().port}`)
t.same(init, {})
t.ok(result)
}
})

let asyncEndCalled = 0
diagnosticsChannel.channel('undici:fetch:asyncEnd').subscribe(async ({ input, init, result, error }) => {
asyncEndCalled += 1
if (init.redirect) {
t.equal(input, 'badrequest')
t.same(init, { redirect: 'error' })
t.notOk(result)
t.ok(error)
t.equal(error.cause.code, 'ERR_INVALID_URL')
} else {
t.equal(input, `http://localhost:${server.address().port}`)
t.same(init, {})
t.ok(result)
t.equal(result.status, 200)
t.notOk(error)
}
})

let errorCalled = 0
diagnosticsChannel.channel('undici:fetch:error').subscribe(async ({ input, init, error }) => {
errorCalled += 1
if (init.redirect) {
t.equal(input, 'badrequest')
t.same(init, { redirect: 'error' })
t.ok(error)
t.equal(error.cause.code, 'ERR_INVALID_URL')
} else {
t.equal(input, `http://localhost:${server.address().port}`)
t.same(init, {})
t.notOk(error)
}
})

server.listen(0, async () => {
await fetch(`http://localhost:${server.address().port}`)
try {
await fetch('badrequest', { redirect: 'error' })
} catch (e) {}
server.close()
t.equal(startCalled, 2)
t.equal(endCalled, 2)
t.equal(asyncStartCalled, 2)
t.equal(asyncEndCalled, 2)
t.equal(errorCalled, 1)
})
Loading