Skip to content

Commit

Permalink
fix: use fasttimers for all connection timeouts (#3552)
Browse files Browse the repository at this point in the history
* fix: use fasttimers for all connection timeouts

* Apply suggestions from code review

* activate some tests

* also use fastTimers in connect.js

* fix tests

* fix tests

* fix tests

* fix: use native timeouts for TIMEOUT_IDLE, rename TIMEOUT_IDLE to TIMEOUT_KEEP_ALIVE (#3554)

* fix: use native timeouts for TIMEOUT_IDLE, rename TIMEOUT_IDLE to TIMEOUT_KEEP_ALIVE

* ensure that fast timers and native timers are set properly

* .

* .

* rename import

* more informative connection error

* ignore request-timeout binary file, rename clearAll to reset

* fix

* add test

* use queueMicrotask earlier in the socket callbacks
  • Loading branch information
Uzlopak authored Sep 8, 2024
1 parent 8ce5ff3 commit dca0aa0
Show file tree
Hide file tree
Showing 8 changed files with 191 additions and 153 deletions.
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -84,3 +84,6 @@ undici-fetch.js
.npmrc

.tap

# File generated by /test/request-timeout.js
test/request-timeout.10mb.bin
3 changes: 3 additions & 0 deletions .npmignore
Original file line number Diff line number Diff line change
Expand Up @@ -11,3 +11,6 @@ lib/llhttp/llhttp.wasm
!index.d.ts
!docs/docs/**/*
!scripts/strip-comments.js

# File generated by /test/request-timeout.js
test/request-timeout.10mb.bin
69 changes: 48 additions & 21 deletions lib/core/connect.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ const util = require('./util')
const { InvalidArgumentError, ConnectTimeoutError } = require('./errors')
const timers = require('../util/timers')

function noop () {}

let tls // include tls conditionally since it is not always available

// TODO: session re-use does not wait for the first
Expand Down Expand Up @@ -96,6 +98,8 @@ function buildConnector ({ allowH2, maxCachedSessions, socketPath, timeout, sess

const session = customSession || sessionCache.get(sessionKey) || null

port = port || 443

socket = tls.connect({
highWaterMark: 16384, // TLS in node can't have bigger HWM anyway...
...options,
Expand All @@ -105,7 +109,7 @@ function buildConnector ({ allowH2, maxCachedSessions, socketPath, timeout, sess
// TODO(HTTP/2): Add support for h2c
ALPNProtocols: allowH2 ? ['http/1.1', 'h2'] : ['http/1.1'],
socket: httpSocket, // upgrade socket connection
port: port || 443,
port,
host: hostname
})

Expand All @@ -116,11 +120,14 @@ function buildConnector ({ allowH2, maxCachedSessions, socketPath, timeout, sess
})
} else {
assert(!httpSocket, 'httpSocket can only be sent on TLS update')

port = port || 80

socket = net.connect({
highWaterMark: 64 * 1024, // Same as nodejs fs streams.
...options,
localAddress,
port: port || 80,
port,
host: hostname
})
}
Expand All @@ -131,12 +138,12 @@ function buildConnector ({ allowH2, maxCachedSessions, socketPath, timeout, sess
socket.setKeepAlive(true, keepAliveInitialDelay)
}

const cancelConnectTimeout = setupConnectTimeout(new WeakRef(socket), timeout)
const clearConnectTimeout = setupConnectTimeout(new WeakRef(socket), { timeout, hostname, port })

socket
.setNoDelay(true)
.once(protocol === 'https:' ? 'secureConnect' : 'connect', function () {
cancelConnectTimeout()
queueMicrotask(clearConnectTimeout)

if (callback) {
const cb = callback
Expand All @@ -145,7 +152,7 @@ function buildConnector ({ allowH2, maxCachedSessions, socketPath, timeout, sess
}
})
.on('error', function (err) {
cancelConnectTimeout()
queueMicrotask(clearConnectTimeout)

if (callback) {
const cb = callback
Expand All @@ -158,50 +165,70 @@ function buildConnector ({ allowH2, maxCachedSessions, socketPath, timeout, sess
}
}

/**
* @param {WeakRef<net.Socket>} socketWeakRef
* @param {object} opts
* @param {number} opts.timeout
* @param {string} opts.hostname
* @param {number} opts.port
* @returns {() => void}
*/
const setupConnectTimeout = process.platform === 'win32'
? (socket, timeout) => {
if (!timeout) {
return () => { }
? (socketWeakRef, opts) => {
if (!opts.timeout) {
return noop
}

let s1 = null
let s2 = null
const timer = timers.setTimeout(() => {
const fastTimer = timers.setFastTimeout(() => {
// setImmediate is added to make sure that we prioritize socket error events over timeouts
s1 = setImmediate(() => {
// Windows needs an extra setImmediate probably due to implementation differences in the socket logic
s2 = setImmediate(() => onConnectTimeout(socket.deref()))
s2 = setImmediate(() => onConnectTimeout(socketWeakRef.deref(), opts))
})
}, timeout)
}, opts.timeout)
return () => {
timers.clearTimeout(timer)
timers.clearFastTimeout(fastTimer)
clearImmediate(s1)
clearImmediate(s2)
}
}
: (socket, timeout) => {
if (!timeout) {
return () => { }
: (socketWeakRef, opts) => {
if (!opts.timeout) {
return noop
}

let s1 = null
const timer = timers.setTimeout(() => {
const fastTimer = timers.setFastTimeout(() => {
// setImmediate is added to make sure that we prioritize socket error events over timeouts
s1 = setImmediate(() => {
onConnectTimeout(socket.deref())
onConnectTimeout(socketWeakRef.deref(), opts)
})
}, timeout)
}, opts.timeout)
return () => {
timers.clearTimeout(timer)
timers.clearFastTimeout(fastTimer)
clearImmediate(s1)
}
}

function onConnectTimeout (socket) {
/**
* @param {net.Socket} socket
* @param {object} opts
* @param {number} opts.timeout
* @param {string} opts.hostname
* @param {number} opts.port
*/
function onConnectTimeout (socket, opts) {
let message = 'Connect Timeout Error'
if (Array.isArray(socket.autoSelectFamilyAttemptedAddresses)) {
message += ` (attempted addresses: ${socket.autoSelectFamilyAttemptedAddresses.join(', ')})`
message += ` (attempted addresses: ${socket.autoSelectFamilyAttemptedAddresses.join(', ')},`
} else {
message += ` (attempted address: ${opts.hostname}:${opts.port},`
}

message += ` timeout: ${opts.timeout}ms)`

util.destroy(socket, new ConnectTimeoutError(message))
}

Expand Down
49 changes: 35 additions & 14 deletions lib/dispatcher/client-h1.js
Original file line number Diff line number Diff line change
Expand Up @@ -182,9 +182,17 @@ let currentBufferRef = null
let currentBufferSize = 0
let currentBufferPtr = null

const TIMEOUT_HEADERS = 1
const TIMEOUT_BODY = 2
const TIMEOUT_IDLE = 3
const USE_NATIVE_TIMER = 0
const USE_FAST_TIMER = 1

// Use fast timers for headers and body to take eventual event loop
// latency into account.
const TIMEOUT_HEADERS = 2 | USE_FAST_TIMER
const TIMEOUT_BODY = 4 | USE_FAST_TIMER

// Use native timers to ignore event loop latency for keep-alive
// handling.
const TIMEOUT_KEEP_ALIVE = 8 | USE_NATIVE_TIMER

class Parser {
/**
Expand Down Expand Up @@ -224,25 +232,38 @@ class Parser {
}

setTimeout (delay, type) {
this.timeoutType = type
if (delay !== this.timeoutValue) {
this.timeout && timers.clearTimeout(this.timeout)
// If the existing timer and the new timer are of different timer type
// (fast or native) or have different delay, we need to clear the existing
// timer and set a new one.
if (
delay !== this.timeoutValue ||
(type & USE_FAST_TIMER) ^ (this.timeoutType & USE_FAST_TIMER)
) {
// If a timeout is already set, clear it with clearTimeout of the fast
// timer implementation, as it can clear fast and native timers.
if (this.timeout) {
timers.clearTimeout(this.timeout)
this.timeout = null
}

if (delay) {
this.timeout = timers.setTimeout(onParserTimeout, delay, new WeakRef(this))
// istanbul ignore else: only for jest
if (this.timeout.unref) {
if (type & USE_FAST_TIMER) {
this.timeout = timers.setFastTimeout(onParserTimeout, delay, new WeakRef(this))
} else {
this.timeout = setTimeout(onParserTimeout, delay, new WeakRef(this))
this.timeout.unref()
}
} else {
this.timeout = null
}

this.timeoutValue = delay
} else if (this.timeout) {
// istanbul ignore else: only for jest
if (this.timeout.refresh) {
this.timeout.refresh()
}
}

this.timeoutType = type
}

resume () {
Expand Down Expand Up @@ -735,7 +756,7 @@ function onParserTimeout (parser) {
if (!paused) {
util.destroy(socket, new BodyTimeoutError())
}
} else if (timeoutType === TIMEOUT_IDLE) {
} else if (timeoutType === TIMEOUT_KEEP_ALIVE) {
assert(client[kRunning] === 0 && client[kKeepAliveTimeoutValue])
util.destroy(socket, new InformationalError('socket idle timeout'))
}
Expand Down Expand Up @@ -930,8 +951,8 @@ function resumeH1 (client) {
}

if (client[kSize] === 0) {
if (socket[kParser].timeoutType !== TIMEOUT_IDLE) {
socket[kParser].setTimeout(client[kKeepAliveTimeoutValue], TIMEOUT_IDLE)
if (socket[kParser].timeoutType !== TIMEOUT_KEEP_ALIVE) {
socket[kParser].setTimeout(client[kKeepAliveTimeoutValue], TIMEOUT_KEEP_ALIVE)
}
} else if (client[kRunning] > 0 && socket[kParser].statusCode < 200) {
if (socket[kParser].timeoutType !== TIMEOUT_HEADERS) {
Expand Down
49 changes: 48 additions & 1 deletion lib/util/timers.js
Original file line number Diff line number Diff line change
Expand Up @@ -347,7 +347,7 @@ module.exports = {
* The clearTimeout method cancels an instantiated Timer previously created
* by calling setTimeout.
*
* @param {FastTimer} timeout
* @param {NodeJS.Timeout|FastTimer} timeout
*/
clearTimeout (timeout) {
// If the timeout is a FastTimer, call its own clear method.
Expand All @@ -362,6 +362,29 @@ module.exports = {
nativeClearTimeout(timeout)
}
},
/**
* The setFastTimeout() method sets a fastTimer which executes a function once
* the timer expires.
* @param {Function} callback A function to be executed after the timer
* expires.
* @param {number} delay The time, in milliseconds that the timer should
* wait before the specified function or code is executed.
* @param {*} [arg] An optional argument to be passed to the callback function
* when the timer expires.
* @returns {FastTimer}
*/
setFastTimeout (callback, delay, arg) {
return new FastTimer(callback, delay, arg)
},
/**
* The clearTimeout method cancels an instantiated FastTimer previously
* created by calling setFastTimeout.
*
* @param {FastTimer} timeout
*/
clearFastTimeout (timeout) {
timeout.clear()
},
/**
* The now method returns the value of the internal fast timer clock.
*
Expand All @@ -370,6 +393,30 @@ module.exports = {
now () {
return fastNow
},
/**
* Trigger the onTick function to process the fastTimers array.
* Exported for testing purposes only.
* Marking as deprecated to discourage any use outside of testing.
* @deprecated
* @param {number} [delay=0] The delay in milliseconds to add to the now value.
*/
tick (delay = 0) {
fastNow += delay - RESOLUTION_MS + 1
onTick()
onTick()
},
/**
* Reset FastTimers.
* Exported for testing purposes only.
* Marking as deprecated to discourage any use outside of testing.
* @deprecated
*/
reset () {
fastNow = 0
fastTimers.length = 0
clearTimeout(fastNowTimeout)
fastNowTimeout = null
},
/**
* Exporting for testing purposes only.
* Marking as deprecated to discourage any use outside of testing.
Expand Down
11 changes: 8 additions & 3 deletions test/connect-timeout.js
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,13 @@ const skip = !!process.env.CITGM

// Using describe instead of test to avoid the timeout
describe('prioritize socket errors over timeouts', { skip }, async () => {
const t = tspl({ ...assert, after: () => {} }, { plan: 1 })
const t = tspl({ ...assert, after: () => {} }, { plan: 2 })
const client = new Pool('http://foorbar.invalid:1234', { connectTimeout: 1 })

client.request({ method: 'GET', path: '/foobar' })
.then(() => t.fail())
.catch((err) => {
t.strictEqual(err.code, 'ENOTFOUND')
t.strictEqual(err.code !== 'UND_ERR_CONNECT_TIMEOUT', true)
})

Expand All @@ -32,7 +33,7 @@ net.connect = function (options) {
}

test('connect-timeout', { skip }, async t => {
t = tspl(t, { plan: 1 })
t = tspl(t, { plan: 3 })

const client = new Client('http://localhost:9000', {
connectTimeout: 1e3
Expand All @@ -48,14 +49,16 @@ test('connect-timeout', { skip }, async t => {
method: 'GET'
}, (err) => {
t.ok(err instanceof errors.ConnectTimeoutError)
t.strictEqual(err.code, 'UND_ERR_CONNECT_TIMEOUT')
t.strictEqual(err.message, 'Connect Timeout Error (attempted address: localhost:9000, timeout: 1000ms)')
clearTimeout(timeout)
})

await t.completed
})

test('connect-timeout', { skip }, async t => {
t = tspl(t, { plan: 1 })
t = tspl(t, { plan: 3 })

const client = new Pool('http://localhost:9000', {
connectTimeout: 1e3
Expand All @@ -71,6 +74,8 @@ test('connect-timeout', { skip }, async t => {
method: 'GET'
}, (err) => {
t.ok(err instanceof errors.ConnectTimeoutError)
t.strictEqual(err.code, 'UND_ERR_CONNECT_TIMEOUT')
t.strictEqual(err.message, 'Connect Timeout Error (attempted address: localhost:9000, timeout: 1000ms)')
clearTimeout(timeout)
})

Expand Down
5 changes: 4 additions & 1 deletion test/issue-3356.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ const { tspl } = require('@matteo.collina/tspl')
const { test, after } = require('node:test')
const { createServer } = require('node:http')
const { once } = require('node:events')

const { tick: fastTimersTick } = require('../lib/util/timers')
const { fetch, Agent, RetryAgent } = require('..')

test('https://github.com/nodejs/undici/issues/3356', async (t) => {
Expand Down Expand Up @@ -42,6 +42,9 @@ test('https://github.com/nodejs/undici/issues/3356', async (t) => {
const response = await fetch(`http://localhost:${server.address().port}`, {
dispatcher: agent
})

fastTimersTick()

setTimeout(async () => {
try {
t.equal(response.status, 200)
Expand Down
Loading

0 comments on commit dca0aa0

Please sign in to comment.