Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: auto relay #723

Merged
merged 7 commits into from
Sep 16, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
231 changes: 231 additions & 0 deletions src/circuit/auto-relay.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,231 @@
'use strict'

const debug = require('debug')
const log = debug('libp2p:auto-relay')
log.error = debug('libp2p:auto-relay:error')

const uint8ArrayFromString = require('uint8arrays/from-string')
const uint8ArrayToString = require('uint8arrays/to-string')
const multiaddr = require('multiaddr')
const PeerId = require('peer-id')

const { relay: multicodec } = require('./multicodec')
const { canHop } = require('./circuit/hop')

const circuitProtoCode = 290
const hopMetadataKey = 'hop_relay'
const hopMetadataValue = 'true'

class AutoRelay {
/**
* Creates an instance of AutoRelay.
* @constructor
* @param {object} props
* @param {Libp2p} props.libp2p
* @param {number} [props.maxListeners = 1] maximum number of relays to listen.
*/
constructor ({ libp2p, maxListeners = 1 }) {
this._libp2p = libp2p
this._peerId = libp2p.peerId
this._peerStore = libp2p.peerStore
this._connectionManager = libp2p.connectionManager
this._transportManager = libp2p.transportManager

this.maxListeners = maxListeners

/**
* @type {Set<string>}
*/
this._listenRelays = new Set()

this._onProtocolChange = this._onProtocolChange.bind(this)
this._onPeerDisconnected = this._onPeerDisconnected.bind(this)

this._peerStore.on('change:protocols', this._onProtocolChange)
this._connectionManager.on('peer:disconnect', this._onPeerDisconnected)
}

/**
* Check if a peer supports the relay protocol.
* If the protocol is not supported, check if it was supported before and remove it as a listen relay.
* If the protocol is supported, check if the peer supports **HOP** and add it as a listener if
* inside the threshold.
* @param {Object} props
* @param {PeerId} props.peerId
* @param {Array<string>} props.protocols
* @return {Promise<void>}
*/
async _onProtocolChange ({ peerId, protocols }) {
const id = peerId.toB58String()

// Check if it has the protocol
const hasProtocol = protocols.find(protocol => protocol === multicodec)

// If no protocol, check if we were keeping the peer before as a listenRelay
if (!hasProtocol && this._listenRelays.has(id)) {
this._removeListenRelay(id)
return
} else if (!hasProtocol || this._listenRelays.has(id)) {
return
}
jacobheun marked this conversation as resolved.
Show resolved Hide resolved

// If protocol, check if can hop, store info in the metadataBook and listen on it
try {
const connection = this._connectionManager.get(peerId)

// Do not hop on a relayed connection
if (connection.remoteAddr.protoCodes().includes(circuitProtoCode)) {
log(`relayed connection to ${id} will not be used to hop on`)
return
}

const supportsHop = await canHop({ connection })

if (supportsHop) {
this._peerStore.metadataBook.set(peerId, hopMetadataKey, uint8ArrayFromString(hopMetadataValue))
await this._addListenRelay(connection, id)
}
} catch (err) {
log.error(err)
}
}

/**
* Peer disconnects.
* @param {Connection} connection connection to the peer
* @return {void}
*/
_onPeerDisconnected (connection) {
const peerId = connection.remotePeer
const id = peerId.toB58String()

// Not listening on this relay
if (!this._listenRelays.has(id)) {
return
}

this._removeListenRelay(id)
}

/**
* Attempt to listen on the given relay connection.
* @private
* @param {Connection} connection connection to the peer
* @param {string} id peer identifier string
* @return {Promise<void>}
*/
async _addListenRelay (connection, id) {
// Check if already listening on enough relays
if (this._listenRelays.size >= this.maxListeners) {
return
}

// Create relay listen addr
let listenAddr, remoteMultiaddr

try {
const remoteAddrs = this._peerStore.addressBook.get(connection.remotePeer)
// TODO: HOP Relays should avoid advertising private addresses!
remoteMultiaddr = remoteAddrs.find(a => a.isCertified).multiaddr // Get first announced address certified
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This isn't going to be sufficient, we'll need to try and prioritize public addresses for this. It's still pretty common for peers to advertise private addresses. We can make a note to do this in a followup PR though.

HOP relays should really avoid advertising private addresses (we should document this in a "setting up relays" section of the production guides), but we can't rely on this behavior.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, we have a milestone for it, but I will add a comment: #699 (comment)

I am not sure yet on the best approach for this, but it is being tracked

} catch (_) {
log.error(`${id} does not have announced certified multiaddrs`)
return
}

if (!remoteMultiaddr.protoNames().includes('p2p')) {
listenAddr = `${remoteMultiaddr.toString()}/p2p/${connection.remotePeer.toB58String()}/p2p-circuit`
} else {
listenAddr = `${remoteMultiaddr.toString()}/p2p-circuit`
}

// Attempt to listen on relay
this._listenRelays.add(id)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not just do the add after the listen call in the try? Then the delete isn't required if it fails.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Well, we will block inside the following try ... catch block, waiting for the transportManager.listen.
If we get multiple calls at the same time, all of them will not be blocked from reaching the transportManager.listen and we will probably end up with more listenRelays than needed.
Perhaps, I can use a p-queue instead? Otherwise, I might also cancel others that fail (should not expect transportManager.listen to fail, but it can)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we can leave it like this for now. It really shouldn't fail, we're already connected. If it does fail that should mean the connection was dropped during the listen attempt, which should trigger us to connect to another known relay if it exists.


try {
await this._transportManager.listen([multiaddr(listenAddr)])
// TODO: push announce multiaddrs update
// await this._libp2p.identifyService.pushToPeerStore()
} catch (err) {
log.error(err)
this._listenRelays.delete(id)
}
}

/**
* Remove listen relay.
* @private
* @param {string} id peer identifier string.
* @return {void}
*/
_removeListenRelay (id) {
if (this._listenRelays.delete(id)) {
// TODO: this should be responsibility of the connMgr
this._listenOnAvailableHopRelays([id])
}
}

/**
* Try to listen on available hop relay connections.
* The following order will happen while we do not have enough relays.
* 1. Check the metadata store for known relays, try to listen on the ones we are already connected.
* 2. Dial and try to listen on the peers we know that support hop but are not connected.
* 3. Search the network.
* @param {Array<string>} [peersToIgnore]
* @return {Promise<void>}
*/
async _listenOnAvailableHopRelays (peersToIgnore = []) {
// TODO: The peer redial issue on disconnect should be handled by connection gating
// Check if already listening on enough relays
if (this._listenRelays.size >= this.maxListeners) {
return
}

const knownHopsToDial = []

// Check if we have known hop peers to use and attempt to listen on the already connected
for (const [id, metadataMap] of this._peerStore.metadataBook.data.entries()) {
// Continue to next if listening on this or peer to ignore
if (this._listenRelays.has(id) || peersToIgnore.includes(id)) {
continue
}

const supportsHop = metadataMap.get(hopMetadataKey)

// Continue to next if it does not support Hop
if (!supportsHop || uint8ArrayToString(supportsHop) !== hopMetadataValue) {
continue
}

const peerId = PeerId.createFromCID(id)
const connection = this._connectionManager.get(peerId)

// If not connected, store for possible later use.
if (!connection) {
knownHopsToDial.push(peerId)
continue
}

await this._addListenRelay(connection, id)

// Check if already listening on enough relays
if (this._listenRelays.size >= this.maxListeners) {
return
}
}

// Try to listen on known peers that are not connected
for (const peerId of knownHopsToDial) {
const connection = await this._libp2p.dial(peerId)
await this._addListenRelay(connection, peerId.toB58String())

// Check if already listening on enough relays
if (this._listenRelays.size >= this.maxListeners) {
return
}
}

// TODO: Try to find relays to hop on the network
}
}

module.exports = AutoRelay
27 changes: 27 additions & 0 deletions src/circuit/circuit/hop.js
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,33 @@ module.exports.hop = async function hop ({
throw errCode(new Error(`HOP request failed with code ${response.code}`), Errors.ERR_HOP_REQUEST_FAILED)
}

/**
* Performs a CAN_HOP request to a relay peer, in order to understand its capabilities.
* @param {object} options
* @param {Connection} options.connection Connection to the relay
* @returns {Promise<boolean>}
*/
module.exports.canHop = async function canHop ({
connection
}) {
// Create a new stream to the relay
const { stream } = await connection.newStream([multicodec.relay])
// Send the HOP request
const streamHandler = new StreamHandler({ stream })
streamHandler.write({
type: CircuitPB.Type.CAN_HOP
})

const response = await streamHandler.read()
await streamHandler.close()

if (response.code !== CircuitPB.Status.SUCCESS) {
return false
}
jacobheun marked this conversation as resolved.
Show resolved Hide resolved

return true
}

/**
* Creates an unencoded CAN_HOP response based on the Circuits configuration
* @private
Expand Down
20 changes: 15 additions & 5 deletions src/circuit/index.js
Original file line number Diff line number Diff line change
@@ -1,16 +1,18 @@
'use strict'

const debug = require('debug')
const log = debug('libp2p:circuit')
log.error = debug('libp2p:circuit:error')

const mafmt = require('mafmt')
const multiaddr = require('multiaddr')
const PeerId = require('peer-id')
const withIs = require('class-is')
const { CircuitRelay: CircuitPB } = require('./protocol')

const debug = require('debug')
const log = debug('libp2p:circuit')
log.error = debug('libp2p:circuit:error')
const toConnection = require('libp2p-utils/src/stream-to-ma-conn')

const AutoRelay = require('./auto-relay')
const { relay: multicodec } = require('./multicodec')
const createListener = require('./listener')
const { handleCanHop, handleHop, hop } = require('./circuit/hop')
Expand All @@ -35,11 +37,19 @@ class Circuit {
this._libp2p = libp2p
this.peerId = libp2p.peerId
this._registrar.handle(multicodec, this._onProtocol.bind(this))

// Create autoRelay if enabled
this._autoRelay = this._options.autoRelay.enabled && new AutoRelay({ libp2p, ...this._options.autoRelay })
}

async _onProtocol ({ connection, stream, protocol }) {
async _onProtocol ({ connection, stream }) {
const streamHandler = new StreamHandler({ stream })
const request = await streamHandler.read()

if (!request) {
return
}

const circuit = this
let virtualConnection

Expand Down Expand Up @@ -163,7 +173,7 @@ class Circuit {
// Called on successful HOP and STOP requests
this.handler = handler

return createListener(this, options)
return createListener(this._libp2p, options)
}

/**
Expand Down
16 changes: 13 additions & 3 deletions src/circuit/listener.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,23 @@ const log = debug('libp2p:circuit:listener')
log.err = debug('libp2p:circuit:error:listener')

/**
* @param {*} circuit
* @param {Libp2p} libp2p
* @returns {Listener} a transport listener
*/
module.exports = (circuit) => {
module.exports = (libp2p) => {
const listener = new EventEmitter()
const listeningAddrs = new Map()

// Remove listeningAddrs when a peer disconnects
libp2p.connectionManager.on('peer:disconnect', (connection) => {
const deleted = listeningAddrs.delete(connection.remotePeer.toB58String())

if (deleted) {
// TODO push announce multiaddrs update
// libp2p.identifyService.pushToPeerStore()
}
})

/**
* Add swarm handler and listen for incoming connections
*
Expand All @@ -24,7 +34,7 @@ module.exports = (circuit) => {
listener.listen = async (addr) => {
const addrString = String(addr).split('/p2p-circuit').find(a => a !== '')

const relayConn = await circuit._dialer.connectToPeer(multiaddr(addrString))
const relayConn = await libp2p.dial(multiaddr(addrString))
vasco-santos marked this conversation as resolved.
Show resolved Hide resolved
const relayedAddr = relayConn.remoteAddr.encapsulate('/p2p-circuit')
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Something to consider here is that in AutoRelay we are creating a listen addr and then calling transportManager.listen to "connect". We will already have a connection per the AutoRelay logic, so that address is going to get thrown out and replaced with whatever this address is. We need to do some reconciliation here, I would find it odd to call listen with one address, and then have my actual address end up being something different.

I think this likely won't be a huge issue for initial relay connections, but if we reconnect to known relays, this address could change. The provided address should be the address we end up with, but if it fails for some reason we will dial other known addresses for the peer.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need to do some reconciliation here, I would find it odd to call listen with one address, and then have my actual address end up being something different.

That is true! I changed this to avoid creating multiple connections to the same peer, but I agree with your point.
So you suggest that we go back and use the connectToPeer with a fallback to the dial if we fail?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So you suggest that we go back and use the connectToPeer with a fallback to the dial if we fail?

No. We shouldn't care how we're connected to the peer, really, but we need to be careful about the address we're advertising for this. Something like: Prioritize public addresses, if the connected address matches one of those use it, otherwise pick one of the others.

Again, I think we can do a follow PR for this, so we can focus on clear tests for that.


listeningAddrs.set(relayConn.remotePeer.toB58String(), relayedAddr)
Expand Down
4 changes: 4 additions & 0 deletions src/config.js
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,10 @@ const DefaultConfig = {
hop: {
enabled: false,
active: false
},
autoRelay: {
enabled: false,
maxListeners: 2
}
},
transport: {}
Expand Down
6 changes: 3 additions & 3 deletions src/identify/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -119,12 +119,12 @@ class IdentifyService {

/**
* Calls `push` for all peers in the `peerStore` that are connected
* @param {PeerStore} peerStore
* @returns {void}
*/
pushToPeerStore (peerStore) {
pushToPeerStore () {
const connections = []
let connection
for (const peer of peerStore.peers.values()) {
for (const peer of this.peerStore.peers.values()) {
if (peer.protocols.includes(MULTICODEC_IDENTIFY_PUSH) && (connection = this.connectionManager.get(peer.id))) {
connections.push(connection)
}
Expand Down
Loading