From c3039a0e56fcb230dd64aca3f0839a51a3d32010 Mon Sep 17 00:00:00 2001 From: Vasco Santos Date: Wed, 16 Sep 2020 16:43:09 +0200 Subject: [PATCH] feat: auto relay (#723) * feat: auto relay * fix: leverage protoBook events to ask relay peers if they support hop * chore: refactor disconnect * chore: do not listen on a relayed conn * chore: tweaks * chore: improve _listenOnAvailableHopRelays logic * chore: default value of 1 to maxListeners on auto-relay --- src/circuit/auto-relay.js | 231 +++++++++++ src/circuit/circuit/hop.js | 27 ++ src/circuit/index.js | 20 +- src/circuit/listener.js | 16 +- src/config.js | 4 + src/identify/index.js | 6 +- src/index.js | 9 +- src/peer-store/address-book.js | 2 +- src/transport-manager.js | 7 +- test/relay/auto-relay.node.js | 455 ++++++++++++++++++++++ test/{dialing => relay}/relay.node.js | 4 +- test/transports/transport-manager.node.js | 4 +- test/transports/transport-manager.spec.js | 2 +- 13 files changed, 762 insertions(+), 25 deletions(-) create mode 100644 src/circuit/auto-relay.js create mode 100644 test/relay/auto-relay.node.js rename test/{dialing => relay}/relay.node.js (97%) diff --git a/src/circuit/auto-relay.js b/src/circuit/auto-relay.js new file mode 100644 index 0000000000..5617e94eff --- /dev/null +++ b/src/circuit/auto-relay.js @@ -0,0 +1,231 @@ +'use strict' + +const debug = require('debug') +const log = debug('libp2p:auto-relay') +log.error = debug('libp2p:auto-relay:error') + +const uint8ArrayFromString = require('uint8arrays/from-string') +const uint8ArrayToString = require('uint8arrays/to-string') +const multiaddr = require('multiaddr') +const PeerId = require('peer-id') + +const { relay: multicodec } = require('./multicodec') +const { canHop } = require('./circuit/hop') + +const circuitProtoCode = 290 +const hopMetadataKey = 'hop_relay' +const hopMetadataValue = 'true' + +class AutoRelay { + /** + * Creates an instance of AutoRelay. + * @constructor + * @param {object} props + * @param {Libp2p} props.libp2p + * @param {number} [props.maxListeners = 1] maximum number of relays to listen. + */ + constructor ({ libp2p, maxListeners = 1 }) { + this._libp2p = libp2p + this._peerId = libp2p.peerId + this._peerStore = libp2p.peerStore + this._connectionManager = libp2p.connectionManager + this._transportManager = libp2p.transportManager + + this.maxListeners = maxListeners + + /** + * @type {Set} + */ + this._listenRelays = new Set() + + this._onProtocolChange = this._onProtocolChange.bind(this) + this._onPeerDisconnected = this._onPeerDisconnected.bind(this) + + this._peerStore.on('change:protocols', this._onProtocolChange) + this._connectionManager.on('peer:disconnect', this._onPeerDisconnected) + } + + /** + * Check if a peer supports the relay protocol. + * If the protocol is not supported, check if it was supported before and remove it as a listen relay. + * If the protocol is supported, check if the peer supports **HOP** and add it as a listener if + * inside the threshold. + * @param {Object} props + * @param {PeerId} props.peerId + * @param {Array} props.protocols + * @return {Promise} + */ + async _onProtocolChange ({ peerId, protocols }) { + const id = peerId.toB58String() + + // Check if it has the protocol + const hasProtocol = protocols.find(protocol => protocol === multicodec) + + // If no protocol, check if we were keeping the peer before as a listenRelay + if (!hasProtocol && this._listenRelays.has(id)) { + this._removeListenRelay(id) + return + } else if (!hasProtocol || this._listenRelays.has(id)) { + return + } + + // If protocol, check if can hop, store info in the metadataBook and listen on it + try { + const connection = this._connectionManager.get(peerId) + + // Do not hop on a relayed connection + if (connection.remoteAddr.protoCodes().includes(circuitProtoCode)) { + log(`relayed connection to ${id} will not be used to hop on`) + return + } + + const supportsHop = await canHop({ connection }) + + if (supportsHop) { + this._peerStore.metadataBook.set(peerId, hopMetadataKey, uint8ArrayFromString(hopMetadataValue)) + await this._addListenRelay(connection, id) + } + } catch (err) { + log.error(err) + } + } + + /** + * Peer disconnects. + * @param {Connection} connection connection to the peer + * @return {void} + */ + _onPeerDisconnected (connection) { + const peerId = connection.remotePeer + const id = peerId.toB58String() + + // Not listening on this relay + if (!this._listenRelays.has(id)) { + return + } + + this._removeListenRelay(id) + } + + /** + * Attempt to listen on the given relay connection. + * @private + * @param {Connection} connection connection to the peer + * @param {string} id peer identifier string + * @return {Promise} + */ + async _addListenRelay (connection, id) { + // Check if already listening on enough relays + if (this._listenRelays.size >= this.maxListeners) { + return + } + + // Create relay listen addr + let listenAddr, remoteMultiaddr + + try { + const remoteAddrs = this._peerStore.addressBook.get(connection.remotePeer) + // TODO: HOP Relays should avoid advertising private addresses! + remoteMultiaddr = remoteAddrs.find(a => a.isCertified).multiaddr // Get first announced address certified + } catch (_) { + log.error(`${id} does not have announced certified multiaddrs`) + return + } + + if (!remoteMultiaddr.protoNames().includes('p2p')) { + listenAddr = `${remoteMultiaddr.toString()}/p2p/${connection.remotePeer.toB58String()}/p2p-circuit` + } else { + listenAddr = `${remoteMultiaddr.toString()}/p2p-circuit` + } + + // Attempt to listen on relay + this._listenRelays.add(id) + + try { + await this._transportManager.listen([multiaddr(listenAddr)]) + // TODO: push announce multiaddrs update + // await this._libp2p.identifyService.pushToPeerStore() + } catch (err) { + log.error(err) + this._listenRelays.delete(id) + } + } + + /** + * Remove listen relay. + * @private + * @param {string} id peer identifier string. + * @return {void} + */ + _removeListenRelay (id) { + if (this._listenRelays.delete(id)) { + // TODO: this should be responsibility of the connMgr + this._listenOnAvailableHopRelays([id]) + } + } + + /** + * Try to listen on available hop relay connections. + * The following order will happen while we do not have enough relays. + * 1. Check the metadata store for known relays, try to listen on the ones we are already connected. + * 2. Dial and try to listen on the peers we know that support hop but are not connected. + * 3. Search the network. + * @param {Array} [peersToIgnore] + * @return {Promise} + */ + async _listenOnAvailableHopRelays (peersToIgnore = []) { + // TODO: The peer redial issue on disconnect should be handled by connection gating + // Check if already listening on enough relays + if (this._listenRelays.size >= this.maxListeners) { + return + } + + const knownHopsToDial = [] + + // Check if we have known hop peers to use and attempt to listen on the already connected + for (const [id, metadataMap] of this._peerStore.metadataBook.data.entries()) { + // Continue to next if listening on this or peer to ignore + if (this._listenRelays.has(id) || peersToIgnore.includes(id)) { + continue + } + + const supportsHop = metadataMap.get(hopMetadataKey) + + // Continue to next if it does not support Hop + if (!supportsHop || uint8ArrayToString(supportsHop) !== hopMetadataValue) { + continue + } + + const peerId = PeerId.createFromCID(id) + const connection = this._connectionManager.get(peerId) + + // If not connected, store for possible later use. + if (!connection) { + knownHopsToDial.push(peerId) + continue + } + + await this._addListenRelay(connection, id) + + // Check if already listening on enough relays + if (this._listenRelays.size >= this.maxListeners) { + return + } + } + + // Try to listen on known peers that are not connected + for (const peerId of knownHopsToDial) { + const connection = await this._libp2p.dial(peerId) + await this._addListenRelay(connection, peerId.toB58String()) + + // Check if already listening on enough relays + if (this._listenRelays.size >= this.maxListeners) { + return + } + } + + // TODO: Try to find relays to hop on the network + } +} + +module.exports = AutoRelay diff --git a/src/circuit/circuit/hop.js b/src/circuit/circuit/hop.js index c8ac0fddb0..f52648a429 100644 --- a/src/circuit/circuit/hop.js +++ b/src/circuit/circuit/hop.js @@ -117,6 +117,33 @@ module.exports.hop = async function hop ({ throw errCode(new Error(`HOP request failed with code ${response.code}`), Errors.ERR_HOP_REQUEST_FAILED) } +/** + * Performs a CAN_HOP request to a relay peer, in order to understand its capabilities. + * @param {object} options + * @param {Connection} options.connection Connection to the relay + * @returns {Promise} + */ +module.exports.canHop = async function canHop ({ + connection +}) { + // Create a new stream to the relay + const { stream } = await connection.newStream([multicodec.relay]) + // Send the HOP request + const streamHandler = new StreamHandler({ stream }) + streamHandler.write({ + type: CircuitPB.Type.CAN_HOP + }) + + const response = await streamHandler.read() + await streamHandler.close() + + if (response.code !== CircuitPB.Status.SUCCESS) { + return false + } + + return true +} + /** * Creates an unencoded CAN_HOP response based on the Circuits configuration * @private diff --git a/src/circuit/index.js b/src/circuit/index.js index baf10e370e..df3799fa74 100644 --- a/src/circuit/index.js +++ b/src/circuit/index.js @@ -1,16 +1,18 @@ 'use strict' +const debug = require('debug') +const log = debug('libp2p:circuit') +log.error = debug('libp2p:circuit:error') + const mafmt = require('mafmt') const multiaddr = require('multiaddr') const PeerId = require('peer-id') const withIs = require('class-is') const { CircuitRelay: CircuitPB } = require('./protocol') -const debug = require('debug') -const log = debug('libp2p:circuit') -log.error = debug('libp2p:circuit:error') const toConnection = require('libp2p-utils/src/stream-to-ma-conn') +const AutoRelay = require('./auto-relay') const { relay: multicodec } = require('./multicodec') const createListener = require('./listener') const { handleCanHop, handleHop, hop } = require('./circuit/hop') @@ -35,11 +37,19 @@ class Circuit { this._libp2p = libp2p this.peerId = libp2p.peerId this._registrar.handle(multicodec, this._onProtocol.bind(this)) + + // Create autoRelay if enabled + this._autoRelay = this._options.autoRelay.enabled && new AutoRelay({ libp2p, ...this._options.autoRelay }) } - async _onProtocol ({ connection, stream, protocol }) { + async _onProtocol ({ connection, stream }) { const streamHandler = new StreamHandler({ stream }) const request = await streamHandler.read() + + if (!request) { + return + } + const circuit = this let virtualConnection @@ -163,7 +173,7 @@ class Circuit { // Called on successful HOP and STOP requests this.handler = handler - return createListener(this, options) + return createListener(this._libp2p, options) } /** diff --git a/src/circuit/listener.js b/src/circuit/listener.js index 055b4ef3d4..2790918116 100644 --- a/src/circuit/listener.js +++ b/src/circuit/listener.js @@ -8,13 +8,23 @@ const log = debug('libp2p:circuit:listener') log.err = debug('libp2p:circuit:error:listener') /** - * @param {*} circuit + * @param {Libp2p} libp2p * @returns {Listener} a transport listener */ -module.exports = (circuit) => { +module.exports = (libp2p) => { const listener = new EventEmitter() const listeningAddrs = new Map() + // Remove listeningAddrs when a peer disconnects + libp2p.connectionManager.on('peer:disconnect', (connection) => { + const deleted = listeningAddrs.delete(connection.remotePeer.toB58String()) + + if (deleted) { + // TODO push announce multiaddrs update + // libp2p.identifyService.pushToPeerStore() + } + }) + /** * Add swarm handler and listen for incoming connections * @@ -24,7 +34,7 @@ module.exports = (circuit) => { listener.listen = async (addr) => { const addrString = String(addr).split('/p2p-circuit').find(a => a !== '') - const relayConn = await circuit._dialer.connectToPeer(multiaddr(addrString)) + const relayConn = await libp2p.dial(multiaddr(addrString)) const relayedAddr = relayConn.remoteAddr.encapsulate('/p2p-circuit') listeningAddrs.set(relayConn.remotePeer.toB58String(), relayedAddr) diff --git a/src/config.js b/src/config.js index 7e3d630efe..a3905cfe18 100644 --- a/src/config.js +++ b/src/config.js @@ -54,6 +54,10 @@ const DefaultConfig = { hop: { enabled: false, active: false + }, + autoRelay: { + enabled: false, + maxListeners: 2 } }, transport: {} diff --git a/src/identify/index.js b/src/identify/index.js index a313ad0e3f..5bd4a51ba0 100644 --- a/src/identify/index.js +++ b/src/identify/index.js @@ -119,12 +119,12 @@ class IdentifyService { /** * Calls `push` for all peers in the `peerStore` that are connected - * @param {PeerStore} peerStore + * @returns {void} */ - pushToPeerStore (peerStore) { + pushToPeerStore () { const connections = [] let connection - for (const peer of peerStore.peers.values()) { + for (const peer of this.peerStore.peers.values()) { if (peer.protocols.includes(MULTICODEC_IDENTIFY_PUSH) && (connection = this.connectionManager.get(peer.id))) { connections.push(connection) } diff --git a/src/index.js b/src/index.js index 8ff4e0f7ec..1b2f65fc1d 100644 --- a/src/index.js +++ b/src/index.js @@ -424,7 +424,7 @@ class Libp2p extends EventEmitter { // Only push if libp2p is running if (this.isStarted() && this.identifyService) { - this.identifyService.pushToPeerStore(this.peerStore) + this.identifyService.pushToPeerStore() } } @@ -441,13 +441,14 @@ class Libp2p extends EventEmitter { // Only push if libp2p is running if (this.isStarted() && this.identifyService) { - this.identifyService.pushToPeerStore(this.peerStore) + this.identifyService.pushToPeerStore() } } async _onStarting () { - // Listen on the provided transports - await this.transportManager.listen() + // Listen on the provided transports for the provided addresses + const addrs = this.addressManager.getListenAddrs() + await this.transportManager.listen(addrs) // Start PeerStore await this.peerStore.start() diff --git a/src/peer-store/address-book.js b/src/peer-store/address-book.js index 0036a0ce2b..606505e217 100644 --- a/src/peer-store/address-book.js +++ b/src/peer-store/address-book.js @@ -260,7 +260,7 @@ class AddressBook extends Book { * Get the known data of a provided peer. * @override * @param {PeerId} peerId - * @returns {Array} + * @returns {Array
|undefined} */ get (peerId) { if (!PeerId.isPeerId(peerId)) { diff --git a/src/transport-manager.js b/src/transport-manager.js index beeb6bb6e2..5030c594b5 100644 --- a/src/transport-manager.js +++ b/src/transport-manager.js @@ -131,11 +131,10 @@ class TransportManager { /** * Starts listeners for each listen Multiaddr. * @async + * @param {Array} addrs addresses to attempt to listen on */ - async listen () { - const addrs = this.libp2p.addressManager.getListenAddrs() - - if (addrs.length === 0) { + async listen (addrs) { + if (!addrs || addrs.length === 0) { log('no addresses were provided for listening, this node is dial only') return } diff --git a/test/relay/auto-relay.node.js b/test/relay/auto-relay.node.js new file mode 100644 index 0000000000..2a4ba20d57 --- /dev/null +++ b/test/relay/auto-relay.node.js @@ -0,0 +1,455 @@ +'use strict' +/* eslint-env mocha */ + +const chai = require('chai') +chai.use(require('dirty-chai')) +const { expect } = chai + +const delay = require('delay') +const pWaitFor = require('p-wait-for') +const sinon = require('sinon') + +const multiaddr = require('multiaddr') +const Libp2p = require('../../src') +const { relay: relayMulticodec } = require('../../src/circuit/multicodec') + +const { createPeerId } = require('../utils/creators/peer') +const baseOptions = require('../utils/base-options') + +const listenAddr = '/ip4/0.0.0.0/tcp/0' + +describe('auto-relay', () => { + describe('basics', () => { + let libp2p + let relayLibp2p + let autoRelay + + beforeEach(async () => { + const peerIds = await createPeerId({ number: 2 }) + // Create 2 nodes, and turn HOP on for the relay + ;[libp2p, relayLibp2p] = peerIds.map((peerId, index) => { + const opts = { + ...baseOptions, + config: { + ...baseOptions.config, + relay: { + hop: { + enabled: index !== 0 + }, + autoRelay: { + enabled: true, + maxListeners: 1 + } + } + } + } + + return new Libp2p({ + ...opts, + addresses: { + listen: [listenAddr] + }, + connectionManager: { + autoDial: false + }, + peerDiscovery: { + autoDial: false + }, + peerId + }) + }) + + autoRelay = libp2p.transportManager._transports.get('Circuit')._autoRelay + + expect(autoRelay.maxListeners).to.eql(1) + }) + + beforeEach(() => { + // Start each node + return Promise.all([libp2p, relayLibp2p].map(libp2p => libp2p.start())) + }) + + afterEach(() => { + // Stop each node + return Promise.all([libp2p, relayLibp2p].map(libp2p => libp2p.stop())) + }) + + it('should ask if node supports hop on protocol change (relay protocol) and add to listen multiaddrs', async () => { + // Spy if a connected peer is being added as listen relay + sinon.spy(autoRelay, '_addListenRelay') + + const originalMultiaddrsLength = relayLibp2p.multiaddrs.length + + // Discover relay + libp2p.peerStore.addressBook.add(relayLibp2p.peerId, relayLibp2p.multiaddrs) + await libp2p.dial(relayLibp2p.peerId) + + // Wait for peer added as listen relay + await pWaitFor(() => autoRelay._addListenRelay.callCount === 1) + expect(autoRelay._listenRelays.size).to.equal(1) + + // Wait for listen multiaddr update + await pWaitFor(() => libp2p.multiaddrs.length === originalMultiaddrsLength + 1) + expect(libp2p.multiaddrs[originalMultiaddrsLength].getPeerId()).to.eql(relayLibp2p.peerId.toB58String()) + + // Peer has relay multicodec + const knownProtocols = libp2p.peerStore.protoBook.get(relayLibp2p.peerId) + expect(knownProtocols).to.include(relayMulticodec) + }) + }) + + describe('flows with 1 listener max', () => { + let libp2p + let relayLibp2p1 + let relayLibp2p2 + let relayLibp2p3 + let autoRelay1 + + beforeEach(async () => { + const peerIds = await createPeerId({ number: 4 }) + // Create 4 nodes, and turn HOP on for the relay + ;[libp2p, relayLibp2p1, relayLibp2p2, relayLibp2p3] = peerIds.map((peerId, index) => { + let opts = baseOptions + + if (index !== 0) { + opts = { + ...baseOptions, + config: { + ...baseOptions.config, + relay: { + hop: { + enabled: true + }, + autoRelay: { + enabled: true, + maxListeners: 1 + } + } + } + } + } + + return new Libp2p({ + ...opts, + addresses: { + listen: [listenAddr] + }, + connectionManager: { + autoDial: false + }, + peerDiscovery: { + autoDial: false + }, + peerId + }) + }) + + autoRelay1 = relayLibp2p1.transportManager._transports.get('Circuit')._autoRelay + + expect(autoRelay1.maxListeners).to.eql(1) + }) + + beforeEach(() => { + // Start each node + return Promise.all([libp2p, relayLibp2p1, relayLibp2p2, relayLibp2p3].map(libp2p => libp2p.start())) + }) + + afterEach(() => { + // Stop each node + return Promise.all([libp2p, relayLibp2p1, relayLibp2p2, relayLibp2p3].map(libp2p => libp2p.stop())) + }) + + it('should ask if node supports hop on protocol change (relay protocol) and add to listen multiaddrs', async () => { + // Spy if a connected peer is being added as listen relay + sinon.spy(autoRelay1, '_addListenRelay') + + // Discover relay + relayLibp2p1.peerStore.addressBook.add(relayLibp2p2.peerId, relayLibp2p2.multiaddrs) + + const originalMultiaddrs1Length = relayLibp2p1.multiaddrs.length + const originalMultiaddrs2Length = relayLibp2p2.multiaddrs.length + + await relayLibp2p1.dial(relayLibp2p2.peerId) + + // Wait for peer added as listen relay + await pWaitFor(() => autoRelay1._addListenRelay.callCount === 1) + expect(autoRelay1._listenRelays.size).to.equal(1) + + // Wait for listen multiaddr update + await Promise.all([ + pWaitFor(() => relayLibp2p1.multiaddrs.length === originalMultiaddrs1Length + 1), + pWaitFor(() => relayLibp2p2.multiaddrs.length === originalMultiaddrs2Length + 1) + ]) + expect(relayLibp2p1.multiaddrs[originalMultiaddrs1Length].getPeerId()).to.eql(relayLibp2p2.peerId.toB58String()) + + // Peer has relay multicodec + const knownProtocols = relayLibp2p1.peerStore.protoBook.get(relayLibp2p2.peerId) + expect(knownProtocols).to.include(relayMulticodec) + }) + + it('should be able to dial a peer from its relayed address previously added', async () => { + const originalMultiaddrs1Length = relayLibp2p1.multiaddrs.length + const originalMultiaddrs2Length = relayLibp2p2.multiaddrs.length + + // Discover relay + relayLibp2p1.peerStore.addressBook.add(relayLibp2p2.peerId, relayLibp2p2.multiaddrs) + + await relayLibp2p1.dial(relayLibp2p2.peerId) + + // Wait for listen multiaddr update + await Promise.all([ + pWaitFor(() => relayLibp2p1.multiaddrs.length === originalMultiaddrs1Length + 1), + pWaitFor(() => relayLibp2p2.multiaddrs.length === originalMultiaddrs2Length + 1) + ]) + expect(relayLibp2p1.multiaddrs[originalMultiaddrs1Length].getPeerId()).to.eql(relayLibp2p2.peerId.toB58String()) + + // Dial from the other through a relay + const relayedMultiaddr2 = multiaddr(`${relayLibp2p1.multiaddrs[0]}/p2p/${relayLibp2p1.peerId.toB58String()}/p2p-circuit`) + libp2p.peerStore.addressBook.add(relayLibp2p2.peerId, [relayedMultiaddr2]) + + await libp2p.dial(relayLibp2p2.peerId) + }) + + it('should only add maxListeners relayed addresses', async () => { + const originalMultiaddrs1Length = relayLibp2p1.multiaddrs.length + const originalMultiaddrs2Length = relayLibp2p2.multiaddrs.length + + // Spy if a connected peer is being added as listen relay + sinon.spy(autoRelay1, '_addListenRelay') + sinon.spy(autoRelay1._listenRelays, 'add') + + // Discover one relay and connect + relayLibp2p1.peerStore.addressBook.add(relayLibp2p2.peerId, relayLibp2p2.multiaddrs) + await relayLibp2p1.dial(relayLibp2p2.peerId) + + expect(relayLibp2p1.connectionManager.size).to.eql(1) + + // Wait for peer added as listen relay + await pWaitFor(() => autoRelay1._addListenRelay.callCount === 1 && autoRelay1._listenRelays.add.callCount === 1) + expect(autoRelay1._listenRelays.size).to.equal(1) + + // Wait for listen multiaddr update + await Promise.all([ + pWaitFor(() => relayLibp2p1.multiaddrs.length === originalMultiaddrs1Length + 1), + pWaitFor(() => relayLibp2p2.multiaddrs.length === originalMultiaddrs2Length + 1) + ]) + expect(relayLibp2p1.multiaddrs[originalMultiaddrs1Length].getPeerId()).to.eql(relayLibp2p2.peerId.toB58String()) + + // Relay2 has relay multicodec + const knownProtocols2 = relayLibp2p1.peerStore.protoBook.get(relayLibp2p2.peerId) + expect(knownProtocols2).to.include(relayMulticodec) + + // Discover an extra relay and connect + relayLibp2p1.peerStore.addressBook.add(relayLibp2p3.peerId, relayLibp2p3.multiaddrs) + await relayLibp2p1.dial(relayLibp2p3.peerId) + + // Wait to guarantee the dialed peer is not added as a listen relay + await delay(300) + + expect(autoRelay1._addListenRelay.callCount).to.equal(2) + expect(autoRelay1._listenRelays.add.callCount).to.equal(1) + expect(autoRelay1._listenRelays.size).to.equal(1) + expect(relayLibp2p1.connectionManager.size).to.eql(2) + + // Relay2 has relay multicodec + const knownProtocols3 = relayLibp2p1.peerStore.protoBook.get(relayLibp2p3.peerId) + expect(knownProtocols3).to.include(relayMulticodec) + }) + + it('should not listen on a relayed address if peer disconnects', async () => { + const originalMultiaddrs1Length = relayLibp2p1.multiaddrs.length + + // Discover one relay and connect + relayLibp2p1.peerStore.addressBook.add(relayLibp2p2.peerId, relayLibp2p2.multiaddrs) + await relayLibp2p1.dial(relayLibp2p2.peerId) + + // Wait for listenning on the relay + await pWaitFor(() => relayLibp2p1.multiaddrs.length === originalMultiaddrs1Length + 1) + expect(autoRelay1._listenRelays.size).to.equal(1) + expect(relayLibp2p1.multiaddrs[originalMultiaddrs1Length].getPeerId()).to.eql(relayLibp2p2.peerId.toB58String()) + + // Spy if identify push is fired + sinon.spy(relayLibp2p1.identifyService, 'pushToPeerStore') + + // Disconnect from peer used for relay + await relayLibp2p1.hangUp(relayLibp2p2.peerId) + + // Wait for removed listening on the relay + await pWaitFor(() => relayLibp2p1.multiaddrs.length === originalMultiaddrs1Length) + expect(autoRelay1._listenRelays.size).to.equal(0) + // TODO: identify-push expect(relayLibp2p1.identifyService.pushToPeerStore.callCount).to.equal(1) + }) + + it('should try to listen on other connected peers relayed address if one used relay disconnects', async () => { + const originalMultiaddrs1Length = relayLibp2p1.multiaddrs.length + + // Spy if a connected peer is being added as listen relay + sinon.spy(autoRelay1, '_addListenRelay') + sinon.spy(relayLibp2p1.transportManager, 'listen') + + // Discover one relay and connect + relayLibp2p1.peerStore.addressBook.add(relayLibp2p2.peerId, relayLibp2p2.multiaddrs) + await relayLibp2p1.dial(relayLibp2p2.peerId) + + // Discover an extra relay and connect + relayLibp2p1.peerStore.addressBook.add(relayLibp2p3.peerId, relayLibp2p3.multiaddrs) + await relayLibp2p1.dial(relayLibp2p3.peerId) + + // Wait for both peer to be attempted to added as listen relay + await pWaitFor(() => autoRelay1._addListenRelay.callCount === 1) + expect(autoRelay1._listenRelays.size).to.equal(1) + expect(relayLibp2p1.connectionManager.size).to.equal(2) + + // Wait for listen multiaddr update + await pWaitFor(() => relayLibp2p1.multiaddrs.length === originalMultiaddrs1Length + 1) + expect(relayLibp2p1.multiaddrs[originalMultiaddrs1Length].getPeerId()).to.eql(relayLibp2p2.peerId.toB58String()) + + // Only one will be used for listeninng + expect(relayLibp2p1.transportManager.listen.callCount).to.equal(1) + + // Spy if relay from listen map was removed + sinon.spy(autoRelay1._listenRelays, 'delete') + + // Disconnect from peer used for relay + await relayLibp2p1.hangUp(relayLibp2p2.peerId) + expect(autoRelay1._listenRelays.delete.callCount).to.equal(1) + expect(autoRelay1._addListenRelay.callCount).to.equal(1) + + // Wait for other peer connected to be added as listen addr + await pWaitFor(() => relayLibp2p1.transportManager.listen.callCount === 2) + expect(autoRelay1._listenRelays.size).to.equal(1) + expect(relayLibp2p1.connectionManager.size).to.eql(1) + + // Wait for listen multiaddr update + await pWaitFor(() => relayLibp2p1.multiaddrs.length === originalMultiaddrs1Length + 1) + expect(relayLibp2p1.multiaddrs[originalMultiaddrs1Length].getPeerId()).to.eql(relayLibp2p3.peerId.toB58String()) + }) + + it('should try to listen on stored peers relayed address if one used relay disconnects and there are not enough connected', async () => { + // Spy if a connected peer is being added as listen relay + sinon.spy(autoRelay1, '_addListenRelay') + sinon.spy(relayLibp2p1.transportManager, 'listen') + + // Discover one relay and connect + relayLibp2p1.peerStore.addressBook.add(relayLibp2p2.peerId, relayLibp2p2.multiaddrs) + await relayLibp2p1.dial(relayLibp2p2.peerId) + + // Discover an extra relay and connect to gather its Hop support + relayLibp2p1.peerStore.addressBook.add(relayLibp2p3.peerId, relayLibp2p3.multiaddrs) + await relayLibp2p1.dial(relayLibp2p3.peerId) + + // Wait for both peer to be attempted to added as listen relay + await pWaitFor(() => autoRelay1._addListenRelay.callCount === 2) + expect(autoRelay1._listenRelays.size).to.equal(1) + expect(relayLibp2p1.connectionManager.size).to.equal(2) + + // Only one will be used for listeninng + expect(relayLibp2p1.transportManager.listen.callCount).to.equal(1) + + // Disconnect not used listen relay + await relayLibp2p1.hangUp(relayLibp2p3.peerId) + + expect(autoRelay1._listenRelays.size).to.equal(1) + expect(relayLibp2p1.connectionManager.size).to.equal(1) + + // Spy on dial + sinon.spy(relayLibp2p1, 'dial') + + // Remove peer used as relay from peerStore and disconnect it + relayLibp2p1.peerStore.delete(relayLibp2p2.peerId) + await relayLibp2p1.hangUp(relayLibp2p2.peerId) + expect(autoRelay1._listenRelays.size).to.equal(0) + expect(relayLibp2p1.connectionManager.size).to.equal(0) + + // Wait for other peer connected to be added as listen addr + await pWaitFor(() => relayLibp2p1.transportManager.listen.callCount === 2) + expect(autoRelay1._listenRelays.size).to.equal(1) + expect(relayLibp2p1.connectionManager.size).to.eql(1) + }) + }) + + describe('flows with 2 max listeners', () => { + let relayLibp2p1 + let relayLibp2p2 + let relayLibp2p3 + let autoRelay1 + let autoRelay2 + + beforeEach(async () => { + const peerIds = await createPeerId({ number: 3 }) + // Create 3 nodes, and turn HOP on for the relay + ;[relayLibp2p1, relayLibp2p2, relayLibp2p3] = peerIds.map((peerId) => { + return new Libp2p({ + ...baseOptions, + config: { + ...baseOptions.config, + relay: { + ...baseOptions.config.relay, + hop: { + enabled: true + }, + autoRelay: { + enabled: true, + maxListeners: 2 + } + } + }, + addresses: { + listen: [listenAddr] + }, + connectionManager: { + autoDial: false + }, + peerDiscovery: { + autoDial: false + }, + peerId + }) + }) + + autoRelay1 = relayLibp2p1.transportManager._transports.get('Circuit')._autoRelay + autoRelay2 = relayLibp2p2.transportManager._transports.get('Circuit')._autoRelay + }) + + beforeEach(() => { + // Start each node + return Promise.all([relayLibp2p1, relayLibp2p2, relayLibp2p3].map(libp2p => libp2p.start())) + }) + + afterEach(() => { + // Stop each node + return Promise.all([relayLibp2p1, relayLibp2p2, relayLibp2p3].map(libp2p => libp2p.stop())) + }) + + it('should not add listener to a already relayed connection', async () => { + // Spy if a connected peer is being added as listen relay + sinon.spy(autoRelay1, '_addListenRelay') + sinon.spy(autoRelay2, '_addListenRelay') + + // Relay 1 discovers Relay 3 and connect + relayLibp2p1.peerStore.addressBook.add(relayLibp2p3.peerId, relayLibp2p3.multiaddrs) + await relayLibp2p1.dial(relayLibp2p3.peerId) + + // Wait for peer added as listen relay + await pWaitFor(() => autoRelay1._addListenRelay.callCount === 1) + expect(autoRelay1._listenRelays.size).to.equal(1) + + // Relay 2 discovers Relay 3 and connect + relayLibp2p2.peerStore.addressBook.add(relayLibp2p3.peerId, relayLibp2p3.multiaddrs) + await relayLibp2p2.dial(relayLibp2p3.peerId) + + // Wait for peer added as listen relay + await pWaitFor(() => autoRelay2._addListenRelay.callCount === 1) + expect(autoRelay2._listenRelays.size).to.equal(1) + + // Relay 1 discovers Relay 2 relayed multiaddr via Relay 3 + const ma2RelayedBy3 = relayLibp2p2.multiaddrs[relayLibp2p2.multiaddrs.length - 1] + relayLibp2p1.peerStore.addressBook.add(relayLibp2p2.peerId, [ma2RelayedBy3]) + await relayLibp2p1.dial(relayLibp2p2.peerId) + + // Peer not added as listen relay + expect(autoRelay1._addListenRelay.callCount).to.equal(1) + expect(autoRelay1._listenRelays.size).to.equal(1) + }) + }) +}) diff --git a/test/dialing/relay.node.js b/test/relay/relay.node.js similarity index 97% rename from test/dialing/relay.node.js rename to test/relay/relay.node.js index a591940801..67f90a7f98 100644 --- a/test/dialing/relay.node.js +++ b/test/relay/relay.node.js @@ -72,7 +72,7 @@ describe('Dialing (via relay, TCP)', () => { const tcpAddrs = dstLibp2p.transportManager.getAddrs() sinon.stub(dstLibp2p.addressManager, 'listen').value([multiaddr(`/p2p-circuit${relayAddr}/p2p/${relayIdString}`)]) - await dstLibp2p.transportManager.listen() + await dstLibp2p.transportManager.listen(dstLibp2p.addressManager.getListenAddrs()) expect(dstLibp2p.transportManager.getAddrs()).to.have.deep.members([...tcpAddrs, dialAddr.decapsulate('p2p')]) const connection = await srcLibp2p.dial(dialAddr) @@ -157,7 +157,7 @@ describe('Dialing (via relay, TCP)', () => { const tcpAddrs = dstLibp2p.transportManager.getAddrs() sinon.stub(dstLibp2p.addressManager, 'getListenAddrs').returns([multiaddr(`${relayAddr}/p2p-circuit`)]) - await dstLibp2p.transportManager.listen() + await dstLibp2p.transportManager.listen(dstLibp2p.addressManager.getListenAddrs()) expect(dstLibp2p.transportManager.getAddrs()).to.have.deep.members([...tcpAddrs, dialAddr.decapsulate('p2p')]) // Tamper with the our multiaddrs for the circuit message diff --git a/test/transports/transport-manager.node.js b/test/transports/transport-manager.node.js index 1036230acb..6e8cc12aea 100644 --- a/test/transports/transport-manager.node.js +++ b/test/transports/transport-manager.node.js @@ -41,7 +41,7 @@ describe('Transport Manager (TCP)', () => { it('should be able to listen', async () => { tm.add(Transport.prototype[Symbol.toStringTag], Transport) - await tm.listen() + await tm.listen(addrs) expect(tm._listeners).to.have.key(Transport.prototype[Symbol.toStringTag]) expect(tm._listeners.get(Transport.prototype[Symbol.toStringTag])).to.have.length(addrs.length) // Ephemeral ip addresses may result in multiple listeners @@ -52,7 +52,7 @@ describe('Transport Manager (TCP)', () => { it('should be able to dial', async () => { tm.add(Transport.prototype[Symbol.toStringTag], Transport) - await tm.listen() + await tm.listen(addrs) const addr = tm.getAddrs().shift() const connection = await tm.dial(addr) expect(connection).to.exist() diff --git a/test/transports/transport-manager.spec.js b/test/transports/transport-manager.spec.js index b32b280725..9f1bbf434c 100644 --- a/test/transports/transport-manager.spec.js +++ b/test/transports/transport-manager.spec.js @@ -87,7 +87,7 @@ describe('Transport Manager (WebSockets)', () => { it('should fail to listen with no valid address', async () => { tm.add(Transport.prototype[Symbol.toStringTag], Transport) - await expect(tm.listen()) + await expect(tm.listen([listenAddr])) .to.eventually.be.rejected() .and.to.have.property('code', ErrorCodes.ERR_NO_VALID_ADDRESSES) })