Skip to content

Commit

Permalink
feat: auto relay
Browse files Browse the repository at this point in the history
  • Loading branch information
vasco-santos authored and jacobheun committed Aug 12, 2020
1 parent 02b6248 commit aa5a596
Show file tree
Hide file tree
Showing 12 changed files with 521 additions and 18 deletions.
170 changes: 170 additions & 0 deletions src/circuit/auto-relay.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,170 @@
'use strict'

const debug = require('debug')
const log = debug('libp2p:auto-relay')
log.error = debug('libp2p:auto-relay:error')

const multiaddr = require('multiaddr')
const MulticodecTopology = require('libp2p-interfaces/src/topology/multicodec-topology')

const { relay: multicodec } = require('./multicodec')
const { canHop } = require('./circuit/hop')

class AutoRelay {
/**
* Creates an instance of AutoRelay
* @constructor
* @param {object} params
* @param {Libp2p} params.libp2p
* @param {number} params.maxListeners maximum number of relays to listen.
*/
constructor ({ libp2p, maxListeners }) {
this._libp2p = libp2p
this._peerId = libp2p.peerId
this._registrar = libp2p.registrar
this._peerStore = libp2p.peerStore
this._connectionManager = libp2p.connectionManager
this._transportManager = libp2p.transportManager

this.maxListeners = maxListeners

/**
* @type {Set<string>}
*/
this._listenRelays = new Set()

this._onPeerConnected = this._onPeerConnected.bind(this)
this._onPeerDisconnected = this._onPeerDisconnected.bind(this)

// register protocol with topology
const topology = new MulticodecTopology({
multicodecs: multicodec,
handlers: {
onConnect: this._onPeerConnected,
onDisconnect: this._onPeerDisconnected
}
})
this._registrar.register(topology)

// TODO: should proactively try to connect?
// We need to figure out a more general approach for the autoDial with topologies
}

/**
* Registrar notifies a connection successfully with circuit protocol.
* @private
* @param {PeerId} peerId remote peer-id
* @param {Connection} connection connection to the peer
*/
async _onPeerConnected (peerId, connection) {
// Check if already listening on enough relays
if (this._listenRelays.size >= this.maxListeners) {
return
}

const idB58Str = peerId.toB58String()
log('connected', idB58Str)

// Check if already listening on the relay
if (this._listenRelays.has(idB58Str)) {
return
}

await this._addListenRelay(connection, idB58Str)
}

/**
* Attempt to listen on the given relay connection.
* @private
* @param {Connection} connection connection to the peer
* @param {string} idB58Str peer id string
* @return {Promise<void>}
*/
async _addListenRelay (connection, idB58Str) {
try {
this._listenRelays.add(idB58Str)
// Ask if relay can hop
await canHop({ connection })

// Create relay listen addr
const remoteMultiaddr = connection.remoteAddr
let listenAddr

if (!remoteMultiaddr.protoNames().includes('p2p')) {
listenAddr = `${remoteMultiaddr.toString()}/p2p/${connection.remotePeer.toB58String()}/p2p-circuit/p2p/${this._peerId.toB58String()}`
} else {
listenAddr = `${remoteMultiaddr.toString()}/p2p-circuit/p2p/${this._peerId.toB58String()}`
}

// Listen on relay
await this._transportManager.listen([multiaddr(listenAddr)])

log('listening on', listenAddr)
} catch (err) {
log.error(err)
this._listenRelays.delete(idB58Str)
}
}

/**
* Registrar notifies a closing connection with circuit protocol.
* @private
* @param {PeerId} peerId peerId
* @param {Error} err error for connection end
* @returns {Promise<void>}
*/
async _onPeerDisconnected (peerId, err) {
const idB58Str = peerId.toB58String()

// Not listening on this relay
if (!this._listenRelays.has(idB58Str)) {
return
}

log('connection ended', idB58Str, err ? err.message : '')

this._listenRelays.delete(idB58Str)

// Listen on alternative relays if available and not listenning on maximum
if (this._listenRelays.size >= this.maxListeners) {
return
}

log('try to listen on other connected peers with circuit')

// TODO: change to have a map of backUp relay nodes instead to simplify?
// Iterate over open connections
for (const [connection] of this._connectionManager.connections.values()) {
const idB58Str = connection.remotePeer.toB58String()
const protocols = this._peerStore.protoBook.get(connection.remotePeer) || []

// If has protocol and is not being used, attempt to listen
if (protocols.includes(multicodec) && !this._listenRelays.has(idB58Str)) {
await this._addListenRelay(connection, idB58Str)
}

if (this._listenRelays.size >= this.maxListeners) {
break
}
}

// Listen on alternative relays if available and not listenning on maxListeners
// if (this._listenRelays.size >= this.maxListeners) {
// return
// }

// TODO: iterate over peer store for possible peers to connect if below max?
// for (const peer of this._peerStore.peers.values()) {
// const idB58Str = peer.id.toB58String()
// // TODO: should it avoid to try the peer in question?
// if (peer.id.equals(peerId) || this._listenRelays.has(idB58Str)) {
// continue
// }
// }
}
}

// TODO: be careful about relay connect to relay peer that might create a double relayed conn
// TODO: trigger identify to share new signed peer record

module.exports = AutoRelay
25 changes: 25 additions & 0 deletions src/circuit/circuit/hop.js
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,31 @@ module.exports.hop = async function hop ({
throw errCode(new Error(`HOP request failed with code ${response.code}`), Errors.ERR_HOP_REQUEST_FAILED)
}

/**
* Performs a CAN_HOP request to a relay peer, in order to understand its capabilities.
* @param {object} options
* @param {Connection} options.connection Connection to the relay
* @returns {Promise<void>}
*/
module.exports.canHop = async function canHop ({
connection
}) {
// Create a new stream to the relay
const { stream } = await connection.newStream([multicodec.relay])
// Send the HOP request
const streamHandler = new StreamHandler({ stream })
streamHandler.write({
type: CircuitPB.Type.CAN_HOP
})

const response = await streamHandler.read()
await streamHandler.close()

if (response.code !== CircuitPB.Status.SUCCESS) {
throw errCode(new Error('Relay is not able to hop'), Errors.ERR_CANNOT_HOP)
}
}

/**
* Creates an unencoded CAN_HOP response based on the Circuits configuration
* @private
Expand Down
20 changes: 15 additions & 5 deletions src/circuit/index.js
Original file line number Diff line number Diff line change
@@ -1,16 +1,18 @@
'use strict'

const debug = require('debug')
const log = debug('libp2p:circuit')
log.error = debug('libp2p:circuit:error')

const mafmt = require('mafmt')
const multiaddr = require('multiaddr')
const PeerId = require('peer-id')
const withIs = require('class-is')
const { CircuitRelay: CircuitPB } = require('./protocol')

const debug = require('debug')
const log = debug('libp2p:circuit')
log.error = debug('libp2p:circuit:error')
const toConnection = require('libp2p-utils/src/stream-to-ma-conn')

const AutoRelay = require('./auto-relay')
const { relay: multicodec } = require('./multicodec')
const createListener = require('./listener')
const { handleCanHop, handleHop, hop } = require('./circuit/hop')
Expand All @@ -35,11 +37,19 @@ class Circuit {
this._libp2p = libp2p
this.peerId = libp2p.peerId
this._registrar.handle(multicodec, this._onProtocol.bind(this))

// Create autoRelay if enabled
this._autoRelay = this._options.autoRelay.enabled && new AutoRelay({ libp2p, ...this._options.autoRelay })
}

async _onProtocol ({ connection, stream, protocol }) {
async _onProtocol ({ connection, stream }) {
const streamHandler = new StreamHandler({ stream })
const request = await streamHandler.read()

if (!request) {
return
}

const circuit = this
let virtualConnection

Expand Down Expand Up @@ -163,7 +173,7 @@ class Circuit {
// Called on successful HOP and STOP requests
this.handler = handler

return createListener(this, options)
return createListener(this._libp2p, options)
}

/**
Expand Down
11 changes: 8 additions & 3 deletions src/circuit/listener.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,18 @@ const log = debug('libp2p:circuit:listener')
log.err = debug('libp2p:circuit:error:listener')

/**
* @param {*} circuit
* @param {Libp2p} libp2p
* @returns {Listener} a transport listener
*/
module.exports = (circuit) => {
module.exports = (libp2p) => {
const listener = new EventEmitter()
const listeningAddrs = new Map()

// Remove listeningAddrs when a peer disconnects
libp2p.connectionManager.on('peer:disconnect', (connection) => {
listeningAddrs.delete(connection.remotePeer.toB58String())
})

/**
* Add swarm handler and listen for incoming connections
*
Expand All @@ -24,7 +29,7 @@ module.exports = (circuit) => {
listener.listen = async (addr) => {
const addrString = String(addr).split('/p2p-circuit').find(a => a !== '')

const relayConn = await circuit._dialer.connectToPeer(multiaddr(addrString))
const relayConn = await libp2p.dial(multiaddr(addrString))
const relayedAddr = relayConn.remoteAddr.encapsulate('/p2p-circuit')

listeningAddrs.set(relayConn.remotePeer.toB58String(), relayedAddr)
Expand Down
4 changes: 4 additions & 0 deletions src/config.js
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,10 @@ const DefaultConfig = {
hop: {
enabled: false,
active: false
},
autoRelay: {
enabled: false,
maxListeners: 2
}
},
transport: {}
Expand Down
1 change: 1 addition & 0 deletions src/errors.js
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ exports.codes = {
ERR_DISCOVERED_SELF: 'ERR_DISCOVERED_SELF',
ERR_DUPLICATE_TRANSPORT: 'ERR_DUPLICATE_TRANSPORT',
ERR_ENCRYPTION_FAILED: 'ERR_ENCRYPTION_FAILED',
ERR_CANNOT_HOP: 'ERR_CANNOT_HOP',
ERR_HOP_REQUEST_FAILED: 'ERR_HOP_REQUEST_FAILED',
ERR_INVALID_KEY: 'ERR_INVALID_KEY',
ERR_INVALID_MESSAGE: 'ERR_INVALID_MESSAGE',
Expand Down
5 changes: 3 additions & 2 deletions src/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -444,8 +444,9 @@ class Libp2p extends EventEmitter {
}

async _onStarting () {
// Listen on the provided transports
await this.transportManager.listen()
// Listen on the provided transports for the provided addresses
const addrs = this.addressManager.getListenAddrs()
await this.transportManager.listen(addrs)

// Start PeerStore
await this.peerStore.start()
Expand Down
7 changes: 4 additions & 3 deletions src/transport-manager.js
Original file line number Diff line number Diff line change
Expand Up @@ -131,11 +131,12 @@ class TransportManager {
/**
* Starts listeners for each listen Multiaddr.
* @async
* @param {Array<Multiaddr>} addrs addresses to attempt to listen on
*/
async listen () {
const addrs = this.libp2p.addressManager.getListenAddrs()
async listen (addrs) {
// const addrs = this.libp2p.addressManager.getListenAddrs()

if (addrs.length === 0) {
if (!addrs || addrs.length === 0) {
log('no addresses were provided for listening, this node is dial only')
return
}
Expand Down
Loading

0 comments on commit aa5a596

Please sign in to comment.