From 24a16dfc5a4418041ee14ecf6dda8257aececf94 Mon Sep 17 00:00:00 2001 From: Vasco Santos Date: Thu, 24 Sep 2020 11:04:05 +0200 Subject: [PATCH] chore: store self protocols in protobook --- doc/API.md | 77 +++++++++++++------- src/identify/index.js | 23 ++++-- src/index.js | 17 ++--- src/peer-store/proto-book.js | 41 ++++++++++- test/identify/index.spec.js | 110 ++++++++++++++++------------- test/peer-store/proto-book.spec.js | 107 ++++++++++++++++++++++++++++ 6 files changed, 279 insertions(+), 96 deletions(-) diff --git a/doc/API.md b/doc/API.md index 4b412dd97b..0807350fdb 100644 --- a/doc/API.md +++ b/doc/API.md @@ -37,6 +37,7 @@ * [`peerStore.protoBook.add`](#peerstoreprotobookadd) * [`peerStore.protoBook.delete`](#peerstoreprotobookdelete) * [`peerStore.protoBook.get`](#peerstoreprotobookget) + * [`peerStore.protoBook.remove`](#peerstoreprotobookremove) * [`peerStore.protoBook.set`](#peerstoreprotobookset) * [`peerStore.delete`](#peerstoredelete) * [`peerStore.get`](#peerstoreget) @@ -841,32 +842,6 @@ Consider using `addressBook.add()` if you're not sure this is what you want to d peerStore.addressBook.add(peerId, multiaddr) ``` -### peerStore.protoBook.add - -Add known `protocols` of a given peer. - -`peerStore.protoBook.add(peerId, protocols)` - -#### Parameters - -| Name | Type | Description | -|------|------|-------------| -| peerId | [`PeerId`][peer-id] | peerId to set | -| protocols | `Array` | protocols to add | - -#### Returns - -| Type | Description | -|------|-------------| -| `ProtoBook` | Returns the Proto Book component | - -#### Example - -```js -peerStore.protoBook.add(peerId, protocols) -``` - - ### peerStore.keyBook.delete Delete the provided peer from the book. @@ -1089,6 +1064,31 @@ Set known metadata of a given `peerId`. peerStore.metadataBook.set(peerId, 'location', uint8ArrayFromString('Berlin')) ``` +### peerStore.protoBook.add + +Add known `protocols` of a given peer. + +`peerStore.protoBook.add(peerId, protocols)` + +#### Parameters + +| Name | Type | Description | +|------|------|-------------| +| peerId | [`PeerId`][peer-id] | peerId to set | +| protocols | `Array` | protocols to add | + +#### Returns + +| Type | Description | +|------|-------------| +| `ProtoBook` | Returns the Proto Book component | + +#### Example + +```js +peerStore.protoBook.add(peerId, protocols) +``` + ### peerStore.protoBook.delete Delete the provided peer from the book. @@ -1145,6 +1145,31 @@ peerStore.protoBook.get(peerId) // [ '/proto/1.0.0', '/proto/1.1.0' ] ``` +### peerStore.protoBook.remove + +Remove given `protocols` of a given peer. + +`peerStore.protoBook.remove(peerId, protocols)` + +#### Parameters + +| Name | Type | Description | +|------|------|-------------| +| peerId | [`PeerId`][peer-id] | peerId to set | +| protocols | `Array` | protocols to remove | + +#### Returns + +| Type | Description | +|------|-------------| +| `ProtoBook` | Returns the Proto Book component | + +#### Example + +```js +peerStore.protoBook.remove(peerId, protocols) +``` + ### peerStore.protoBook.set Set known `protocols` of a given peer. diff --git a/src/identify/index.js b/src/identify/index.js index 53c2cee8f2..351bfbe4c7 100644 --- a/src/identify/index.js +++ b/src/identify/index.js @@ -51,9 +51,8 @@ class IdentifyService { * @class * @param {object} options * @param {Libp2p} options.libp2p - * @param {Map} options.protocols - A reference to the protocols we support */ - constructor ({ libp2p, protocols }) { + constructor ({ libp2p }) { /** * @property {PeerStore} */ @@ -74,10 +73,9 @@ class IdentifyService { */ this._libp2p = libp2p - this._protocols = protocols - this.handleMessage = this.handleMessage.bind(this) + // When a new connection happens, trigger identify this.connectionManager.on('peer:connect', (connection) => { const peerId = connection.remotePeer @@ -90,6 +88,13 @@ class IdentifyService { this.pushToPeerStore() } }) + + // When self protocols change, trigger identify-push + this.peerStore.on('change:protocols', ({ peerId }) => { + if (peerId.toString() === this.peerId.toString()) { + this.pushToPeerStore() + } + }) } /** @@ -101,7 +106,7 @@ class IdentifyService { async push (connections) { const signedPeerRecord = await this.peerStore.addressBook.getRawEnvelope(this.peerId) const listenAddrs = this._libp2p.multiaddrs.map((ma) => ma.bytes) - const protocols = Array.from(this._protocols.keys()) + const protocols = this.peerStore.protoBook.get(this.peerId) || [] const pushes = connections.map(async connection => { try { @@ -132,6 +137,11 @@ class IdentifyService { * @returns {void} */ pushToPeerStore () { + // Do not try to push if libp2p node is not running + if (!this._libp2p.isStarted()) { + return + } + const connections = [] let connection for (const peer of this.peerStore.peers.values()) { @@ -251,6 +261,7 @@ class IdentifyService { } const signedPeerRecord = await this.peerStore.addressBook.getRawEnvelope(this.peerId) + const protocols = this.peerStore.protoBook.get(this.peerId) || [] const message = Message.encode({ protocolVersion: PROTOCOL_VERSION, @@ -259,7 +270,7 @@ class IdentifyService { listenAddrs: this._libp2p.multiaddrs.map((ma) => ma.bytes), signedPeerRecord, observedAddr: connection.remoteAddr.bytes, - protocols: Array.from(this._protocols.keys()) + protocols }) try { diff --git a/src/index.js b/src/index.js index 61e3ad1861..0fcc331596 100644 --- a/src/index.js +++ b/src/index.js @@ -155,10 +155,7 @@ class Libp2p extends EventEmitter { }) // Add the identify service since we can multiplex - this.identifyService = new IdentifyService({ - libp2p: this, - protocols: this.upgrader.protocols - }) + this.identifyService = new IdentifyService({ libp2p: this }) this.handle(Object.values(IDENTIFY_PROTOCOLS), this.identifyService.handleMessage) } @@ -430,10 +427,8 @@ class Libp2p extends EventEmitter { this.upgrader.protocols.set(protocol, handler) }) - // Only push if libp2p is running - if (this.isStarted() && this.identifyService) { - this.identifyService.pushToPeerStore() - } + // Add new protocols to self protocols in the Protobook + this.peerStore.protoBook.add(this.peerId, protocols) } /** @@ -448,10 +443,8 @@ class Libp2p extends EventEmitter { this.upgrader.protocols.delete(protocol) }) - // Only push if libp2p is running - if (this.isStarted() && this.identifyService) { - this.identifyService.pushToPeerStore() - } + // Remove protocols from self protocols in the Protobook + this.peerStore.protoBook.remove(this.peerId, protocols) } async _onStarting () { diff --git a/src/peer-store/proto-book.js b/src/peer-store/proto-book.js index 073b7e47e5..a08f5a284d 100644 --- a/src/peer-store/proto-book.js +++ b/src/peer-store/proto-book.js @@ -112,13 +112,50 @@ class ProtoBook extends Book { return this } - protocols = [...newSet] - this._setData(peerId, newSet) log(`added provided protocols for ${id}`) return this } + + /** + * Removes known protocols of a provided peer. + * If the protocols did not exist before, nothing will be done. + * + * @param {PeerId} peerId + * @param {Array} protocols + * @returns {ProtoBook} + */ + remove (peerId, protocols) { + if (!PeerId.isPeerId(peerId)) { + log.error('peerId must be an instance of peer-id to store data') + throw errcode(new Error('peerId must be an instance of peer-id'), ERR_INVALID_PARAMETERS) + } + + if (!protocols) { + log.error('protocols must be provided to store data') + throw errcode(new Error('protocols must be provided'), ERR_INVALID_PARAMETERS) + } + + const id = peerId.toB58String() + const recSet = this.data.get(id) + + if (recSet) { + const newSet = new Set([ + ...recSet + ].filter((p) => !protocols.includes(p))) + + // Any protocol removed? + if (recSet.size === newSet.size) { + return this + } + + this._setData(peerId, newSet) + log(`removed provided protocols for ${id}`) + } + + return this + } } module.exports = ProtoBook diff --git a/test/identify/index.spec.js b/test/identify/index.spec.js index d3b14273b7..4a5e308c7f 100644 --- a/test/identify/index.spec.js +++ b/test/identify/index.spec.js @@ -29,18 +29,21 @@ const remoteAddr = MULTIADDRS_WEBSOCKETS[0] const listenMaddrs = [multiaddr('/ip4/127.0.0.1/tcp/15002/ws')] describe('Identify', () => { - let localPeer - let remotePeer - const protocols = new Map([ - [multicodecs.IDENTIFY, () => {}], - [multicodecs.IDENTIFY_PUSH, () => {}] - ]) + let localPeer, localPeerStore + let remotePeer, remotePeerStore + const protocols = [multicodecs.IDENTIFY, multicodecs.IDENTIFY_PUSH] before(async () => { [localPeer, remotePeer] = (await Promise.all([ PeerId.createFromJSON(Peers[0]), PeerId.createFromJSON(Peers[1]) ])) + + localPeerStore = new PeerStore({ peerId: localPeer }) + localPeerStore.protoBook.set(localPeer, protocols) + + remotePeerStore = new PeerStore({ peerId: remotePeer }) + remotePeerStore.protoBook.set(remotePeer, protocols) }) afterEach(() => { @@ -52,20 +55,19 @@ describe('Identify', () => { libp2p: { peerId: localPeer, connectionManager: new EventEmitter(), - peerStore: new PeerStore({ peerId: localPeer }), - multiaddrs: listenMaddrs - }, - protocols + peerStore: localPeerStore, + multiaddrs: listenMaddrs, + isStarted: () => true + } }) - const remoteIdentify = new IdentifyService({ libp2p: { peerId: remotePeer, connectionManager: new EventEmitter(), - peerStore: new PeerStore({ peerId: remotePeer }), - multiaddrs: listenMaddrs - }, - protocols + peerStore: remotePeerStore, + multiaddrs: listenMaddrs, + isStarted: () => true + } }) const observedAddr = multiaddr('/ip4/127.0.0.1/tcp/1234') @@ -108,20 +110,20 @@ describe('Identify', () => { libp2p: { peerId: localPeer, connectionManager: new EventEmitter(), - peerStore: new PeerStore({ peerId: localPeer }), - multiaddrs: listenMaddrs - }, - protocols + peerStore: localPeerStore, + multiaddrs: listenMaddrs, + isStarted: () => true + } }) const remoteIdentify = new IdentifyService({ libp2p: { peerId: remotePeer, connectionManager: new EventEmitter(), - peerStore: new PeerStore({ peerId: remotePeer }), - multiaddrs: listenMaddrs - }, - protocols + peerStore: remotePeerStore, + multiaddrs: listenMaddrs, + isStarted: () => true + } }) const observedAddr = multiaddr('/ip4/127.0.0.1/tcp/1234') @@ -167,19 +169,17 @@ describe('Identify', () => { libp2p: { peerId: localPeer, connectionManager: new EventEmitter(), - peerStore: new PeerStore({ peerId: localPeer }), + peerStore: localPeerStore, multiaddrs: [] - }, - protocols + } }) const remoteIdentify = new IdentifyService({ libp2p: { peerId: remotePeer, connectionManager: new EventEmitter(), - peerStore: new PeerStore({ peerId: remotePeer }), + peerStore: remotePeerStore, multiaddrs: [] - }, - protocols + } }) const observedAddr = multiaddr('/ip4/127.0.0.1/tcp/1234') @@ -206,33 +206,38 @@ describe('Identify', () => { describe('push', () => { it('should be able to push identify updates to another peer', async () => { + const storedProtocols = [multicodecs.IDENTIFY, multicodecs.IDENTIFY_PUSH, '/echo/1.0.0'] const connectionManager = new EventEmitter() connectionManager.getConnection = () => { } + const localPeerStore = new PeerStore({ peerId: localPeer }) + localPeerStore.protoBook.set(localPeer, storedProtocols) + const localIdentify = new IdentifyService({ libp2p: { peerId: localPeer, connectionManager: new EventEmitter(), - peerStore: new PeerStore({ peerId: localPeer }), - multiaddrs: listenMaddrs - }, - protocols: new Map([ - [multicodecs.IDENTIFY], - [multicodecs.IDENTIFY_PUSH], - ['/echo/1.0.0'] - ]) + peerStore: localPeerStore, + multiaddrs: listenMaddrs, + isStarted: () => true + } }) + + const remotePeerStore = new PeerStore({ peerId: remotePeer }) + remotePeerStore.protoBook.set(remotePeer, storedProtocols) + const remoteIdentify = new IdentifyService({ libp2p: { peerId: remotePeer, connectionManager, - peerStore: new PeerStore({ peerId: remotePeer }), - multiaddrs: [] + peerStore: remotePeerStore, + multiaddrs: [], + isStarted: () => true } }) // Setup peer protocols and multiaddrs - const localProtocols = new Set([multicodecs.IDENTIFY, multicodecs.IDENTIFY_PUSH, '/echo/1.0.0']) + const localProtocols = new Set(storedProtocols) const localConnectionMock = { newStream: () => { } } const remoteConnectionMock = { remotePeer: localPeer } @@ -271,33 +276,38 @@ describe('Identify', () => { // LEGACY it('should be able to push identify updates to another peer with no certified peer records support', async () => { + const storedProtocols = [multicodecs.IDENTIFY, multicodecs.IDENTIFY_PUSH, '/echo/1.0.0'] const connectionManager = new EventEmitter() connectionManager.getConnection = () => { } + const localPeerStore = new PeerStore({ peerId: localPeer }) + localPeerStore.protoBook.set(localPeer, storedProtocols) + const localIdentify = new IdentifyService({ libp2p: { peerId: localPeer, connectionManager: new EventEmitter(), - peerStore: new PeerStore({ peerId: localPeer }), - multiaddrs: listenMaddrs - }, - protocols: new Map([ - [multicodecs.IDENTIFY], - [multicodecs.IDENTIFY_PUSH], - ['/echo/1.0.0'] - ]) + peerStore: localPeerStore, + multiaddrs: listenMaddrs, + isStarted: () => true + } }) + + const remotePeerStore = new PeerStore({ peerId: remotePeer }) + remotePeerStore.protoBook.set(remotePeer, storedProtocols) + const remoteIdentify = new IdentifyService({ libp2p: { peerId: remotePeer, connectionManager, peerStore: new PeerStore({ peerId: remotePeer }), - multiaddrs: [] + multiaddrs: [], + isStarted: () => true } }) // Setup peer protocols and multiaddrs - const localProtocols = new Set([multicodecs.IDENTIFY, multicodecs.IDENTIFY_PUSH, '/echo/1.0.0']) + const localProtocols = new Set(storedProtocols) const localConnectionMock = { newStream: () => {} } const remoteConnectionMock = { remotePeer: localPeer } diff --git a/test/peer-store/proto-book.spec.js b/test/peer-store/proto-book.spec.js index 15b5199757..262836971f 100644 --- a/test/peer-store/proto-book.spec.js +++ b/test/peer-store/proto-book.spec.js @@ -224,6 +224,113 @@ describe('protoBook', () => { }) }) + describe('protoBook.remove', () => { + let peerStore, pb + + beforeEach(() => { + peerStore = new PeerStore({ peerId }) + pb = peerStore.protoBook + }) + + afterEach(() => { + peerStore.removeAllListeners() + }) + + it('throwns invalid parameters error if invalid PeerId is provided', () => { + expect(() => { + pb.remove('invalid peerId') + }).to.throw(ERR_INVALID_PARAMETERS) + }) + + it('throwns invalid parameters error if no protocols provided', () => { + expect(() => { + pb.remove(peerId) + }).to.throw(ERR_INVALID_PARAMETERS) + }) + + it('removes the given protocol and emits change event', () => { + const defer = pDefer() + + const supportedProtocols = ['protocol1', 'protocol2'] + const removedProtocols = ['protocol1'] + const finalProtocols = supportedProtocols.filter(p => !removedProtocols.includes(p)) + + let changeTrigger = 2 + peerStore.on('change:protocols', ({ protocols }) => { + changeTrigger-- + if (changeTrigger === 0 && arraysAreEqual(protocols, finalProtocols)) { + defer.resolve() + } + }) + + // Replace + pb.set(peerId, supportedProtocols) + let protocols = pb.get(peerId) + expect(protocols).to.have.deep.members(supportedProtocols) + + // Remove + pb.remove(peerId, removedProtocols) + protocols = pb.get(peerId) + expect(protocols).to.have.deep.members(finalProtocols) + + return defer.promise + }) + + it('emits on remove if the content changes', () => { + const defer = pDefer() + + const supportedProtocols = ['protocol1', 'protocol2'] + const removedProtocols = ['protocol2'] + const finalProtocols = supportedProtocols.filter(p => !removedProtocols.includes(p)) + + let changeCounter = 0 + peerStore.on('change:protocols', () => { + changeCounter++ + if (changeCounter > 1) { + defer.resolve() + } + }) + + // set + pb.set(peerId, supportedProtocols) + + // remove (content already existing) + pb.remove(peerId, removedProtocols) + const protocols = pb.get(peerId) + expect(protocols).to.have.deep.members(finalProtocols) + + return defer.promise + }) + + it('does not emit on remove if the content does not change', () => { + const defer = pDefer() + + const supportedProtocols = ['protocol1', 'protocol2'] + const removedProtocols = ['protocol3'] + + let changeCounter = 0 + peerStore.on('change:protocols', () => { + changeCounter++ + if (changeCounter > 1) { + defer.reject() + } + }) + + // set + pb.set(peerId, supportedProtocols) + + // remove + pb.remove(peerId, removedProtocols) + + // Wait 50ms for incorrect second event + setTimeout(() => { + defer.resolve() + }, 50) + + return defer.promise + }) + }) + describe('protoBook.get', () => { let peerStore, pb