-
Notifications
You must be signed in to change notification settings - Fork 445
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
feat: auto relay network query for new relays
- Loading branch information
1 parent
f9235e2
commit 9faf1bf
Showing
9 changed files
with
458 additions
and
184 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,12 @@ | ||
'use strict' | ||
|
||
const minute = 60 * 1000 | ||
|
||
module.exports = { | ||
ADVERTISE_BOOT_DELAY: 15 * minute, | ||
ADVERTISE_TTL: 30 * minute, | ||
CIRCUIT_PROTO_CODE: 290, | ||
HOP_METADATA_KEY: 'hop_relay', | ||
HOP_METADATA_VALUE: 'true', | ||
RELAY_RENDEZVOUS_NS: '/libp2p/relay' | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,197 +1,76 @@ | ||
'use strict' | ||
|
||
const debug = require('debug') | ||
const log = debug('libp2p:circuit') | ||
log.error = debug('libp2p:circuit:error') | ||
|
||
const mafmt = require('mafmt') | ||
const multiaddr = require('multiaddr') | ||
const PeerId = require('peer-id') | ||
const withIs = require('class-is') | ||
const { CircuitRelay: CircuitPB } = require('./protocol') | ||
|
||
const toConnection = require('libp2p-utils/src/stream-to-ma-conn') | ||
const log = debug('libp2p:relay') | ||
log.error = debug('libp2p:relay:error') | ||
|
||
const AutoRelay = require('./auto-relay') | ||
const { relay: multicodec } = require('./multicodec') | ||
const createListener = require('./listener') | ||
const { handleCanHop, handleHop, hop } = require('./circuit/hop') | ||
const { handleStop } = require('./circuit/stop') | ||
const StreamHandler = require('./circuit/stream-handler') | ||
|
||
class Circuit { | ||
const { namespaceToCid } = require('./utils') | ||
const { | ||
ADVERTISE_BOOT_DELAY, | ||
ADVERTISE_TTL, | ||
RELAY_RENDEZVOUS_NS | ||
} = require('./constants') | ||
|
||
class Relay { | ||
/** | ||
* Creates an instance of Circuit. | ||
* Creates an instance of Relay. | ||
* | ||
* @class | ||
* @param {object} options | ||
* @param {Libp2p} options.libp2p | ||
* @param {Upgrader} options.upgrader | ||
* @param {Libp2p} libp2p | ||
*/ | ||
constructor ({ libp2p, upgrader }) { | ||
this._dialer = libp2p.dialer | ||
this._registrar = libp2p.registrar | ||
this._connectionManager = libp2p.connectionManager | ||
this._upgrader = upgrader | ||
constructor (libp2p) { | ||
this._options = libp2p._config.relay | ||
this._libp2p = libp2p | ||
this.peerId = libp2p.peerId | ||
this._registrar.handle(multicodec, this._onProtocol.bind(this)) | ||
|
||
// Create autoRelay if enabled | ||
this._autoRelay = this._options.autoRelay.enabled && new AutoRelay({ libp2p, ...this._options.autoRelay }) | ||
} | ||
|
||
async _onProtocol ({ connection, stream }) { | ||
const streamHandler = new StreamHandler({ stream }) | ||
const request = await streamHandler.read() | ||
|
||
if (!request) { | ||
return | ||
} | ||
|
||
const circuit = this | ||
let virtualConnection | ||
|
||
switch (request.type) { | ||
case CircuitPB.Type.CAN_HOP: { | ||
log('received CAN_HOP request from %s', connection.remotePeer.toB58String()) | ||
await handleCanHop({ circuit, connection, streamHandler }) | ||
break | ||
} | ||
case CircuitPB.Type.HOP: { | ||
log('received HOP request from %s', connection.remotePeer.toB58String()) | ||
virtualConnection = await handleHop({ | ||
connection, | ||
request, | ||
streamHandler, | ||
circuit | ||
}) | ||
break | ||
} | ||
case CircuitPB.Type.STOP: { | ||
log('received STOP request from %s', connection.remotePeer.toB58String()) | ||
virtualConnection = await handleStop({ | ||
connection, | ||
request, | ||
streamHandler, | ||
circuit | ||
}) | ||
break | ||
} | ||
default: { | ||
log('Request of type %s not supported', request.type) | ||
} | ||
} | ||
|
||
if (virtualConnection) { | ||
const remoteAddr = multiaddr(request.dstPeer.addrs[0]) | ||
const localAddr = multiaddr(request.srcPeer.addrs[0]) | ||
const maConn = toConnection({ | ||
stream: virtualConnection, | ||
remoteAddr, | ||
localAddr | ||
}) | ||
const type = CircuitPB.Type === CircuitPB.Type.HOP ? 'relay' : 'inbound' | ||
log('new %s connection %s', type, maConn.remoteAddr) | ||
|
||
const conn = await this._upgrader.upgradeInbound(maConn) | ||
log('%s connection %s upgraded', type, maConn.remoteAddr) | ||
this.handler && this.handler(conn) | ||
} | ||
} | ||
|
||
/** | ||
* Dial a peer over a relay | ||
* | ||
* @param {multiaddr} ma - the multiaddr of the peer to dial | ||
* @param {Object} options - dial options | ||
* @param {AbortSignal} [options.signal] - An optional abort signal | ||
* @returns {Connection} - the connection | ||
* Start Relay service. | ||
* @returns {void} | ||
*/ | ||
async dial (ma, options) { | ||
// Check the multiaddr to see if it contains a relay and a destination peer | ||
const addrs = ma.toString().split('/p2p-circuit') | ||
const relayAddr = multiaddr(addrs[0]) | ||
const destinationAddr = multiaddr(addrs[addrs.length - 1]) | ||
const relayPeer = PeerId.createFromCID(relayAddr.getPeerId()) | ||
const destinationPeer = PeerId.createFromCID(destinationAddr.getPeerId()) | ||
|
||
let disconnectOnFailure = false | ||
let relayConnection = this._connectionManager.get(relayPeer) | ||
if (!relayConnection) { | ||
relayConnection = await this._dialer.connectToPeer(relayAddr, options) | ||
disconnectOnFailure = true | ||
} | ||
|
||
try { | ||
const virtualConnection = await hop({ | ||
connection: relayConnection, | ||
circuit: this, | ||
request: { | ||
type: CircuitPB.Type.HOP, | ||
srcPeer: { | ||
id: this.peerId.toBytes(), | ||
addrs: this._libp2p.multiaddrs.map(addr => addr.bytes) | ||
}, | ||
dstPeer: { | ||
id: destinationPeer.toBytes(), | ||
addrs: [multiaddr(destinationAddr).bytes] | ||
} | ||
} | ||
}) | ||
|
||
const localAddr = relayAddr.encapsulate(`/p2p-circuit/p2p/${this.peerId.toB58String()}`) | ||
const maConn = toConnection({ | ||
stream: virtualConnection, | ||
remoteAddr: ma, | ||
localAddr | ||
}) | ||
log('new outbound connection %s', maConn.remoteAddr) | ||
|
||
return this._upgrader.upgradeOutbound(maConn) | ||
} catch (err) { | ||
log.error('Circuit relay dial failed', err) | ||
disconnectOnFailure && await relayConnection.close() | ||
throw err | ||
start () { | ||
// Advertise service if HOP enabled | ||
const canHop = this._options.hop.enabled | ||
|
||
if (canHop) { | ||
this._timeout = setTimeout(() => { | ||
this._advertiseService() | ||
}, this._options.advertise.bootDelay || ADVERTISE_BOOT_DELAY) | ||
} | ||
} | ||
|
||
/** | ||
* Create a listener | ||
* | ||
* @param {any} options | ||
* @param {Function} handler | ||
* @returns {listener} | ||
* Stop Relay service. | ||
* @returns {void} | ||
*/ | ||
createListener (options, handler) { | ||
if (typeof options === 'function') { | ||
handler = options | ||
options = {} | ||
} | ||
|
||
// Called on successful HOP and STOP requests | ||
this.handler = handler | ||
|
||
return createListener(this._libp2p, options) | ||
stop () { | ||
clearTimeout(this._timeout) | ||
} | ||
|
||
/** | ||
* Filter check for all Multiaddrs that this transport can dial on | ||
* | ||
* @param {Array<Multiaddr>} multiaddrs | ||
* @returns {Array<Multiaddr>} | ||
* Advertise hop relay service in the network. | ||
* @returns {Promise<void>} | ||
*/ | ||
filter (multiaddrs) { | ||
multiaddrs = Array.isArray(multiaddrs) ? multiaddrs : [multiaddrs] | ||
async _advertiseService () { | ||
try { | ||
const cid = await namespaceToCid(RELAY_RENDEZVOUS_NS) | ||
await this._libp2p.contentRouting.provide(cid) | ||
} catch (err) { | ||
if (err.code === 'NO_ROUTERS_AVAILABLE') { | ||
log('there are no routers configured to advertise hop relay service') | ||
} else { | ||
log.error(err) | ||
} | ||
} | ||
|
||
return multiaddrs.filter((ma) => { | ||
return mafmt.Circuit.matches(ma) | ||
}) | ||
// Restart timeout | ||
this._timeout = setTimeout(() => { | ||
this._advertiseService() | ||
}, this._options.advertise.ttl || ADVERTISE_TTL) | ||
} | ||
} | ||
|
||
/** | ||
* @type {Circuit} | ||
*/ | ||
module.exports = withIs(Circuit, { className: 'Circuit', symbolName: '@libp2p/js-libp2p-circuit/circuit' }) | ||
module.exports = Relay |
Oops, something went wrong.