Skip to content

Commit

Permalink
fix: check for connection status before storing (#2732)
Browse files Browse the repository at this point in the history
It's possible that the remote can close the connection very shortly
after it is opened, so check the connection status before adding it
to the list of connections.

Fixes a memory leak whereby the connection is already closed and
then is never removed from the connections list.
  • Loading branch information
achingbrain authored Sep 27, 2024
1 parent a74c75d commit 7e4e6bd
Show file tree
Hide file tree
Showing 7 changed files with 104 additions and 84 deletions.
24 changes: 16 additions & 8 deletions packages/libp2p/src/connection-manager/connection-pruner.ts
Original file line number Diff line number Diff line change
Expand Up @@ -40,21 +40,29 @@ export class ConnectionPruner {
this.peerStore = components.peerStore
this.events = components.events
this.log = components.logger.forComponent('libp2p:connection-manager:connection-pruner')
this.maybePruneConnections = this.maybePruneConnections.bind(this)
}

// check the max connection limit whenever a peer connects
components.events.addEventListener('connection:open', () => {
this.maybePruneConnections()
.catch(err => {
this.log.error(err)
})
})
start (): void {
this.events.addEventListener('connection:open', this.maybePruneConnections)
}

stop (): void {
this.events.removeEventListener('connection:open', this.maybePruneConnections)
}

maybePruneConnections (): void {
this._maybePruneConnections()
.catch(err => {
this.log.error('error while pruning connections %e', err)
})
}

/**
* If we have more connections than our maximum, select some excess connections
* to prune based on peer value
*/
async maybePruneConnections (): Promise<void> {
private async _maybePruneConnections (): Promise<void> {
const connections = this.connectionManager.getConnections()
const numConnections = connections.length

Expand Down
2 changes: 1 addition & 1 deletion packages/libp2p/src/connection-manager/dial-queue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ export class DialQueue {
})
})

if (existingConnection != null) {
if (existingConnection?.status === 'open') {
this.log('already connected to %a', existingConnection.remoteAddr)
options.onProgress?.(new CustomProgressEvent('dial-queue:already-connected'))
return existingConnection
Expand Down
75 changes: 37 additions & 38 deletions packages/libp2p/src/connection-manager/index.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { InvalidMultiaddrError, InvalidParametersError, InvalidPeerIdError, NotStartedError, start, stop } from '@libp2p/interface'
import { ConnectionClosedError, InvalidMultiaddrError, InvalidParametersError, InvalidPeerIdError, NotStartedError, start, stop } from '@libp2p/interface'
import { PeerMap } from '@libp2p/peer-collections'
import { defaultAddressSort } from '@libp2p/utils/address-sort'
import { RateLimiter } from '@libp2p/utils/rate-limiter'
Expand Down Expand Up @@ -214,8 +214,6 @@ export class DefaultConnectionManager implements ConnectionManager, Startable {

this.onConnect = this.onConnect.bind(this)
this.onDisconnect = this.onDisconnect.bind(this)
this.events.addEventListener('connection:open', this.onConnect)
this.events.addEventListener('connection:close', this.onDisconnect)

// allow/deny lists
this.allow = (init.allow ?? []).map(ma => multiaddr(ma))
Expand Down Expand Up @@ -268,10 +266,6 @@ export class DefaultConnectionManager implements ConnectionManager, Startable {

readonly [Symbol.toStringTag] = '@libp2p/connection-manager'

isStarted (): boolean {
return this.started
}

/**
* Starts the Connection Manager. If Metrics are not enabled on libp2p
* only event loop and connection limits will be monitored.
Expand All @@ -288,11 +282,7 @@ export class DefaultConnectionManager implements ConnectionManager, Startable {

for (const conns of this.connections.values()) {
for (const conn of conns) {
if (conn.direction === 'inbound') {
metric.inbound++
} else {
metric.outbound++
}
metric[conn.direction]++
}
}

Expand Down Expand Up @@ -356,9 +346,13 @@ export class DefaultConnectionManager implements ConnectionManager, Startable {
}
})

this.events.addEventListener('connection:open', this.onConnect)
this.events.addEventListener('connection:close', this.onDisconnect)

await start(
this.dialQueue,
this.reconnectQueue
this.reconnectQueue,
this.connectionPruner
)

this.started = true
Expand All @@ -369,9 +363,13 @@ export class DefaultConnectionManager implements ConnectionManager, Startable {
* Stops the Connection Manager
*/
async stop (): Promise<void> {
this.events.removeEventListener('connection:open', this.onConnect)
this.events.removeEventListener('connection:close', this.onDisconnect)

await stop(
this.reconnectQueue,
this.dialQueue
this.dialQueue,
this.connectionPruner
)

// Close all connections we're tracking
Expand Down Expand Up @@ -413,17 +411,19 @@ export class DefaultConnectionManager implements ConnectionManager, Startable {
return
}

const peerId = connection.remotePeer
const storedConns = this.connections.get(peerId)
let isNewPeer = false

if (storedConns != null) {
storedConns.push(connection)
} else {
isNewPeer = true
this.connections.set(peerId, [connection])
if (connection.status !== 'open') {
// this can happen when the remote closes the connection immediately after
// opening
return
}

const peerId = connection.remotePeer
const isNewPeer = !this.connections.has(peerId)
const storedConns = this.connections.get(peerId) ?? []
storedConns.push(connection)

this.connections.set(peerId, storedConns)

// only need to store RSA public keys, all other types are embedded in the peer id
if (peerId.publicKey != null && peerId.type === 'RSA') {
await this.peerStore.patch(peerId, {
Expand All @@ -441,20 +441,21 @@ export class DefaultConnectionManager implements ConnectionManager, Startable {
*/
onDisconnect (evt: CustomEvent<Connection>): void {
const { detail: connection } = evt
const peerId = connection.remotePeer
const peerConns = this.connections.get(peerId) ?? []

if (!this.started) {
// This can happen when we are in the process of shutting down the node
return
}
// remove closed connection
const filteredPeerConns = peerConns.filter(conn => conn.id !== connection.id)

const peerId = connection.remotePeer
let storedConn = this.connections.get(peerId)
// update peer connections
this.connections.set(peerId, filteredPeerConns)

if (storedConn != null && storedConn.length > 1) {
storedConn = storedConn.filter((conn) => conn.id !== connection.id)
this.connections.set(peerId, storedConn)
} else if (storedConn != null) {
if (filteredPeerConns.length === 0) {
// trigger disconnect event if no connections remain
this.log('onDisconnect remove all connections for peer %p', peerId)
this.connections.delete(peerId)

// broadcast disconnect event
this.events.safeDispatchEvent('peer:disconnect', { detail: connection.remotePeer })
}
}
Expand All @@ -478,7 +479,7 @@ export class DefaultConnectionManager implements ConnectionManager, Startable {
}

async openConnection (peerIdOrMultiaddr: PeerId | Multiaddr | Multiaddr[], options: OpenConnectionOptions = {}): Promise<Connection> {
if (!this.isStarted()) {
if (!this.started) {
throw new NotStartedError('Not started')
}

Expand Down Expand Up @@ -508,10 +509,8 @@ export class DefaultConnectionManager implements ConnectionManager, Startable {
priority: options.priority ?? DEFAULT_DIAL_PRIORITY
})

if (connection.remotePeer.equals(this.peerId)) {
const err = new InvalidPeerIdError('Can not dial self')
connection.abort(err)
throw err
if (connection.status !== 'open') {
throw new ConnectionClosedError('Remote closed connection during opening')
}

let peerConnections = this.connections.get(connection.remotePeer)
Expand Down
16 changes: 5 additions & 11 deletions packages/libp2p/src/connection/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -161,13 +161,6 @@ export class ConnectionImpl implements Connection {
}

try {
this.log.trace('closing all streams')

// close all streams gracefully - this can throw if we're not multiplexed
await Promise.all(
this.streams.map(async s => s.close(options))
)

this.log.trace('closing underlying transport')

// close raw connection
Expand All @@ -184,18 +177,19 @@ export class ConnectionImpl implements Connection {
}

abort (err: Error): void {
if (this.status === 'closed') {
return
}

this.log.error('aborting connection to %a due to error', this.remoteAddr, err)

this.status = 'closing'
this.streams.forEach(s => { s.abort(err) })

this.log.error('all streams aborted', this.streams.length)

// Abort raw connection
this._abort(err)

this.timeline.close = Date.now()
this.status = 'closed'
this.timeline.close = Date.now()
}
}

Expand Down
41 changes: 25 additions & 16 deletions packages/libp2p/src/upgrader.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { InvalidMultiaddrError, TooManyInboundProtocolStreamsError, TooManyOutboundProtocolStreamsError, LimitedConnectionError, setMaxListeners } from '@libp2p/interface'
import { InvalidMultiaddrError, TooManyInboundProtocolStreamsError, TooManyOutboundProtocolStreamsError, LimitedConnectionError, setMaxListeners, InvalidPeerIdError } from '@libp2p/interface'
import * as mss from '@libp2p/multistream-select'
import { peerIdFromString } from '@libp2p/peer-id'
import { anySignal } from 'any-signal'
Expand Down Expand Up @@ -304,6 +304,14 @@ export class DefaultUpgrader implements Upgrader {
remotePeer = remotePeerId
}

// this can happen if we dial a multiaddr without a peer id, we only find
// out the identity of the remote after the connection is encrypted
if (remotePeer.equals(this.components.peerId)) {
const err = new InvalidPeerIdError('Can not dial self')
maConn.abort(err)
throw err
}

upgradedConn = encryptedConn
if (opts?.muxerFactory != null) {
muxerFactory = opts.muxerFactory
Expand All @@ -326,6 +334,8 @@ export class DefaultUpgrader implements Upgrader {
} catch (err: any) {
maConn.log.error('failed to upgrade inbound connection %s %a - %e', direction === 'inbound' ? 'from' : 'to', maConn.remoteAddr, err)
throw err
} finally {
signal.clear()
}

await this.shouldBlockConnection(direction === 'inbound' ? 'denyInboundUpgradedConnection' : 'denyOutboundUpgradedConnection', remotePeer, maConn)
Expand Down Expand Up @@ -538,22 +548,22 @@ export class DefaultUpgrader implements Upgrader {
const _timeline = maConn.timeline
maConn.timeline = new Proxy(_timeline, {
set: (...args) => {
if (connection != null && args[1] === 'close' && args[2] != null && _timeline.close == null) {
if (args[1] === 'close' && args[2] != null && _timeline.close == null) {
// Wait for close to finish before notifying of the closure
(async () => {
try {
if (connection.status === 'open') {
await connection.close()
}
} catch (err: any) {
connection.log.error('error closing connection after timeline close', err)
connection.log.error('error closing connection after timeline close %e', err)
} finally {
this.events.safeDispatchEvent('connection:close', {
detail: connection
})
}
})().catch(err => {
connection.log.error('error thrown while dispatching connection:close event', err)
connection.log.error('error thrown while dispatching connection:close event %e', err)
})
}

Expand All @@ -578,32 +588,31 @@ export class DefaultUpgrader implements Upgrader {
limits,
logger: this.components.logger,
newStream: newStream ?? errConnectionNotMultiplexed,
getStreams: () => { if (muxer != null) { return muxer.streams } else { return [] } },
getStreams: () => {
return muxer?.streams ?? []
},
close: async (options?: AbortOptions) => {
// Ensure remaining streams are closed gracefully
if (muxer != null) {
connection.log.trace('close muxer')
await muxer.close(options)
}
// ensure remaining streams are closed gracefully
await muxer?.close(options)

connection.log.trace('close maconn')
// close the underlying transport
await maConn.close(options)
connection.log.trace('closed maconn')
},
abort: (err) => {
maConn.abort(err)
// Ensure remaining streams are aborted
if (muxer != null) {
muxer.abort(err)
}

// ensure remaining streams are aborted
muxer?.abort(err)
}
})

this.events.safeDispatchEvent('connection:open', {
detail: connection
})

// @ts-expect-error nah
connection.__maConnTimeline = _timeline

return connection
}

Expand Down
Loading

0 comments on commit 7e4e6bd

Please sign in to comment.