Skip to content

Commit

Permalink
remove anti-pattern dispatcher hooks
Browse files Browse the repository at this point in the history
onBodySent on onRequestSent are footguns that easily cause bugs
when implementing logic will send multiple requests, e.g. redirect
and retry.

To achieve similar functionality wrap body into a stream and listen
to 'data' and 'end' events.

Refs: #2722
  • Loading branch information
ronag committed Feb 9, 2024
1 parent 0a069ab commit 1c71f36
Show file tree
Hide file tree
Showing 20 changed files with 11 additions and 369 deletions.
11 changes: 0 additions & 11 deletions docs/api/DiagnosticsChannel.md
Original file line number Diff line number Diff line change
Expand Up @@ -27,17 +27,6 @@ diagnosticsChannel.channel('undici:request:create').subscribe(({ request }) => {

Note: a request is only loosely completed to a given socket.


## `undici:request:bodySent`

```js
import diagnosticsChannel from 'diagnostics_channel'

diagnosticsChannel.channel('undici:request:bodySent').subscribe(({ request }) => {
// request is the same object undici:request:create
})
```

## `undici:request:headers`

This message is published after the response headers have been received, i.e. the response has been completed.
Expand Down
2 changes: 0 additions & 2 deletions docs/api/Dispatcher.md
Original file line number Diff line number Diff line change
Expand Up @@ -209,11 +209,9 @@ Returns: `Boolean` - `false` if dispatcher is busy and further dispatch calls wo
* **onConnect** `(abort: () => void, context: object) => void` - Invoked before request is dispatched on socket. May be invoked multiple times when a request is retried when the request at the head of the pipeline fails.
* **onError** `(error: Error) => void` - Invoked when an error has occurred. May not throw.
* **onUpgrade** `(statusCode: number, headers: Buffer[], socket: Duplex) => void` (optional) - Invoked when request is upgraded. Required if `DispatchOptions.upgrade` is defined or `DispatchOptions.method === 'CONNECT'`.
* **onResponseStarted** `() => void` (optional) - Invoked when response is received, before headers have been read.
* **onHeaders** `(statusCode: number, headers: Buffer[], resume: () => void, statusText: string) => boolean` - Invoked when statusCode and headers have been received. May be invoked multiple times due to 1xx informational headers. Not required for `upgrade` requests.
* **onData** `(chunk: Buffer) => boolean` - Invoked when response payload data is received. Not required for `upgrade` requests.
* **onComplete** `(trailers: Buffer[]) => void` - Invoked when response payload and trailers have been received and the request has completed. Not required for `upgrade` requests.
* **onBodySent** `(chunk: string | Buffer | Uint8Array) => void` - Invoked when a body chunk is sent to the server. Not required. For a stream or iterable body this will be invoked for every chunk. For other body types, it will be invoked once after the body is sent.

#### Example 1 - Dispatch GET request

Expand Down
8 changes: 0 additions & 8 deletions docs/api/RedirectHandler.md
Original file line number Diff line number Diff line change
Expand Up @@ -86,11 +86,3 @@ Called when the request is complete.
Parameters:

- **trailers** `object` - The trailers received.

#### `onBodySent(chunk)`

Called when the request body is sent.

Parameters:

- **chunk** `Buffer` - The chunk of the request body sent.
2 changes: 0 additions & 2 deletions docs/api/RetryHandler.md
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,6 @@ const handler = new RetryHandler(
},
handler: {
onConnect() {},
onBodySent() {},
onHeaders(status, _rawHeaders, resume, _statusMessage) {
// do something with headers
},
Expand All @@ -98,7 +97,6 @@ const handler = new RetryHandler(dispatchOptions, {
dispatch: client.dispatch.bind(client),
handler: {
onConnect() {},
onBodySent() {},
onHeaders(status, _rawHeaders, resume, _statusMessage) {},
onData(chunk) {},
onComplete() {},
Expand Down
29 changes: 3 additions & 26 deletions lib/client.js
Original file line number Diff line number Diff line change
Expand Up @@ -726,7 +726,6 @@ class Parser {
if (!request) {
return -1
}
request.onResponseStarted()
}

onHeaderField (buf) {
Expand Down Expand Up @@ -1606,16 +1605,13 @@ function write (client, request) {
assert(contentLength === null, 'no body must not have content length')
socket.write(`${header}\r\n`, 'latin1')
}
request.onRequestSent()
} else if (util.isBuffer(body)) {
assert(contentLength === body.byteLength, 'buffer body must have content length')

socket.cork()
socket.write(`${header}content-length: ${contentLength}\r\n\r\n`, 'latin1')
socket.write(body)
socket.uncork()
request.onBodySent(body)
request.onRequestSent()
if (!expectsPayload) {
socket[kReset] = true
}
Expand Down Expand Up @@ -1789,7 +1785,6 @@ function writeH2 (client, session, request) {

stream.once('response', headers => {
const { [HTTP2_HEADER_STATUS]: statusCode, ...realHeaders } = headers
request.onResponseStarted()

if (request.onHeaders(Number(statusCode), realHeaders, stream.resume.bind(stream), '') === false) {
stream.pause()
Expand Down Expand Up @@ -1870,15 +1865,13 @@ function writeH2 (client, session, request) {
function writeBodyH2 () {
/* istanbul ignore else: assertion */
if (!body) {
request.onRequestSent()
// Do nothing...
} else if (util.isBuffer(body)) {
assert(contentLength === body.byteLength, 'buffer body must have content length')
stream.cork()
stream.write(body)
stream.uncork()
stream.end()
request.onBodySent(body)
request.onRequestSent()
} else if (util.isBlobLike(body)) {
if (typeof body.stream === 'function') {
writeIterable({
Expand Down Expand Up @@ -1943,22 +1936,14 @@ function writeStream ({ h2stream, body, client, request, socket, contentLength,
if (err) {
util.destroy(body, err)
util.destroy(h2stream, err)
} else {
request.onRequestSent()
}
}
)

pipe.on('data', onPipeData)
pipe.once('end', () => {
pipe.removeListener('data', onPipeData)
util.destroy(pipe)
})

function onPipeData (chunk) {
request.onBodySent(chunk)
}

return
}

Expand Down Expand Up @@ -2074,9 +2059,6 @@ async function writeBlob ({ h2stream, body, client, request, socket, contentLeng
socket.uncork()
}

request.onBodySent(buffer)
request.onRequestSent()

if (!expectsPayload) {
socket[kReset] = true
}
Expand Down Expand Up @@ -2122,15 +2104,13 @@ async function writeIterable ({ h2stream, body, client, request, socket, content
}

const res = h2stream.write(chunk)
request.onBodySent(chunk)
if (!res) {
await waitForDrain()
}
}
} catch (err) {
h2stream.destroy(err)
} finally {
request.onRequestSent()
h2stream.end()
h2stream
.off('close', onDrain)
Expand Down Expand Up @@ -2181,7 +2161,7 @@ class AsyncWriter {
}

write (chunk) {
const { socket, request, contentLength, client, bytesWritten, expectsPayload, header } = this
const { socket, contentLength, client, bytesWritten, expectsPayload, header } = this

if (socket[kError]) {
throw socket[kError]
Expand Down Expand Up @@ -2229,8 +2209,6 @@ class AsyncWriter {

socket.uncork()

request.onBodySent(chunk)

if (!ret) {
if (socket[kParser].timeout && socket[kParser].timeoutType === TIMEOUT_HEADERS) {
// istanbul ignore else: only for jest
Expand All @@ -2244,8 +2222,7 @@ class AsyncWriter {
}

end () {
const { socket, contentLength, client, bytesWritten, expectsPayload, header, request } = this
request.onRequestSent()
const { socket, contentLength, client, bytesWritten, expectsPayload, header } = this

socket[kWriting] = false

Expand Down
1 change: 0 additions & 1 deletion lib/core/diagnostics.js
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ const channels = {
sendHeaders: diagnosticsChannel.channel('undici:client:sendHeaders'),
// Request
create: diagnosticsChannel.channel('undici:request:create'),
bodySent: diagnosticsChannel.channel('undici:request:bodySent'),
headers: diagnosticsChannel.channel('undici:request:headers'),
trailers: diagnosticsChannel.channel('undici:request:trailers'),
error: diagnosticsChannel.channel('undici:request:error'),
Expand Down
28 changes: 0 additions & 28 deletions lib/core/request.js
Original file line number Diff line number Diff line change
Expand Up @@ -201,30 +201,6 @@ class Request {
}
}

onBodySent (chunk) {
if (this[kHandler].onBodySent) {
try {
return this[kHandler].onBodySent(chunk)
} catch (err) {
this.abort(err)
}
}
}

onRequestSent () {
if (channels.bodySent.hasSubscribers) {
channels.bodySent.publish({ request: this })
}

if (this[kHandler].onRequestSent) {
try {
return this[kHandler].onRequestSent()
} catch (err) {
this.abort(err)
}
}
}

onConnect (abort) {
assert(!this.aborted)
assert(!this.completed)
Expand All @@ -237,10 +213,6 @@ class Request {
}
}

onResponseStarted () {
return this[kHandler].onResponseStarted?.()
}

onHeaders (statusCode, headers, resume, statusText) {
assert(!this.aborted)
assert(!this.completed)
Expand Down
4 changes: 0 additions & 4 deletions lib/core/util.js
Original file line number Diff line number Diff line change
Expand Up @@ -323,10 +323,6 @@ function validateHandler (handler, method, upgrade) {
throw new InvalidArgumentError('invalid onError method')
}

if (typeof handler.onBodySent !== 'function' && handler.onBodySent !== undefined) {
throw new InvalidArgumentError('invalid onBodySent method')
}

if (upgrade || method === 'CONNECT') {
if (typeof handler.onUpgrade !== 'function') {
throw new InvalidArgumentError('invalid onUpgrade method')
Expand Down
4 changes: 1 addition & 3 deletions lib/fetch/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -2113,15 +2113,13 @@ async function httpNetworkFetch (
timingInfo.finalNetworkRequestStartTime = coarsenedSharedCurrentTime(fetchParams.crossOriginIsolatedCapability)
},

onResponseStarted () {
onHeaders (status, rawHeaders, resume, statusText) {
// Set timingInfo’s final network-response start time to the coarsened shared current
// time given fetchParams’s cross-origin isolated capability, immediately after the
// user agent’s HTTP parser receives the first byte of the response (e.g., frame header
// bytes for HTTP/2 or response status line for HTTP/1.x).
timingInfo.finalNetworkResponseStartTime = coarsenedSharedCurrentTime(fetchParams.crossOriginIsolatedCapability)
},

onHeaders (status, rawHeaders, resume, statusText) {
if (status < 200) {
return
}
Expand Down
4 changes: 0 additions & 4 deletions lib/handler/DecoratorHandler.js
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,4 @@ module.exports = class DecoratorHandler {
onComplete (...args) {
return this.handler.onComplete(...args)
}

onBodySent (...args) {
return this.handler.onBodySent(...args)
}
}
6 changes: 0 additions & 6 deletions lib/handler/RedirectHandler.js
Original file line number Diff line number Diff line change
Expand Up @@ -173,12 +173,6 @@ class RedirectHandler {
this.handler.onComplete(trailers)
}
}

onBodySent (chunk) {
if (this.handler.onBodySent) {
this.handler.onBodySent(chunk)
}
}
}

function parseLocation (statusCode, headers) {
Expand Down
10 changes: 0 additions & 10 deletions lib/handler/RetryHandler.js
Original file line number Diff line number Diff line change
Expand Up @@ -74,12 +74,6 @@ class RetryHandler {
})
}

onRequestSent () {
if (this.handler.onRequestSent) {
this.handler.onRequestSent()
}
}

onUpgrade (statusCode, headers, socket) {
if (this.handler.onUpgrade) {
this.handler.onUpgrade(statusCode, headers, socket)
Expand All @@ -94,10 +88,6 @@ class RetryHandler {
}
}

onBodySent (chunk) {
if (this.handler.onBodySent) return this.handler.onBodySent(chunk)
}

static [kRetryHandlerDefaultRetry] (err, { state, opts }, cb) {
const { statusCode, code, headers } = err
const { method, retryOptions } = opts
Expand Down
5 changes: 1 addition & 4 deletions test/http2.js
Original file line number Diff line number Diff line change
Expand Up @@ -804,7 +804,7 @@ test('Should handle h2 request with body (string or buffer) - dispatch', t => {
stream.end('hello h2!')
})

t.plan(7)
t.plan(6)

server.listen(0, () => {
const client = new Client(`https://localhost:${server.address().port}`, {
Expand Down Expand Up @@ -842,9 +842,6 @@ test('Should handle h2 request with body (string or buffer) - dispatch', t => {
onData (chunk) {
response.push(chunk)
},
onBodySent (body) {
t.equal(body.toString('utf-8'), expectedBody)
},
onComplete () {
t.equal(Buffer.concat(response).toString('utf-8'), 'hello h2!')
t.equal(
Expand Down
8 changes: 0 additions & 8 deletions test/jest/interceptor.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -143,14 +143,6 @@ describe('interceptors with NtlmRequestHandler', () => {
return this.handler.onComplete(...args)
}
}

onBodySent (...args) {
if (this.requestCount < 2) {
// Do nothing
} else {
return this.handler.onBodySent(...args)
}
}
}
let server

Expand Down
Loading

0 comments on commit 1c71f36

Please sign in to comment.