Skip to content

Commit

Permalink
feat: auto relay (#723)
Browse files Browse the repository at this point in the history
* feat: auto relay

* fix: leverage protoBook events to ask relay peers if they support hop

* chore: refactor disconnect

* chore: do not listen on a relayed conn

* chore: tweaks

* chore: improve _listenOnAvailableHopRelays logic

* chore: default value of 1 to maxListeners on auto-relay
  • Loading branch information
vasco-santos authored Sep 16, 2020
1 parent bd26bde commit c3039a0
Show file tree
Hide file tree
Showing 13 changed files with 762 additions and 25 deletions.
231 changes: 231 additions & 0 deletions src/circuit/auto-relay.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,231 @@
'use strict'

const debug = require('debug')
const log = debug('libp2p:auto-relay')
log.error = debug('libp2p:auto-relay:error')

const uint8ArrayFromString = require('uint8arrays/from-string')
const uint8ArrayToString = require('uint8arrays/to-string')
const multiaddr = require('multiaddr')
const PeerId = require('peer-id')

const { relay: multicodec } = require('./multicodec')
const { canHop } = require('./circuit/hop')

const circuitProtoCode = 290
const hopMetadataKey = 'hop_relay'
const hopMetadataValue = 'true'

class AutoRelay {
/**
* Creates an instance of AutoRelay.
* @constructor
* @param {object} props
* @param {Libp2p} props.libp2p
* @param {number} [props.maxListeners = 1] maximum number of relays to listen.
*/
constructor ({ libp2p, maxListeners = 1 }) {
this._libp2p = libp2p
this._peerId = libp2p.peerId
this._peerStore = libp2p.peerStore
this._connectionManager = libp2p.connectionManager
this._transportManager = libp2p.transportManager

this.maxListeners = maxListeners

/**
* @type {Set<string>}
*/
this._listenRelays = new Set()

this._onProtocolChange = this._onProtocolChange.bind(this)
this._onPeerDisconnected = this._onPeerDisconnected.bind(this)

this._peerStore.on('change:protocols', this._onProtocolChange)
this._connectionManager.on('peer:disconnect', this._onPeerDisconnected)
}

/**
* Check if a peer supports the relay protocol.
* If the protocol is not supported, check if it was supported before and remove it as a listen relay.
* If the protocol is supported, check if the peer supports **HOP** and add it as a listener if
* inside the threshold.
* @param {Object} props
* @param {PeerId} props.peerId
* @param {Array<string>} props.protocols
* @return {Promise<void>}
*/
async _onProtocolChange ({ peerId, protocols }) {
const id = peerId.toB58String()

// Check if it has the protocol
const hasProtocol = protocols.find(protocol => protocol === multicodec)

// If no protocol, check if we were keeping the peer before as a listenRelay
if (!hasProtocol && this._listenRelays.has(id)) {
this._removeListenRelay(id)
return
} else if (!hasProtocol || this._listenRelays.has(id)) {
return
}

// If protocol, check if can hop, store info in the metadataBook and listen on it
try {
const connection = this._connectionManager.get(peerId)

// Do not hop on a relayed connection
if (connection.remoteAddr.protoCodes().includes(circuitProtoCode)) {
log(`relayed connection to ${id} will not be used to hop on`)
return
}

const supportsHop = await canHop({ connection })

if (supportsHop) {
this._peerStore.metadataBook.set(peerId, hopMetadataKey, uint8ArrayFromString(hopMetadataValue))
await this._addListenRelay(connection, id)
}
} catch (err) {
log.error(err)
}
}

/**
* Peer disconnects.
* @param {Connection} connection connection to the peer
* @return {void}
*/
_onPeerDisconnected (connection) {
const peerId = connection.remotePeer
const id = peerId.toB58String()

// Not listening on this relay
if (!this._listenRelays.has(id)) {
return
}

this._removeListenRelay(id)
}

/**
* Attempt to listen on the given relay connection.
* @private
* @param {Connection} connection connection to the peer
* @param {string} id peer identifier string
* @return {Promise<void>}
*/
async _addListenRelay (connection, id) {
// Check if already listening on enough relays
if (this._listenRelays.size >= this.maxListeners) {
return
}

// Create relay listen addr
let listenAddr, remoteMultiaddr

try {
const remoteAddrs = this._peerStore.addressBook.get(connection.remotePeer)
// TODO: HOP Relays should avoid advertising private addresses!
remoteMultiaddr = remoteAddrs.find(a => a.isCertified).multiaddr // Get first announced address certified
} catch (_) {
log.error(`${id} does not have announced certified multiaddrs`)
return
}

if (!remoteMultiaddr.protoNames().includes('p2p')) {
listenAddr = `${remoteMultiaddr.toString()}/p2p/${connection.remotePeer.toB58String()}/p2p-circuit`
} else {
listenAddr = `${remoteMultiaddr.toString()}/p2p-circuit`
}

// Attempt to listen on relay
this._listenRelays.add(id)

try {
await this._transportManager.listen([multiaddr(listenAddr)])
// TODO: push announce multiaddrs update
// await this._libp2p.identifyService.pushToPeerStore()
} catch (err) {
log.error(err)
this._listenRelays.delete(id)
}
}

/**
* Remove listen relay.
* @private
* @param {string} id peer identifier string.
* @return {void}
*/
_removeListenRelay (id) {
if (this._listenRelays.delete(id)) {
// TODO: this should be responsibility of the connMgr
this._listenOnAvailableHopRelays([id])
}
}

/**
* Try to listen on available hop relay connections.
* The following order will happen while we do not have enough relays.
* 1. Check the metadata store for known relays, try to listen on the ones we are already connected.
* 2. Dial and try to listen on the peers we know that support hop but are not connected.
* 3. Search the network.
* @param {Array<string>} [peersToIgnore]
* @return {Promise<void>}
*/
async _listenOnAvailableHopRelays (peersToIgnore = []) {
// TODO: The peer redial issue on disconnect should be handled by connection gating
// Check if already listening on enough relays
if (this._listenRelays.size >= this.maxListeners) {
return
}

const knownHopsToDial = []

// Check if we have known hop peers to use and attempt to listen on the already connected
for (const [id, metadataMap] of this._peerStore.metadataBook.data.entries()) {
// Continue to next if listening on this or peer to ignore
if (this._listenRelays.has(id) || peersToIgnore.includes(id)) {
continue
}

const supportsHop = metadataMap.get(hopMetadataKey)

// Continue to next if it does not support Hop
if (!supportsHop || uint8ArrayToString(supportsHop) !== hopMetadataValue) {
continue
}

const peerId = PeerId.createFromCID(id)
const connection = this._connectionManager.get(peerId)

// If not connected, store for possible later use.
if (!connection) {
knownHopsToDial.push(peerId)
continue
}

await this._addListenRelay(connection, id)

// Check if already listening on enough relays
if (this._listenRelays.size >= this.maxListeners) {
return
}
}

// Try to listen on known peers that are not connected
for (const peerId of knownHopsToDial) {
const connection = await this._libp2p.dial(peerId)
await this._addListenRelay(connection, peerId.toB58String())

// Check if already listening on enough relays
if (this._listenRelays.size >= this.maxListeners) {
return
}
}

// TODO: Try to find relays to hop on the network
}
}

module.exports = AutoRelay
27 changes: 27 additions & 0 deletions src/circuit/circuit/hop.js
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,33 @@ module.exports.hop = async function hop ({
throw errCode(new Error(`HOP request failed with code ${response.code}`), Errors.ERR_HOP_REQUEST_FAILED)
}

/**
* Performs a CAN_HOP request to a relay peer, in order to understand its capabilities.
* @param {object} options
* @param {Connection} options.connection Connection to the relay
* @returns {Promise<boolean>}
*/
module.exports.canHop = async function canHop ({
connection
}) {
// Create a new stream to the relay
const { stream } = await connection.newStream([multicodec.relay])
// Send the HOP request
const streamHandler = new StreamHandler({ stream })
streamHandler.write({
type: CircuitPB.Type.CAN_HOP
})

const response = await streamHandler.read()
await streamHandler.close()

if (response.code !== CircuitPB.Status.SUCCESS) {
return false
}

return true
}

/**
* Creates an unencoded CAN_HOP response based on the Circuits configuration
* @private
Expand Down
20 changes: 15 additions & 5 deletions src/circuit/index.js
Original file line number Diff line number Diff line change
@@ -1,16 +1,18 @@
'use strict'

const debug = require('debug')
const log = debug('libp2p:circuit')
log.error = debug('libp2p:circuit:error')

const mafmt = require('mafmt')
const multiaddr = require('multiaddr')
const PeerId = require('peer-id')
const withIs = require('class-is')
const { CircuitRelay: CircuitPB } = require('./protocol')

const debug = require('debug')
const log = debug('libp2p:circuit')
log.error = debug('libp2p:circuit:error')
const toConnection = require('libp2p-utils/src/stream-to-ma-conn')

const AutoRelay = require('./auto-relay')
const { relay: multicodec } = require('./multicodec')
const createListener = require('./listener')
const { handleCanHop, handleHop, hop } = require('./circuit/hop')
Expand All @@ -35,11 +37,19 @@ class Circuit {
this._libp2p = libp2p
this.peerId = libp2p.peerId
this._registrar.handle(multicodec, this._onProtocol.bind(this))

// Create autoRelay if enabled
this._autoRelay = this._options.autoRelay.enabled && new AutoRelay({ libp2p, ...this._options.autoRelay })
}

async _onProtocol ({ connection, stream, protocol }) {
async _onProtocol ({ connection, stream }) {
const streamHandler = new StreamHandler({ stream })
const request = await streamHandler.read()

if (!request) {
return
}

const circuit = this
let virtualConnection

Expand Down Expand Up @@ -163,7 +173,7 @@ class Circuit {
// Called on successful HOP and STOP requests
this.handler = handler

return createListener(this, options)
return createListener(this._libp2p, options)
}

/**
Expand Down
16 changes: 13 additions & 3 deletions src/circuit/listener.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,23 @@ const log = debug('libp2p:circuit:listener')
log.err = debug('libp2p:circuit:error:listener')

/**
* @param {*} circuit
* @param {Libp2p} libp2p
* @returns {Listener} a transport listener
*/
module.exports = (circuit) => {
module.exports = (libp2p) => {
const listener = new EventEmitter()
const listeningAddrs = new Map()

// Remove listeningAddrs when a peer disconnects
libp2p.connectionManager.on('peer:disconnect', (connection) => {
const deleted = listeningAddrs.delete(connection.remotePeer.toB58String())

if (deleted) {
// TODO push announce multiaddrs update
// libp2p.identifyService.pushToPeerStore()
}
})

/**
* Add swarm handler and listen for incoming connections
*
Expand All @@ -24,7 +34,7 @@ module.exports = (circuit) => {
listener.listen = async (addr) => {
const addrString = String(addr).split('/p2p-circuit').find(a => a !== '')

const relayConn = await circuit._dialer.connectToPeer(multiaddr(addrString))
const relayConn = await libp2p.dial(multiaddr(addrString))
const relayedAddr = relayConn.remoteAddr.encapsulate('/p2p-circuit')

listeningAddrs.set(relayConn.remotePeer.toB58String(), relayedAddr)
Expand Down
4 changes: 4 additions & 0 deletions src/config.js
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,10 @@ const DefaultConfig = {
hop: {
enabled: false,
active: false
},
autoRelay: {
enabled: false,
maxListeners: 2
}
},
transport: {}
Expand Down
6 changes: 3 additions & 3 deletions src/identify/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -119,12 +119,12 @@ class IdentifyService {

/**
* Calls `push` for all peers in the `peerStore` that are connected
* @param {PeerStore} peerStore
* @returns {void}
*/
pushToPeerStore (peerStore) {
pushToPeerStore () {
const connections = []
let connection
for (const peer of peerStore.peers.values()) {
for (const peer of this.peerStore.peers.values()) {
if (peer.protocols.includes(MULTICODEC_IDENTIFY_PUSH) && (connection = this.connectionManager.get(peer.id))) {
connections.push(connection)
}
Expand Down
Loading

0 comments on commit c3039a0

Please sign in to comment.