Skip to content

Commit

Permalink
chore: create signed peer record on new listen addresses in transport…
Browse files Browse the repository at this point in the history
… manager
  • Loading branch information
vasco-santos committed Oct 7, 2020
1 parent 66139ef commit 436ba47
Show file tree
Hide file tree
Showing 7 changed files with 80 additions and 82 deletions.
9 changes: 0 additions & 9 deletions doc/API.md
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,6 @@
* [`libp2p`](#libp2p)
* [`libp2p.connectionManager`](#libp2pconnectionmanager)
* [`libp2p.peerStore`](#libp2ppeerStore)
* [`libp2p.transportManager`](#libp2ptransportmanager)
* [Types](#types)
* [`Stats`](#stats)

Expand Down Expand Up @@ -1986,14 +1985,6 @@ This event will be triggered anytime we are disconnected from another peer, rega
- `peerId`: instance of [`PeerId`][peer-id]
- `protocols`: array of known, supported protocols for the peer (string identifiers)

### libp2p.transportManager

#### Listening addresses change

This event will be triggered anytime the listening addresses change.

`libp2p.transportManager.on('listening', () => {})`

## Types

### Stats
Expand Down
2 changes: 1 addition & 1 deletion src/circuit/listener.js
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ module.exports = (libp2p) => {

if (deleted) {
// Announce listen addresses change
listener.emit('listening')
listener.emit('close')
}
})

Expand Down
52 changes: 7 additions & 45 deletions src/identify/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -64,11 +64,6 @@ class IdentifyService {
*/
this.connectionManager = libp2p.connectionManager

/**
* @property {TransportManager}
*/
this.transportManager = libp2p.transportManager

/**
* @property {PeerId}
*/
Expand All @@ -89,10 +84,11 @@ class IdentifyService {
this.identify(connection, peerId).catch(log.error)
})

// When new addresses are used for listening, update self peer record
this.transportManager.on('listening', async () => {
await this._createSelfPeerRecord()
this.pushToPeerStore()
// When self multiaddrs change, trigger identify-push
this.peerStore.on('change:multiaddrs', ({ peerId }) => {
if (peerId.toString() === this.peerId.toString()) {
this.pushToPeerStore()
}
})
}

Expand All @@ -103,7 +99,7 @@ class IdentifyService {
* @returns {Promise<void>}
*/
async push (connections) {
const signedPeerRecord = await this._getSelfPeerRecord()
const signedPeerRecord = await this.peerStore.addressBook.getRawEnvelope(this.peerId)
const listenAddrs = this._libp2p.multiaddrs.map((ma) => ma.bytes)
const protocols = Array.from(this._protocols.keys())

Expand Down Expand Up @@ -253,7 +249,7 @@ class IdentifyService {
publicKey = this.peerId.pubKey.bytes
}

const signedPeerRecord = await this._getSelfPeerRecord()
const signedPeerRecord = await this.peerStore.addressBook.getRawEnvelope(this.peerId)

const message = Message.encode({
protocolVersion: PROTOCOL_VERSION,
Expand Down Expand Up @@ -323,40 +319,6 @@ class IdentifyService {
// Update the protocols
this.peerStore.protoBook.set(id, message.protocols)
}

/**
* Get self signed peer record raw envelope.
* @return {Promise<Uint8Array>}
*/
_getSelfPeerRecord () {
const selfSignedPeerRecord = this.peerStore.addressBook.getRawEnvelope(this.peerId)

if (selfSignedPeerRecord) {
return selfSignedPeerRecord
}

return this._createSelfPeerRecord()
}

/**
* Create self signed peer record raw envelope.
* @return {Uint8Array}
*/
async _createSelfPeerRecord () {
try {
const peerRecord = new PeerRecord({
peerId: this.peerId,
multiaddrs: this._libp2p.multiaddrs
})
const envelope = await Envelope.seal(peerRecord, this.peerId)
this.peerStore.addressBook.consumePeerRecord(envelope)

return this.peerStore.addressBook.getRawEnvelope(this.peerId)
} catch (err) {
log.error('failed to get self peer record')
}
return null
}
}

module.exports.IdentifyService = IdentifyService
Expand Down
39 changes: 29 additions & 10 deletions src/transport-manager.js
Original file line number Diff line number Diff line change
@@ -1,18 +1,16 @@
'use strict'

const { EventEmitter } = require('events')
const pSettle = require('p-settle')
const { codes } = require('./errors')
const errCode = require('err-code')
const debug = require('debug')
const log = debug('libp2p:transports')
log.error = debug('libp2p:transports:error')

/**
* Responsible for managing the transports and their listeners.
* @fires TransportManager#listening Emitted when listening addresses change.
*/
class TransportManager extends EventEmitter {
const Envelope = require('./record/envelope')
const PeerRecord = require('./record/peer-record')

class TransportManager {
/**
* @class
* @param {object} options
Expand All @@ -21,8 +19,6 @@ class TransportManager extends EventEmitter {
* @param {boolean} [options.faultTolerance = FAULT_TOLERANCE.FATAL_ALL] - Address listen error tolerance.
*/
constructor ({ libp2p, upgrader, faultTolerance = FAULT_TOLERANCE.FATAL_ALL }) {
super()

this.libp2p = libp2p
this.upgrader = upgrader
this._transports = new Map()
Expand Down Expand Up @@ -71,6 +67,7 @@ class TransportManager extends EventEmitter {
while (listeners.length) {
const listener = listeners.pop()
listener.removeAllListeners('listening')
listener.removeAllListeners('close')
tasks.push(listener.close())
}
}
Expand Down Expand Up @@ -164,8 +161,9 @@ class TransportManager extends EventEmitter {
const listener = transport.createListener({}, this.onConnection)
this._listeners.get(key).push(listener)

// Track listen events
listener.on('listening', () => this.emit('listening'))
// Track listen/close events
listener.on('listening', () => this._createSelfPeerRecord())
listener.on('close', () => this._createSelfPeerRecord())

// We need to attempt to listen on everything
tasks.push(listener.listen(addr))
Expand Down Expand Up @@ -212,6 +210,7 @@ class TransportManager extends EventEmitter {
// Close any running listeners
for (const listener of this._listeners.get(key)) {
listener.removeAllListeners('listening')
listener.removeAllListeners('close')
await listener.close()
}
}
Expand All @@ -234,6 +233,26 @@ class TransportManager extends EventEmitter {

await Promise.all(tasks)
}

/**
* Create self signed peer record raw envelope.
* @return {Uint8Array}
*/
async _createSelfPeerRecord () {
try {
const peerRecord = new PeerRecord({
peerId: this.libp2p.peerId,
multiaddrs: this.libp2p.multiaddrs
})
const envelope = await Envelope.seal(peerRecord, this.libp2p.peerId)
this.libp2p.peerStore.addressBook.consumePeerRecord(envelope)

return this.libp2p.peerStore.addressBook.getRawEnvelope(this.libp2p.peerId)
} catch (err) {
log.error('failed to get self peer record')
}
return null
}
}

/**
Expand Down
3 changes: 0 additions & 3 deletions test/dialing/direct.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -354,7 +354,6 @@ describe('Dialing (direct, WebSockets)', () => {
const connection = await libp2p.dial(remoteAddr)
expect(connection).to.exist()

sinon.spy(libp2p.peerStore.addressBook, 'consumePeerRecord')
sinon.spy(libp2p.peerStore.protoBook, 'set')

// Wait for onConnection to be called
Expand All @@ -363,8 +362,6 @@ describe('Dialing (direct, WebSockets)', () => {
expect(libp2p.identifyService.identify.callCount).to.equal(1)
await libp2p.identifyService.identify.firstCall.returnValue

// Self + New peer
expect(libp2p.peerStore.addressBook.consumePeerRecord.callCount).to.equal(2)
expect(libp2p.peerStore.protoBook.set.callCount).to.equal(1)
})

Expand Down
39 changes: 26 additions & 13 deletions test/identify/index.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ const { IdentifyService, multicodecs } = require('../../src/identify')
const Peers = require('../fixtures/peers')
const Libp2p = require('../../src')
const Envelope = require('../../src/record/envelope')
const PeerRecord = require('../../src/record/peer-record')
const PeerStore = require('../../src/peer-store')
const baseOptions = require('../utils/base-options.browser')
const pkg = require('../../package.json')
Expand Down Expand Up @@ -52,7 +53,6 @@ describe('Identify', () => {
libp2p: {
peerId: localPeer,
connectionManager: new EventEmitter(),
transportManager: new EventEmitter(),
peerStore: new PeerStore({ peerId: localPeer }),
multiaddrs: listenMaddrs
},
Expand All @@ -63,7 +63,6 @@ describe('Identify', () => {
libp2p: {
peerId: remotePeer,
connectionManager: new EventEmitter(),
transportManager: new EventEmitter(),
peerStore: new PeerStore({ peerId: remotePeer }),
multiaddrs: listenMaddrs
},
Expand All @@ -80,6 +79,9 @@ describe('Identify', () => {
sinon.spy(localIdentify.peerStore.addressBook, 'consumePeerRecord')
sinon.spy(localIdentify.peerStore.protoBook, 'set')

// Transport Manager creates signed peer record
await _createSelfPeerRecord(remoteIdentify._libp2p)

// Run identify
await Promise.all([
localIdentify.identify(localConnectionMock),
Expand Down Expand Up @@ -107,7 +109,6 @@ describe('Identify', () => {
libp2p: {
peerId: localPeer,
connectionManager: new EventEmitter(),
transportManager: new EventEmitter(),
peerStore: new PeerStore({ peerId: localPeer }),
multiaddrs: listenMaddrs
},
Expand All @@ -118,7 +119,6 @@ describe('Identify', () => {
libp2p: {
peerId: remotePeer,
connectionManager: new EventEmitter(),
transportManager: new EventEmitter(),
peerStore: new PeerStore({ peerId: remotePeer }),
multiaddrs: listenMaddrs
},
Expand Down Expand Up @@ -168,7 +168,6 @@ describe('Identify', () => {
libp2p: {
peerId: localPeer,
connectionManager: new EventEmitter(),
transportManager: new EventEmitter(),
peerStore: new PeerStore({ peerId: localPeer }),
multiaddrs: []
},
Expand All @@ -178,7 +177,6 @@ describe('Identify', () => {
libp2p: {
peerId: remotePeer,
connectionManager: new EventEmitter(),
transportManager: new EventEmitter(),
peerStore: new PeerStore({ peerId: remotePeer }),
multiaddrs: []
},
Expand Down Expand Up @@ -216,7 +214,6 @@ describe('Identify', () => {
libp2p: {
peerId: localPeer,
connectionManager: new EventEmitter(),
transportManager: new EventEmitter(),
peerStore: new PeerStore({ peerId: localPeer }),
multiaddrs: listenMaddrs
},
Expand All @@ -230,7 +227,6 @@ describe('Identify', () => {
libp2p: {
peerId: remotePeer,
connectionManager,
transportManager: new EventEmitter(),
peerStore: new PeerStore({ peerId: remotePeer }),
multiaddrs: []
}
Expand All @@ -247,6 +243,10 @@ describe('Identify', () => {
sinon.spy(remoteIdentify.peerStore.addressBook, 'consumePeerRecord')
sinon.spy(remoteIdentify.peerStore.protoBook, 'set')

// Transport Manager creates signed peer record
await _createSelfPeerRecord(localIdentify._libp2p)
await _createSelfPeerRecord(remoteIdentify._libp2p)

// Run identify
await Promise.all([
localIdentify.push([localConnectionMock]),
Expand All @@ -257,7 +257,7 @@ describe('Identify', () => {
})
])

expect(remoteIdentify.peerStore.addressBook.consumePeerRecord.callCount).to.equal(1)
expect(remoteIdentify.peerStore.addressBook.consumePeerRecord.callCount).to.equal(2)
expect(remoteIdentify.peerStore.protoBook.set.callCount).to.equal(1)

const addresses = localIdentify.peerStore.addressBook.get(localPeer)
Expand All @@ -279,7 +279,6 @@ describe('Identify', () => {
libp2p: {
peerId: localPeer,
connectionManager: new EventEmitter(),
transportManager: new EventEmitter(),
peerStore: new PeerStore({ peerId: localPeer }),
multiaddrs: listenMaddrs
},
Expand All @@ -293,7 +292,6 @@ describe('Identify', () => {
libp2p: {
peerId: remotePeer,
connectionManager,
transportManager: new EventEmitter(),
peerStore: new PeerStore({ peerId: remotePeer }),
multiaddrs: []
}
Expand Down Expand Up @@ -369,8 +367,8 @@ describe('Identify', () => {
expect(connection).to.exist()

// Wait for peer store to be updated
// Dialer._createDialTarget (add), Identify (consume), Create self (consume)
await pWaitFor(() => peerStoreSpyConsumeRecord.callCount === 2 && peerStoreSpyAdd.callCount === 1)
// Dialer._createDialTarget (add), Identify (consume)
await pWaitFor(() => peerStoreSpyConsumeRecord.callCount === 1 && peerStoreSpyAdd.callCount === 1)
expect(libp2p.identifyService.identify.callCount).to.equal(1)

// The connection should have no open streams
Expand Down Expand Up @@ -416,3 +414,18 @@ describe('Identify', () => {
})
})
})

// Self peer record creating on Transport Manager simulation
const _createSelfPeerRecord = async (libp2p) => {
try {
const peerRecord = new PeerRecord({
peerId: libp2p.peerId,
multiaddrs: libp2p.multiaddrs
})
const envelope = await Envelope.seal(peerRecord, libp2p.peerId)
libp2p.peerStore.addressBook.consumePeerRecord(envelope)

return libp2p.peerStore.addressBook.getRawEnvelope(libp2p.peerId)
} catch (_) {}
return null
}
Loading

0 comments on commit 436ba47

Please sign in to comment.