Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(@libp2p/tcp): race condition in onSocket #2763

Merged
merged 5 commits into from
Oct 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions packages/transport-tcp/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@
"@multiformats/multiaddr": "^12.2.3",
"@types/sinon": "^17.0.3",
"p-defer": "^4.0.1",
"p-event": "^6.0.1",
"progress-events": "^1.0.0",
"race-event": "^1.3.0",
"stream-to-it": "^1.0.1"
Expand Down
72 changes: 43 additions & 29 deletions packages/transport-tcp/src/listener.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import net from 'net'
import { AbortError, AlreadyStartedError, InvalidParametersError, NotStartedError, TypedEventEmitter } from '@libp2p/interface'
import { AlreadyStartedError, InvalidParametersError, NotStartedError, TypedEventEmitter, setMaxListeners } from '@libp2p/interface'
import { pEvent } from 'p-event'
import { CODE_P2P } from './constants.js'
import { toMultiaddrConnection } from './socket-to-conn.js'
import {
Expand Down Expand Up @@ -67,19 +68,23 @@

export class TCPListener extends TypedEventEmitter<ListenerEvents> implements Listener {
private readonly server: net.Server
/** Keep track of open connections to destroy in case of timeout */
private readonly connections = new Set<MultiaddrConnection>()
/** Keep track of open sockets to destroy in case of timeout */
private readonly sockets = new Set<net.Socket>()
private status: Status = { code: TCPListenerStatusCode.INACTIVE }
private metrics?: TCPListenerMetrics
private addr: string
private readonly log: Logger
private readonly shutdownController: AbortController

constructor (private readonly context: Context) {
super()

context.keepAlive = context.keepAlive ?? true
context.noDelay = context.noDelay ?? true

this.shutdownController = new AbortController()
setMaxListeners(Infinity, this.shutdownController.signal)

this.log = context.logger.forComponent('libp2p:tcp:listener')
this.addr = 'unknown'
this.server = net.createServer(context, this.onSocket.bind(this))
Expand Down Expand Up @@ -119,7 +124,7 @@
help: 'Current active connections in TCP listener',
calculate: () => {
return {
[this.addr]: this.connections.size
[this.addr]: this.sockets.size

Check warning on line 127 in packages/transport-tcp/src/listener.ts

View check run for this annotation

Codecov / codecov/patch

packages/transport-tcp/src/listener.ts#L127

Added line #L127 was not covered by tests
}
}
})
Expand Down Expand Up @@ -195,18 +200,20 @@
}

this.log('new inbound connection %s', maConn.remoteAddr)
this.sockets.add(socket)

this.context.upgrader.upgradeInbound(maConn)
this.context.upgrader.upgradeInbound(maConn, {
signal: this.shutdownController.signal
})
.then((conn) => {
this.log('inbound connection upgraded %s', maConn.remoteAddr)
this.connections.add(maConn)

socket.once('close', () => {
this.connections.delete(maConn)
this.sockets.delete(socket)

if (
this.context.closeServerOnMaxConnections != null &&
this.connections.size < this.context.closeServerOnMaxConnections.listenBelow
this.sockets.size < this.context.closeServerOnMaxConnections.listenBelow
) {
// The most likely case of error is if the port taken by this
// application is bound by another process during the time the
Expand All @@ -227,18 +234,17 @@

if (
this.context.closeServerOnMaxConnections != null &&
this.connections.size >= this.context.closeServerOnMaxConnections.closeAbove
this.sockets.size >= this.context.closeServerOnMaxConnections.closeAbove
) {
this.pause(false).catch(e => {
this.log.error('error attempting to close server once connection count over limit', e)
})
this.pause()
}

this.safeDispatchEvent('connection', { detail: conn })
})
.catch(async err => {
this.log.error('inbound connection upgrade failed', err)
this.metrics?.errors.increment({ [`${this.addr} inbound_upgrade`]: true })
this.sockets.delete(socket)
maConn.abort(err)
})
}
Expand Down Expand Up @@ -300,15 +306,28 @@
}

async close (): Promise<void> {
const err = new AbortError('Listener is closing')
const events: Array<Promise<void>> = []

// synchronously close each connection
this.connections.forEach(conn => {
conn.abort(err)
})
if (this.server.listening) {
events.push(pEvent(this.server, 'close'))
}

// shut down the server socket, permanently
await this.pause(true)
this.pause(true)

// stop any in-progress connection upgrades
this.shutdownController.abort()

// synchronously close any open connections - should be done after closing
// the server socket in case new sockets are opened during the shutdown
this.sockets.forEach(socket => {
if (socket.readable) {
events.push(pEvent(socket, 'close'))
socket.destroy()
}
})

await Promise.all(events)
}

/**
Expand All @@ -332,7 +351,7 @@
this.log('listening on %s', this.server.address())
}

private async pause (permanent: boolean): Promise<void> {
private pause (permanent: boolean = false): void {
if (!this.server.listening && this.status.code === TCPListenerStatusCode.PAUSED && permanent) {
this.status = { code: TCPListenerStatusCode.INACTIVE }
return
Expand Down Expand Up @@ -361,15 +380,10 @@
// during the time the server is closing
this.status = permanent ? { code: TCPListenerStatusCode.INACTIVE } : { ...this.status, code: TCPListenerStatusCode.PAUSED }

await new Promise<void>((resolve, reject) => {
this.server.close(err => {
if (err != null) {
reject(err)
return
}

resolve()
})
})
// stop accepting incoming connections - existing connections are maintained
// - any callback passed here would be invoked after existing connections
// close, we want to maintain them so no callback is passed, otherwise this
// method will never return
this.server.close()
}
}
59 changes: 26 additions & 33 deletions packages/transport-tcp/test/connection-limits.spec.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import net from 'node:net'
import { promisify } from 'util'
import { TypedEventEmitter } from '@libp2p/interface'
import { mockUpgrader } from '@libp2p/interface-compliance-tests/mocks'
import { defaultLogger } from '@libp2p/logger'
import { multiaddr } from '@multiformats/multiaddr'
Expand Down Expand Up @@ -64,38 +63,40 @@
// Expect server connections but allow time for sockets to connect or disconnect
for (let i = 0; i < 100; i++) {
// eslint-disable-next-line @typescript-eslint/dot-notation
if (listener['connections'].size === connections) {
if (listener['sockets'].size === connections) {
return
} else {
await promisify(setTimeout)(10)
}
}
// eslint-disable-next-line @typescript-eslint/dot-notation
expect(listener['connections'].size).equals(connections, 'invalid amount of server connections')
expect(listener['sockets'].size).equals(connections, 'invalid amount of server connections')

Check warning on line 73 in packages/transport-tcp/test/connection-limits.spec.ts

View check run for this annotation

Codecov / codecov/patch

packages/transport-tcp/test/connection-limits.spec.ts#L73

Added line #L73 was not covered by tests
}

describe('closeAbove/listenBelow', () => {
const afterEachCallbacks: Array<() => Promise<any> | any> = []
let afterEachCallbacks: Array<() => Promise<any> | any> = []

beforeEach(() => {
afterEachCallbacks = []
})

afterEach(async () => {
await Promise.all(afterEachCallbacks.map(fn => fn()))
afterEachCallbacks.length = 0
})

it('reject dial of connection above closeAbove', async () => {
const listenBelow = 2
const closeAbove = 3
const port = 9900

const trasnport = tcp({ closeServerOnMaxConnections: { listenBelow, closeAbove } })({
const transport = tcp({ closeServerOnMaxConnections: { listenBelow, closeAbove } })({
logger: defaultLogger()
})

const upgrader = mockUpgrader({
events: new TypedEventEmitter()
})
const listener = trasnport.createListener({ upgrader }) as TCPListener
// eslint-disable-next-line @typescript-eslint/promise-function-async
afterEachCallbacks.push(() => listener.close())
const upgrader = mockUpgrader()
const listener = transport.createListener({ upgrader }) as TCPListener
afterEachCallbacks.push(async () => listener.close())

await listener.listen(multiaddr(`/ip4/127.0.0.1/tcp/${port}`))
const { assertConnectedSocket, assertRefusedSocket } = buildSocketAssertions(port, afterEachCallbacks)

Expand All @@ -115,16 +116,14 @@
const closeAbove = 3
const port = 9900

const trasnport = tcp({ closeServerOnMaxConnections: { listenBelow, closeAbove } })({
const transport = tcp({ closeServerOnMaxConnections: { listenBelow, closeAbove } })({
logger: defaultLogger()
})

const upgrader = mockUpgrader({
events: new TypedEventEmitter()
})
const listener = trasnport.createListener({ upgrader }) as TCPListener
// eslint-disable-next-line @typescript-eslint/promise-function-async
afterEachCallbacks.push(() => listener.close())
const upgrader = mockUpgrader()
const listener = transport.createListener({ upgrader }) as TCPListener
afterEachCallbacks.push(async () => listener.close())

await listener.listen(multiaddr(`/ip4/127.0.0.1/tcp/${port}`))
const { assertConnectedSocket } = buildSocketAssertions(port, afterEachCallbacks)

Expand Down Expand Up @@ -152,16 +151,13 @@
const closeAbove = 3
const port = 9900

const trasnport = tcp({ closeServerOnMaxConnections: { listenBelow, closeAbove } })({
const transport = tcp({ closeServerOnMaxConnections: { listenBelow, closeAbove } })({
logger: defaultLogger()
})

const upgrader = mockUpgrader({
events: new TypedEventEmitter()
})
const listener = trasnport.createListener({ upgrader }) as TCPListener
// eslint-disable-next-line @typescript-eslint/promise-function-async
afterEachCallbacks.push(() => listener.close())
const upgrader = mockUpgrader()
const listener = transport.createListener({ upgrader }) as TCPListener
afterEachCallbacks.push(async () => listener.close())

let closeEventCallCount = 0
listener.addEventListener('close', () => {
Expand All @@ -185,16 +181,13 @@
const closeAbove = 3
const port = 9900

const trasnport = tcp({ closeServerOnMaxConnections: { listenBelow, closeAbove } })({
const transport = tcp({ closeServerOnMaxConnections: { listenBelow, closeAbove } })({
logger: defaultLogger()
})

const upgrader = mockUpgrader({
events: new TypedEventEmitter()
})
const listener = trasnport.createListener({ upgrader }) as TCPListener
// eslint-disable-next-line @typescript-eslint/promise-function-async
afterEachCallbacks.push(() => listener.close())
const upgrader = mockUpgrader()
const listener = transport.createListener({ upgrader }) as TCPListener
afterEachCallbacks.push(async () => listener.close())

let listeningEventCallCount = 0
listener.addEventListener('listening', () => {
Expand Down
82 changes: 82 additions & 0 deletions packages/transport-tcp/test/listen-dial.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -394,4 +394,86 @@

await listener.close()
})

it('should close before connection upgrade is completed', async () => {
// create a Promise that resolves when the upgrade starts
const upgradeStarted = pDefer()

// create a listener with the handler
const listener = transport.createListener({
upgrader: {
async upgradeInbound () {
upgradeStarted.resolve()

return new Promise(() => {})
},
async upgradeOutbound () {
return new Promise(() => {})
}

Check warning on line 412 in packages/transport-tcp/test/listen-dial.spec.ts

View check run for this annotation

Codecov / codecov/patch

packages/transport-tcp/test/listen-dial.spec.ts#L411-L412

Added lines #L411 - L412 were not covered by tests
}
})

// listen on a multiaddr
await listener.listen(multiaddr('/ip4/127.0.0.1/tcp/0'))

const localAddrs = listener.getAddrs()
expect(localAddrs.length).to.equal(1)

// dial the listener address
transport.dial(localAddrs[0], {
upgrader
}).catch(() => {})

// wait for the upgrade to start
await upgradeStarted.promise

// close the listener, process should exit normally
await listener.close()
})

it('should abort inbound upgrade on close', async () => {
// create a Promise that resolves when the upgrade starts
const upgradeStarted = pDefer()
const abortedUpgrade = pDefer()

// create a listener with the handler
const listener = transport.createListener({
upgrader: {
async upgradeInbound (maConn, opts) {
upgradeStarted.resolve()

opts?.signal?.addEventListener('abort', () => {
abortedUpgrade.resolve()
}, {
once: true
})

return new Promise(() => {})
},
async upgradeOutbound () {
return new Promise(() => {})
}

Check warning on line 455 in packages/transport-tcp/test/listen-dial.spec.ts

View check run for this annotation

Codecov / codecov/patch

packages/transport-tcp/test/listen-dial.spec.ts#L454-L455

Added lines #L454 - L455 were not covered by tests
}
})

// listen on a multiaddr
await listener.listen(multiaddr('/ip4/127.0.0.1/tcp/0'))

const localAddrs = listener.getAddrs()
expect(localAddrs.length).to.equal(1)

// dial the listener address
transport.dial(localAddrs[0], {
upgrader
}).catch(() => {})

// wait for the upgrade to start
await upgradeStarted.promise

// close the listener
await listener.close()

// should abort the upgrade
await abortedUpgrade.promise
})
})
Loading