Skip to content

Commit

Permalink
feat: connection gater (#1142)
Browse files Browse the repository at this point in the history
Port of https://github.com/libp2p/go-libp2p-core/blob/master/connmgr/gater.go

Adds a new configuration key `connectionGater` which allows denying the dialing of certain peers, individual multiaddrs and the creation of connections at certain points in the connection flow.

Fixes: #175
Refs: #744
Refs: #769

Co-authored-by: mzdws <8580712+mzdws@user.noreply.gitee.com>
  • Loading branch information
achingbrain and mzdws authored Jan 25, 2022
1 parent 9b22c6e commit ff32eba
Show file tree
Hide file tree
Showing 21 changed files with 770 additions and 98 deletions.
124 changes: 124 additions & 0 deletions doc/CONFIGURATION.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@
- [Setup with Keychain](#setup-with-keychain)
- [Configuring Dialing](#configuring-dialing)
- [Configuring Connection Manager](#configuring-connection-manager)
- [Configuring Connection Gater](#configuring-connection-gater)
- [Outgoing connections](#outgoing-connections)
- [Incoming connections](#incoming-connections)
- [Configuring Transport Manager](#configuring-transport-manager)
- [Configuring Metrics](#configuring-metrics)
- [Configuring PeerStore](#configuring-peerstore)
Expand Down Expand Up @@ -590,6 +593,127 @@ const node = await Libp2p.create({
})
```
#### Configuring Connection Gater
The Connection Gater allows us to prevent making incoming and outgoing connections to peers and storing
multiaddrs in the address book.
The order in which methods are called is as follows:
##### Outgoing connections
1. `connectionGater.denyDialPeer(...)`
2. `connectionGater.denyDialMultiaddr(...)`
3. `connectionGater.denyOutboundConnection(...)`
4. `connectionGater.denyOutboundEncryptedConnection(...)`
5. `connectionGater.denyOutboundUpgradedConnection(...)`
##### Incoming connections
1. `connectionGater.denyInboundConnection(...)`
2. `connectionGater.denyInboundEncryptedConnection(...)`
3. `connectionGater.denyInboundUpgradedConnection(...)`
```js
const node = await Libp2p.create({
// .. other config
connectionGater: {
/**
* denyDialMultiaddr tests whether we're permitted to Dial the
* specified peer.
*
* This is called by the dialer.connectToPeer implementation before
* dialling a peer.
*
* Return true to prevent dialing the passed peer.
*/
denyDialPeer: (peerId: PeerId) => Promise<boolean>

/**
* denyDialMultiaddr tests whether we're permitted to dial the specified
* multiaddr for the given peer.
*
* This is called by the dialer.connectToPeer implementation after it has
* resolved the peer's addrs, and prior to dialling each.
*
* Return true to prevent dialing the passed peer on the passed multiaddr.
*/
denyDialMultiaddr: (peerId: PeerId, multiaddr: Multiaddr) => Promise<boolean>

/**
* denyInboundConnection tests whether an incipient inbound connection is allowed.
*
* This is called by the upgrader, or by the transport directly (e.g. QUIC,
* Bluetooth), straight after it has accepted a connection from its socket.
*
* Return true to deny the incoming passed connection.
*/
denyInboundConnection: (maConn: MultiaddrConnection) => Promise<boolean>

/**
* denyOutboundConnection tests whether an incipient outbound connection is allowed.
*
* This is called by the upgrader, or by the transport directly (e.g. QUIC,
* Bluetooth), straight after it has created a connection with its socket.
*
* Return true to deny the incoming passed connection.
*/
denyOutboundConnection: (peerId: PeerId, maConn: MultiaddrConnection) => Promise<boolean>

/**
* denyInboundEncryptedConnection tests whether a given connection, now encrypted,
* is allowed.
*
* This is called by the upgrader, after it has performed the security
* handshake, and before it negotiates the muxer, or by the directly by the
* transport, at the exact same checkpoint.
*
* Return true to deny the passed secured connection.
*/
denyInboundEncryptedConnection: (peerId: PeerId, maConn: MultiaddrConnection) => Promise<boolean>

/**
* denyOutboundEncryptedConnection tests whether a given connection, now encrypted,
* is allowed.
*
* This is called by the upgrader, after it has performed the security
* handshake, and before it negotiates the muxer, or by the directly by the
* transport, at the exact same checkpoint.
*
* Return true to deny the passed secured connection.
*/
denyOutboundEncryptedConnection: (peerId: PeerId, maConn: MultiaddrConnection) => Promise<boolean>

/**
* denyInboundUpgradedConnection tests whether a fully capable connection is allowed.
*
* This is called after encryption has been negotiated and the connection has been
* multiplexed, if a multiplexer is configured.
*
* Return true to deny the passed upgraded connection.
*/
denyInboundUpgradedConnection: (peerId: PeerId, maConn: MultiaddrConnection) => Promise<boolean>

/**
* denyOutboundUpgradedConnection tests whether a fully capable connection is allowed.
*
* This is called after encryption has been negotiated and the connection has been
* multiplexed, if a multiplexer is configured.
*
* Return true to deny the passed upgraded connection.
*/
denyOutboundUpgradedConnection: (peerId: PeerId, maConn: MultiaddrConnection) => Promise<boolean>

/**
* Used by the address book to filter passed addresses.
*
* Return true to allow storing the passed multiaddr for the passed peer.
*/
filterMultiaddrForPeer: (peer: PeerId, multiaddr: Multiaddr) => Promise<boolean>
}
})
```
#### Configuring Transport Manager
The Transport Manager is responsible for managing the libp2p transports life cycle. This includes starting listeners for the provided listen addresses, closing these listeners and dialing using the provided transports. By default, if a libp2p node has a list of multiaddrs for listening on and there are no valid transports for those multiaddrs, libp2p will throw an error on startup and shutdown. However, for some applications it is perfectly acceptable for libp2p nodes to start in dial only mode if all the listen multiaddrs failed. This error tolerance can be enabled as follows:
Expand Down
1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@
"it-drain": "^1.0.3",
"it-filter": "^1.0.1",
"it-first": "^1.0.4",
"it-foreach": "^0.1.1",
"it-handshake": "^2.0.0",
"it-length-prefixed": "^5.0.2",
"it-map": "^1.0.4",
Expand Down
2 changes: 2 additions & 0 deletions src/config.js
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ const { FaultTolerance } = require('./transport-manager')

/**
* @typedef {import('multiaddr').Multiaddr} Multiaddr
* @typedef {import('./types').ConnectionGater} ConnectionGater
* @typedef {import('.').Libp2pOptions} Libp2pOptions
* @typedef {import('.').constructorOptions} constructorOptions
*/
Expand All @@ -27,6 +28,7 @@ const DefaultConfig = {
connectionManager: {
minConnections: 25
},
connectionGater: /** @type {ConnectionGater} */ {},
transportManager: {
faultTolerance: FaultTolerance.FATAL_ALL
},
Expand Down
21 changes: 20 additions & 1 deletion src/dialer/index.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
'use strict'

const debug = require('debug')
const all = require('it-all')
const filter = require('it-filter')
const { pipe } = require('it-pipe')
const log = Object.assign(debug('libp2p:dialer'), {
error: debug('libp2p:dialer:err')
})
Expand Down Expand Up @@ -33,12 +36,14 @@ const METRICS_PENDING_DIAL_TARGETS = 'pending-dial-targets'
* @typedef {import('../peer-store/types').PeerStore} PeerStore
* @typedef {import('../peer-store/types').Address} Address
* @typedef {import('../transport-manager')} TransportManager
* @typedef {import('../types').ConnectionGater} ConnectionGater
*/

/**
* @typedef {Object} DialerProperties
* @property {PeerStore} peerStore
* @property {TransportManager} transportManager
* @property {ConnectionGater} connectionGater
*
* @typedef {(addr:Multiaddr) => Promise<string[]>} Resolver
*
Expand Down Expand Up @@ -70,6 +75,7 @@ class Dialer {
constructor ({
transportManager,
peerStore,
connectionGater,
addressSorter = publicAddressesFirst,
maxParallelDials = MAX_PARALLEL_DIALS,
maxAddrsToDial = MAX_ADDRS_TO_DIAL,
Expand All @@ -78,6 +84,7 @@ class Dialer {
resolvers = {},
metrics
}) {
this.connectionGater = connectionGater
this.transportManager = transportManager
this.peerStore = peerStore
this.addressSorter = addressSorter
Expand Down Expand Up @@ -136,6 +143,12 @@ class Dialer {
* @returns {Promise<Connection>}
*/
async connectToPeer (peer, options = {}) {
const { id } = getPeer(peer)

if (await this.connectionGater.denyDialPeer(id)) {
throw errCode(new Error('The dial request is blocked by gater.allowDialPeer'), codes.ERR_PEER_DIAL_INTERCEPTED)
}

const dialTarget = await this._createCancellableDialTarget(peer)

if (!dialTarget.addrs.length) {
Expand Down Expand Up @@ -203,7 +216,13 @@ class Dialer {
await this.peerStore.addressBook.add(id, multiaddrs)
}

let knownAddrs = await this.peerStore.addressBook.getMultiaddrsForPeer(id, this.addressSorter) || []
let knownAddrs = await pipe(
await this.peerStore.addressBook.getMultiaddrsForPeer(id, this.addressSorter),
(source) => filter(source, async (multiaddr) => {
return !(await this.connectionGater.denyDialMultiaddr(id, multiaddr))
}),
(source) => all(source)
)

// If received a multiaddr to dial, it should be the first to use
// But, if we know other multiaddrs for the peer, we should try them too.
Expand Down
2 changes: 2 additions & 0 deletions src/errors.js
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ exports.codes = {
PUBSUB_NOT_STARTED: 'ERR_PUBSUB_NOT_STARTED',
DHT_NOT_STARTED: 'ERR_DHT_NOT_STARTED',
CONN_ENCRYPTION_REQUIRED: 'ERR_CONN_ENCRYPTION_REQUIRED',
ERR_PEER_DIAL_INTERCEPTED: 'ERR_PEER_DIAL_INTERCEPTED',
ERR_CONNECTION_INTERCEPTED: 'ERR_CONNECTION_INTERCEPTED',
ERR_INVALID_PROTOCOLS_FOR_STREAM: 'ERR_INVALID_PROTOCOLS_FOR_STREAM',
ERR_CONNECTION_ENDED: 'ERR_CONNECTION_ENDED',
ERR_CONNECTION_FAILED: 'ERR_CONNECTION_FAILED',
Expand Down
21 changes: 20 additions & 1 deletion src/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ const { updateSelfPeerRecord } = require('./record/utils')
* @typedef {import('libp2p-interfaces/src/pubsub').PubsubOptions} PubsubOptions
* @typedef {import('interface-datastore').Datastore} Datastore
* @typedef {import('./pnet')} Protector
* @typedef {import('./types').ConnectionGater} ConnectionGater
* @typedef {Object} PersistentPeerStoreOptions
* @property {number} [threshold]
*/
Expand Down Expand Up @@ -106,6 +107,7 @@ const { updateSelfPeerRecord } = require('./record/utils')
* @property {Libp2pModules} modules libp2p modules to use
* @property {import('./address-manager').AddressManagerOptions} [addresses]
* @property {import('./connection-manager').ConnectionManagerOptions} [connectionManager]
* @property {Partial<import('./types').ConnectionGater>} [connectionGater]
* @property {Datastore} [datastore]
* @property {import('./dialer').DialerOptions} [dialer]
* @property {import('./identify/index').HostProperties} [host] libp2p host
Expand Down Expand Up @@ -172,10 +174,25 @@ class Libp2p extends EventEmitter {
this.metrics = metrics
}

/** @type {ConnectionGater} */
this.connectionGater = {
denyDialPeer: async () => Promise.resolve(false),
denyDialMultiaddr: async () => Promise.resolve(false),
denyInboundConnection: async () => Promise.resolve(false),
denyOutboundConnection: async () => Promise.resolve(false),
denyInboundEncryptedConnection: async () => Promise.resolve(false),
denyOutboundEncryptedConnection: async () => Promise.resolve(false),
denyInboundUpgradedConnection: async () => Promise.resolve(false),
denyOutboundUpgradedConnection: async () => Promise.resolve(false),
filterMultiaddrForPeer: async () => Promise.resolve(true),
...this._options.connectionGater
}

/** @type {import('./peer-store/types').PeerStore} */
this.peerStore = new PeerStore({
peerId: this.peerId,
datastore: (this.datastore && this._options.peerStore.persistence) ? this.datastore : new MemoryDatastore()
datastore: (this.datastore && this._options.peerStore.persistence) ? this.datastore : new MemoryDatastore(),
addressFilter: this.connectionGater.filterMultiaddrForPeer
})

// Addresses {listen, announce, noAnnounce}
Expand Down Expand Up @@ -220,6 +237,7 @@ class Libp2p extends EventEmitter {

// Setup the Upgrader
this.upgrader = new Upgrader({
connectionGater: this.connectionGater,
localPeer: this.peerId,
metrics: this.metrics,
onConnection: (connection) => this.connectionManager.onConnect(connection),
Expand Down Expand Up @@ -262,6 +280,7 @@ class Libp2p extends EventEmitter {

this.dialer = new Dialer({
transportManager: this.transportManager,
connectionGater: this.connectionGater,
peerStore: this.peerStore,
metrics: this.metrics,
...this._options.dialer
Expand Down
Loading

0 comments on commit ff32eba

Please sign in to comment.