Skip to content

Commit

Permalink
fix: leverage protoBook events to ask relay peers if they support hop
Browse files Browse the repository at this point in the history
  • Loading branch information
vasco-santos committed Aug 28, 2020
1 parent 80996f9 commit 2c11cca
Show file tree
Hide file tree
Showing 2 changed files with 106 additions and 84 deletions.
184 changes: 102 additions & 82 deletions src/circuit/auto-relay.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,15 @@ const debug = require('debug')
const log = debug('libp2p:auto-relay')
log.error = debug('libp2p:auto-relay:error')

const uint8ArrayFromString = require('uint8arrays/from-string')
const uint8ArrayToString = require('uint8arrays/to-string')
const multiaddr = require('multiaddr')
const MulticodecTopology = require('libp2p-interfaces/src/topology/multicodec-topology')

const { relay: multicodec } = require('./multicodec')
const { canHop } = require('./circuit/hop')

const hopMetadataKey = 'hop_relay'

class AutoRelay {
/**
* Creates an instance of AutoRelay
Expand All @@ -33,138 +36,155 @@ class AutoRelay {
*/
this._listenRelays = new Set()

this._onPeerConnected = this._onPeerConnected.bind(this)
this._onProtocolChange = this._onProtocolChange.bind(this)
this._onPeerDisconnected = this._onPeerDisconnected.bind(this)

// register protocol with topology
const topology = new MulticodecTopology({
multicodecs: multicodec,
handlers: {
onConnect: this._onPeerConnected,
onDisconnect: this._onPeerDisconnected
}
})
this._registrar.register(topology)
this._peerStore.on('change:protocols', this._onProtocolChange)
this._connectionManager.on('peer:disconnect', this._onPeerDisconnected)

// TODO: should proactively try to connect?
// We need to figure out a more general approach for the autoDial with topologies
// TODO: proactively try to connect via connMgr
}

/**
* Registrar notifies a connection successfully with circuit protocol.
* @private
* @param {PeerId} peerId remote peer-id
* @param {Connection} connection connection to the peer
* Check if a new peer supports the multicodec for the relay.
* @param {Object} props
* @param {PeerId} props.peerId
* @param {Array<string>} props.protocols
* @return {Promise<void>}
*/
async _onPeerConnected (peerId, connection) {
// Check if already listening on enough relays
if (this._listenRelays.size >= this.maxListeners) {
async _onProtocolChange ({ peerId, protocols }) {
const id = peerId.toB58String()

// Check if it has the protocol
const hasProtocol = protocols.find(protocol => protocol === multicodec)

// If no protocol, check if we were keeping the peer before as a listenRelay
if (!hasProtocol && this._listenRelays.has(id)) {
await this._removeListenRelay(id)
this._listenOnAvailableHopRelays()
return
} else if (!hasProtocol || this._listenRelays.has(id)) {
return
}

const idB58Str = peerId.toB58String()
log('connected', idB58Str)
// If protocol, check if can hop, store info in the metadataBook and listen on it
try {
const connection = this._connectionManager.get(peerId)

await canHop({ connection })

// Save peer metadata
this._peerStore.metadataBook.set(peerId, hopMetadataKey, uint8ArrayFromString('true'))

// Check if already listening on the relay
if (this._listenRelays.has(idB58Str)) {
// Listen on relay
await this._addListenRelay(connection, id)
} catch (err) {
log.error(err)
}
}

/**
* Peer disconnects.
* @param {Connection} connection connection to the peer
* @return {Promise<void>}
*/
async _onPeerDisconnected (connection) {
const peerId = connection.remotePeer
const id = peerId.toB58String()

// Not listening on this relay
if (!this._listenRelays.has(id)) {
return
}

await this._addListenRelay(connection, idB58Str)
await this._removeListenRelay(id)
await this._listenOnAvailableHopRelays()
}

/**
* Attempt to listen on the given relay connection.
* @private
* @param {Connection} connection connection to the peer
* @param {string} idB58Str peer id string
* @param {string} id peer identifier string
* @return {Promise<void>}
*/
async _addListenRelay (connection, idB58Str) {
try {
this._listenRelays.add(idB58Str)
// Ask if relay can hop
await canHop({ connection })
async _addListenRelay (connection, id) {
// Check if already listening on enough relays
if (this._listenRelays.size >= this.maxListeners) {
return
}

// Create relay listen addr
const remoteMultiaddr = connection.remoteAddr
let listenAddr
this._listenRelays.add(id)

if (!remoteMultiaddr.protoNames().includes('p2p')) {
listenAddr = `${remoteMultiaddr.toString()}/p2p/${connection.remotePeer.toB58String()}/p2p-circuit/p2p/${this._peerId.toB58String()}`
} else {
listenAddr = `${remoteMultiaddr.toString()}/p2p-circuit/p2p/${this._peerId.toB58String()}`
}
// Create relay listen addr
const remoteMultiaddr = connection.remoteAddr
let listenAddr

// Listen on relay
await this._transportManager.listen([multiaddr(listenAddr)])
if (!remoteMultiaddr.protoNames().includes('p2p')) {
listenAddr = `${remoteMultiaddr.toString()}/p2p/${connection.remotePeer.toB58String()}/p2p-circuit/p2p/${this._peerId.toB58String()}`
} else {
listenAddr = `${remoteMultiaddr.toString()}/p2p-circuit/p2p/${this._peerId.toB58String()}`
}

log('listening on', listenAddr)
// Attempt to listen on relay
try {
await this._transportManager.listen([multiaddr(listenAddr)])
} catch (err) {
log.error(err)
this._listenRelays.delete(idB58Str)
this._listenRelays.delete(id)
}
}

/**
* Registrar notifies a closing connection with circuit protocol.
* Remove listen relay.
* @private
* @param {PeerId} peerId peerId
* @param {Error} err error for connection end
* @returns {Promise<void>}
* @param {string} id peer identifier string.
*/
async _onPeerDisconnected (peerId, err) {
const idB58Str = peerId.toB58String()
_removeListenRelay (id) {
this._listenRelays.delete(id)

// Not listening on this relay
if (!this._listenRelays.has(idB58Str)) {
return
}

log('connection ended', idB58Str, err ? err.message : '')

this._listenRelays.delete(idB58Str)
// TODO: remove listen
// TODO: check if we really need to do this
}

// Listen on alternative relays if available and not listenning on maximum
/**
* Try to listen on available hop relay connections.
* @return {Promise<void>}
*/
async _listenOnAvailableHopRelays () {
// Check if already listening on enough relays
if (this._listenRelays.size >= this.maxListeners) {
return
}

log('try to listen on other connected peers with circuit')

// TODO: change to have a map of backUp relay nodes instead to simplify?
// Iterate over open connections
// Verify if there are available connections to hop
for (const [connection] of this._connectionManager.connections.values()) {
const idB58Str = connection.remotePeer.toB58String()
const protocols = this._peerStore.protoBook.get(connection.remotePeer) || []
const peerId = connection.remotePeer
const id = peerId.toB58String()

// Continue to next if listening on this
if (this._listenRelays.has(id)) {
continue
}

// If has protocol and is not being used, attempt to listen
if (protocols.includes(multicodec) && !this._listenRelays.has(idB58Str)) {
await this._addListenRelay(connection, idB58Str)
const supportsHop = this._peerStore.metadataBook.getValue(peerId, hopMetadataKey)

// Continue to next if it does not support Hop
if (!supportsHop || uint8ArrayToString(supportsHop) !== 'true') {
continue
}

await this._addListenRelay(connection, id)

// Check if already listening on enough relays
if (this._listenRelays.size >= this.maxListeners) {
break
}
}

// Listen on alternative relays if available and not listenning on maxListeners
// if (this._listenRelays.size >= this.maxListeners) {
// return
// }

// TODO: iterate over peer store for possible peers to connect if below max?
// for (const peer of this._peerStore.peers.values()) {
// const idB58Str = peer.id.toB58String()
// // TODO: should it avoid to try the peer in question?
// if (peer.id.equals(peerId) || this._listenRelays.has(idB58Str)) {
// continue
// }
// }
// Auto dial: Iterate peer store...
}
}

// TODO: be careful about relay connect to relay peer that might create a double relayed conn
// TODO: trigger identify to share new signed peer record

module.exports = AutoRelay
6 changes: 4 additions & 2 deletions test/relay/auto-relay.node.js
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@ describe('auto-relay', () => {
it('should only add maxListeners relayed addresses', async () => {
// Spy if a connected peer is being added as listen relay
sinon.spy(autoRelay1, '_addListenRelay')
sinon.spy(autoRelay1._listenRelays, 'add')

// Discover one relay and connect
relayLibp2p1.peerStore.addressBook.add(relayLibp2p2.peerId, relayLibp2p2.multiaddrs)
Expand All @@ -133,7 +134,7 @@ describe('auto-relay', () => {
expect(relayLibp2p1.connectionManager.size).to.eql(1)

// Wait for peer added as listen relay
await pWaitFor(() => autoRelay1._addListenRelay.callCount === 1)
await pWaitFor(() => autoRelay1._addListenRelay.callCount === 1 && autoRelay1._listenRelays.add.callCount === 1)
expect(autoRelay1._listenRelays.size).to.equal(1)

// Wait for listen multiaddr update
Expand All @@ -154,7 +155,8 @@ describe('auto-relay', () => {
// Wait to guarantee the dialed peer is not added as a listen relay
await delay(300)

expect(autoRelay1._addListenRelay.callCount).to.equal(1)
expect(autoRelay1._addListenRelay.callCount).to.equal(2)
expect(autoRelay1._listenRelays.add.callCount).to.equal(1)
expect(autoRelay1._listenRelays.size).to.equal(1)
expect(relayLibp2p1.connectionManager.size).to.eql(2)

Expand Down

0 comments on commit 2c11cca

Please sign in to comment.