diff --git a/src/errors.js b/src/errors.js index 18e600c6dc..079e5c89cd 100644 --- a/src/errors.js +++ b/src/errors.js @@ -3,7 +3,9 @@ exports.messages = { NOT_STARTED_YET: 'The libp2p node is not started yet', DHT_DISABLED: 'DHT is not available', - CONN_ENCRYPTION_REQUIRED: 'At least one connection encryption module is required' + CONN_ENCRYPTION_REQUIRED: 'At least one connection encryption module is required', + ERR_INVALID_ENVELOPE: 'Invalid envelope received', + ERR_INVALID_PEER_RECORD: 'Invalid peer record received' } exports.codes = { @@ -20,6 +22,8 @@ exports.codes = { ERR_DUPLICATE_TRANSPORT: 'ERR_DUPLICATE_TRANSPORT', ERR_ENCRYPTION_FAILED: 'ERR_ENCRYPTION_FAILED', ERR_HOP_REQUEST_FAILED: 'ERR_HOP_REQUEST_FAILED', + ERR_INVALID_ENVELOPE: 'ERR_INVALID_ENVELOPE', + ERR_INVALID_PEER_RECORD: 'ERR_INVALID_PEER_RECORD', ERR_INVALID_KEY: 'ERR_INVALID_KEY', ERR_INVALID_MESSAGE: 'ERR_INVALID_MESSAGE', ERR_INVALID_PARAMETERS: 'ERR_INVALID_PARAMETERS', diff --git a/src/identify/consts.js b/src/identify/consts.js index 58ec077faa..f15769d403 100644 --- a/src/identify/consts.js +++ b/src/identify/consts.js @@ -4,5 +4,9 @@ const libp2pVersion = require('../../package.json').version module.exports.PROTOCOL_VERSION = 'ipfs/0.1.0' module.exports.AGENT_VERSION = `js-libp2p/${libp2pVersion}` -module.exports.MULTICODEC_IDENTIFY = '/ipfs/id/1.0.0' -module.exports.MULTICODEC_IDENTIFY_PUSH = '/ipfs/id/push/1.0.0' +module.exports.MULTICODEC_IDENTIFY = '/p2p/id/1.1.0' +module.exports.MULTICODEC_IDENTIFY_PUSH = '/p2p/id/push/1.1.0' + +// Legacy +module.exports.MULTICODEC_IDENTIFY_LEGACY = '/ipfs/id/1.0.0' +module.exports.MULTICODEC_IDENTIFY_PUSH_LEGACY = '/ipfs/id/push/1.0.0' diff --git a/src/identify/index.js b/src/identify/index.js index 2d38dd5335..8b17352971 100644 --- a/src/identify/index.js +++ b/src/identify/index.js @@ -18,13 +18,17 @@ log.error = debug('libp2p:identify:error') const { MULTICODEC_IDENTIFY, + MULTICODEC_IDENTIFY_LEGACY, MULTICODEC_IDENTIFY_PUSH, + MULTICODEC_IDENTIFY_PUSH_LEGACY, AGENT_VERSION, PROTOCOL_VERSION } = require('./consts') const errCode = require('err-code') -const { codes } = require('../errors') +const { messages, codes } = require('../errors') +const Envelope = require('../record-manager/envelope') +const PeerRecord = require('../record-manager/peer-record') class IdentifyService { /** @@ -89,11 +93,27 @@ class IdentifyService { push (connections) { const pushes = connections.map(async connection => { try { - const { stream } = await connection.newStream(MULTICODEC_IDENTIFY_PUSH) + const { protocol, stream } = await connection.newStream([MULTICODEC_IDENTIFY_PUSH, MULTICODEC_IDENTIFY_PUSH_LEGACY]) + + // Handle Legacy + if (protocol === MULTICODEC_IDENTIFY_PUSH_LEGACY) { + return pipe( + [{ + listenAddrs: this._libp2p.multiaddrs.map((ma) => ma.buffer), + protocols: Array.from(this._protocols.keys()) + }], + pb.encode(Message), + stream, + consume + ) + } + + const envelope = this._libp2p.recordManager.getPeerRecord() + const signedPeerRecord = envelope.marshal() await pipe( [{ - listenAddrs: this._libp2p.multiaddrs.map((ma) => ma.buffer), + signedPeerRecord, protocols: Array.from(this._protocols.keys()) }], pb.encode(Message), @@ -135,7 +155,7 @@ class IdentifyService { * @returns {Promise} */ async identify (connection) { - const { stream } = await connection.newStream(MULTICODEC_IDENTIFY) + const { protocol, stream } = await connection.newStream([MULTICODEC_IDENTIFY, MULTICODEC_IDENTIFY_LEGACY]) const [data] = await pipe( [], stream, @@ -160,7 +180,8 @@ class IdentifyService { publicKey, listenAddrs, protocols, - observedAddr + observedAddr, + signedPeerRecord } = message const id = await PeerId.createFromPubKey(publicKey) @@ -172,8 +193,40 @@ class IdentifyService { // Get the observedAddr if there is one observedAddr = IdentifyService.getCleanMultiaddr(observedAddr) + // LEGACY: differentiate message with SignedPeerRecord + if (protocol === MULTICODEC_IDENTIFY_LEGACY) { + // Update peers data in PeerStore + this.peerStore.addressBook.set(id, listenAddrs.map((addr) => multiaddr(addr))) + this.peerStore.protoBook.set(id, protocols) + + // TODO: Track our observed address so that we can score it + log('received observed address of %s', observedAddr) + + return + } + + // Open envelope and verify if is authenticated + let envelope + try { + envelope = await Envelope.openAndCertify(signedPeerRecord, PeerRecord.DOMAIN) + } catch (err) { + log('received invalid envelope, discard it') + throw errCode(new Error(messages.ERR_INVALID_ENVELOPE), codes.ERR_INVALID_ENVELOPE) + } + + // Decode peer record + let peerRecord + try { + peerRecord = await PeerRecord.createFromProtobuf(envelope.payload) + } catch (err) { + log('received invalid peer record, discard it') + throw errCode(new Error(messages.ERR_INVALID_PEER_RECORD), codes.ERR_INVALID_PEER_RECORD) + } + + // TODO: Store as certified record + // Update peers data in PeerStore - this.peerStore.addressBook.set(id, listenAddrs.map((addr) => multiaddr(addr))) + this.peerStore.addressBook.set(id, peerRecord.multiaddrs.map((addr) => multiaddr(addr))) this.peerStore.protoBook.set(id, protocols) this.peerStore.metadataBook.set(id, 'AgentVersion', Buffer.from(message.agentVersion)) @@ -194,16 +247,20 @@ class IdentifyService { switch (protocol) { case MULTICODEC_IDENTIFY: return this._handleIdentify({ connection, stream }) + case MULTICODEC_IDENTIFY_LEGACY: + return this._handleIdentifyLegacy({ connection, stream }) case MULTICODEC_IDENTIFY_PUSH: return this._handlePush({ connection, stream }) + case MULTICODEC_IDENTIFY_PUSH_LEGACY: + return this._handlePushLegacy({ connection, stream }) default: log.error('cannot handle unknown protocol %s', protocol) } } /** - * Sends the `Identify` response to the requesting peer over the - * given `connection` + * Sends the `Identify` response with the Signed Peer Record + * to the requesting peer over the given `connection` * @private * @param {object} options * @param {*} options.stream @@ -215,6 +272,40 @@ class IdentifyService { publicKey = this.peerId.pubKey.bytes } + const envelope = this._libp2p.recordManager.getPeerRecord() + const signedPeerRecord = envelope.marshal() + + const message = Message.encode({ + protocolVersion: PROTOCOL_VERSION, + agentVersion: AGENT_VERSION, + publicKey, + signedPeerRecord, + observedAddr: connection.remoteAddr.buffer, + protocols: Array.from(this._protocols.keys()) + }) + + pipe( + [message], + lp.encode(), + stream, + consume + ) + } + + /** + * Sends the `Identify` response with listen addresses (LEGACY) + * to the requesting peer over the given `connection` + * @private + * @param {object} options + * @param {*} options.stream + * @param {Connection} options.connection + */ + _handleIdentifyLegacy ({ connection, stream }) { + let publicKey = Buffer.alloc(0) + if (this.peerId.pubKey) { + publicKey = this.peerId.pubKey.bytes + } + const message = Message.encode({ protocolVersion: PROTOCOL_VERSION, agentVersion: AGENT_VERSION, @@ -259,6 +350,63 @@ class IdentifyService { return log.error('received invalid message', err) } + // Open envelope and verify if is authenticated + let envelope + try { + envelope = await Envelope.openAndCertify(message.signedPeerRecord, PeerRecord.DOMAIN) + } catch (err) { + log('received invalid envelope, discard it') + throw errCode(new Error(messages.ERR_INVALID_ENVELOPE), codes.ERR_INVALID_ENVELOPE) + } + + // Decode peer record + let peerRecord + try { + peerRecord = await PeerRecord.createFromProtobuf(envelope.payload) + } catch (err) { + log('received invalid peer record, discard it') + throw errCode(new Error(messages.ERR_INVALID_PEER_RECORD), codes.ERR_INVALID_PEER_RECORD) + } + + // Update peers data in PeerStore + const id = connection.remotePeer + try { + // TODO: Store as certified record + + this.peerStore.addressBook.set(id, peerRecord.multiaddrs.map((addr) => multiaddr(addr))) + } catch (err) { + return log.error('received invalid listen addrs', err) + } + + // Update the protocols + this.peerStore.protoBook.set(id, message.protocols) + } + + /** + * Reads the Identify Push message from the given `connection` + * with listen addresses (LEGACY) + * @private + * @param {object} options + * @param {*} options.stream + * @param {Connection} options.connection + */ + async _handlePushLegacy ({ connection, stream }) { + const [data] = await pipe( + [], + stream, + lp.decode(), + take(1), + toBuffer, + collect + ) + + let message + try { + message = Message.decode(data) + } catch (err) { + return log.error('received invalid message', err) + } + // Update peers data in PeerStore const id = connection.remotePeer try { @@ -279,6 +427,8 @@ module.exports.IdentifyService = IdentifyService */ module.exports.multicodecs = { IDENTIFY: MULTICODEC_IDENTIFY, - IDENTIFY_PUSH: MULTICODEC_IDENTIFY_PUSH + IDENTIFY_LEGACY: MULTICODEC_IDENTIFY_LEGACY, + IDENTIFY_PUSH: MULTICODEC_IDENTIFY_PUSH, + IDENTIFY_PUSH_LEGACY: MULTICODEC_IDENTIFY_PUSH_LEGACY } module.exports.Message = Message diff --git a/src/identify/message.js b/src/identify/message.js index d81b1fd4b4..25b003f5c0 100644 --- a/src/identify/message.js +++ b/src/identify/message.js @@ -24,6 +24,11 @@ message Identify { optional bytes observedAddr = 4; repeated string protocols = 3; + + // signedPeerRecord contains a serialized SignedEnvelope containing a PeerRecord, + // signed by the sending node. It contains the same addresses as the listenAddrs field, but + // in a form that lets us share authenticated addrs with other peers. + optional bytes signedPeerRecord = 8; } ` diff --git a/src/index.js b/src/index.js index 560b084cc0..49f0a9f3f5 100644 --- a/src/index.js +++ b/src/index.js @@ -446,6 +446,9 @@ class Libp2p extends EventEmitter { // Listen on the provided transports await this.transportManager.listen() + // Start record Manager + await this.recordManager.start() + // Start PeerStore await this.peerStore.start() diff --git a/src/record/peer-record/index.js b/src/record/peer-record/index.js index 68f987e060..432f95010b 100644 --- a/src/record/peer-record/index.js +++ b/src/record/peer-record/index.js @@ -95,4 +95,6 @@ PeerRecord.createFromProtobuf = (buf) => { return new PeerRecord({ peerId, multiaddrs, seqNumber }) } +PeerRecord.DOMAIN = ENVELOPE_DOMAIN_PEER_RECORD + module.exports = PeerRecord diff --git a/test/identify/index.spec.js b/test/identify/index.spec.js index a4504be273..401ccdad45 100644 --- a/test/identify/index.spec.js +++ b/test/identify/index.spec.js @@ -14,6 +14,9 @@ const duplexPair = require('it-pair/duplex') const multiaddr = require('multiaddr') const pWaitFor = require('p-wait-for') +const Envelope = require('../../src/record-manager/envelope') +const PeerRecord = require('../../src/record-manager/peer-record') + const { codes: Errors } = require('../../src/errors') const { IdentifyService, multicodecs } = require('../../src/identify') const Peers = require('../fixtures/peers') @@ -23,14 +26,21 @@ const pkg = require('../../package.json') const { MULTIADDRS_WEBSOCKETS } = require('../fixtures/browser') const remoteAddr = MULTIADDRS_WEBSOCKETS[0] +const listenMaddrs = [multiaddr('/ip4/127.0.0.1/tcp/15002/ws')] + +const protocols = new Map([ + [multicodecs.IDENTIFY, () => { }], + [multicodecs.IDENTIFY_PUSH, () => { }] +]) + +const protocolsLegacy = new Map([ + [multicodecs.IDENTIFY_LEGACY, () => { }], + [multicodecs.IDENTIFY_PUSH_LEGACY, () => { }] +]) describe('Identify', () => { - let localPeer - let remotePeer - const protocols = new Map([ - [multicodecs.IDENTIFY, () => {}], - [multicodecs.IDENTIFY_PUSH, () => {}] - ]) + let localPeer, localPeerRecord + let remotePeer, remotePeerRecord before(async () => { [localPeer, remotePeer] = (await Promise.all([ @@ -39,10 +49,77 @@ describe('Identify', () => { ])) }) + // Compute peer records + before(async () => { + // Compute PeerRecords + const localRecord = new PeerRecord({ peerId: localPeer, multiaddrs: listenMaddrs }) + localPeerRecord = await Envelope.seal(localRecord, localPeer) + const remoteRecord = new PeerRecord({ peerId: remotePeer, multiaddrs: listenMaddrs }) + remotePeerRecord = await Envelope.seal(remoteRecord, remotePeer) + }) + afterEach(() => { sinon.restore() }) + it('should be able to identify another peer with legacy protocol', async () => { + const localIdentify = new IdentifyService({ + libp2p: { + peerId: localPeer, + connectionManager: new EventEmitter(), + peerStore: { + addressBook: { + set: () => { } + }, + protoBook: { + set: () => { } + } + }, + multiaddrs: listenMaddrs + }, + protocols: protocolsLegacy + }) + + const remoteIdentify = new IdentifyService({ + libp2p: { + peerId: remotePeer, + connectionManager: new EventEmitter(), + multiaddrs: listenMaddrs + }, + protocols: protocolsLegacy + }) + + const observedAddr = multiaddr('/ip4/127.0.0.1/tcp/1234') + const localConnectionMock = { newStream: () => { }, remotePeer } + const remoteConnectionMock = { remoteAddr: observedAddr } + + const [local, remote] = duplexPair() + sinon.stub(localConnectionMock, 'newStream').returns({ stream: local, protocol: multicodecs.IDENTIFY_LEGACY }) + + sinon.spy(localIdentify.peerStore.addressBook, 'set') + sinon.spy(localIdentify.peerStore.protoBook, 'set') + + // Run identify + await Promise.all([ + localIdentify.identify(localConnectionMock), + remoteIdentify.handleMessage({ + connection: remoteConnectionMock, + stream: remote, + protocol: multicodecs.IDENTIFY_LEGACY + }) + ]) + + expect(localIdentify.peerStore.addressBook.set.callCount).to.equal(1) + expect(localIdentify.peerStore.protoBook.set.callCount).to.equal(1) + + // Validate the remote peer gets updated in the peer store + const call = localIdentify.peerStore.addressBook.set.firstCall + expect(call.args[0].id.bytes).to.equal(remotePeer.bytes) + expect(call.args[1]).to.exist() + expect(call.args[1]).have.lengthOf(listenMaddrs.length) + expect(call.args[1][0].equals(listenMaddrs[0])) + }) + it('should be able to identify another peer', async () => { const localIdentify = new IdentifyService({ libp2p: { @@ -59,15 +136,22 @@ describe('Identify', () => { set: () => { } } }, - multiaddrs: [] + multiaddrs: [], + recordManager: { + getPeerRecord: () => localPeerRecord + } }, protocols }) + const remoteIdentify = new IdentifyService({ libp2p: { peerId: remotePeer, connectionManager: new EventEmitter(), - multiaddrs: [] + multiaddrs: [], + recordManager: { + getPeerRecord: () => remotePeerRecord + } }, protocols }) @@ -104,6 +188,9 @@ describe('Identify', () => { // Validate the remote peer gets updated in the peer store const call = localIdentify.peerStore.addressBook.set.firstCall expect(call.args[0].id.bytes).to.equal(remotePeer.bytes) + expect(call.args[1]).to.exist() + expect(call.args[1]).have.lengthOf(listenMaddrs.length) + expect(call.args[1][0].equals(listenMaddrs[0])) }) it('should throw if identified peer is the wrong peer', async () => { @@ -119,7 +206,10 @@ describe('Identify', () => { set: () => { } } }, - multiaddrs: [] + multiaddrs: [], + recordManager: { + getPeerRecord: () => localPeerRecord + } }, protocols }) @@ -127,7 +217,10 @@ describe('Identify', () => { libp2p: { peerId: remotePeer, connectionManager: new EventEmitter(), - multiaddrs: [] + multiaddrs: [], + recordManager: { + getPeerRecord: () => remotePeerRecord + } }, protocols }) @@ -155,8 +248,7 @@ describe('Identify', () => { }) describe('push', () => { - it('should be able to push identify updates to another peer', async () => { - const listeningAddr = multiaddr('/ip4/127.0.0.1/tcp/1234') + it('should be able to push identify updates to another peer with legacy protocol', async () => { const connectionManager = new EventEmitter() connectionManager.getConnection = () => {} @@ -164,7 +256,73 @@ describe('Identify', () => { libp2p: { peerId: localPeer, connectionManager: new EventEmitter(), - multiaddrs: [listeningAddr] + multiaddrs: listenMaddrs + }, + protocols: new Map([ + [multicodecs.IDENTIFY_LEGACY], + [multicodecs.IDENTIFY_PUSH_LEGACY], + ['/echo/1.0.0'] + ]) + }) + const remoteIdentify = new IdentifyService({ + libp2p: { + peerId: remotePeer, + connectionManager, + peerStore: { + addressBook: { + set: () => { } + }, + protoBook: { + set: () => { } + } + }, + multiaddrs: [] + } + }) + + // Setup peer protocols and multiaddrs + const localProtocols = new Set([multicodecs.IDENTIFY_LEGACY, multicodecs.IDENTIFY_PUSH_LEGACY, '/echo/1.0.0']) + const localConnectionMock = { newStream: () => {} } + const remoteConnectionMock = { remotePeer: localPeer } + + const [local, remote] = duplexPair() + sinon.stub(localConnectionMock, 'newStream').returns({ stream: local, protocol: multicodecs.IDENTIFY_PUSH_LEGACY }) + + sinon.spy(remoteIdentify.peerStore.addressBook, 'set') + sinon.spy(remoteIdentify.peerStore.protoBook, 'set') + + // Run identify + await Promise.all([ + localIdentify.push([localConnectionMock]), + remoteIdentify.handleMessage({ + connection: remoteConnectionMock, + stream: remote, + protocol: multicodecs.IDENTIFY_PUSH_LEGACY + }) + ]) + + expect(remoteIdentify.peerStore.addressBook.set.callCount).to.equal(1) + expect(remoteIdentify.peerStore.protoBook.set.callCount).to.equal(1) + const [peerId, multiaddrs] = remoteIdentify.peerStore.addressBook.set.firstCall.args + expect(peerId.bytes).to.eql(localPeer.bytes) + expect(multiaddrs).to.eql(listenMaddrs) + const [peerId2, protocols] = remoteIdentify.peerStore.protoBook.set.firstCall.args + expect(peerId2.bytes).to.eql(localPeer.bytes) + expect(protocols).to.eql(Array.from(localProtocols)) + }) + + it('should be able to push identify updates to another peer', async () => { + const connectionManager = new EventEmitter() + connectionManager.getConnection = () => { } + + const localIdentify = new IdentifyService({ + libp2p: { + peerId: localPeer, + connectionManager: new EventEmitter(), + multiaddrs: listenMaddrs, + recordManager: { + getPeerRecord: () => localPeerRecord + } }, protocols: new Map([ [multicodecs.IDENTIFY], @@ -184,13 +342,16 @@ describe('Identify', () => { set: () => { } } }, - multiaddrs: [] + multiaddrs: [], + recordManager: { + getPeerRecord: () => remotePeerRecord + } } }) // Setup peer protocols and multiaddrs const localProtocols = new Set([multicodecs.IDENTIFY, multicodecs.IDENTIFY_PUSH, '/echo/1.0.0']) - const localConnectionMock = { newStream: () => {} } + const localConnectionMock = { newStream: () => { } } const remoteConnectionMock = { remotePeer: localPeer } const [local, remote] = duplexPair() @@ -213,7 +374,7 @@ describe('Identify', () => { expect(remoteIdentify.peerStore.protoBook.set.callCount).to.equal(1) const [peerId, multiaddrs] = remoteIdentify.peerStore.addressBook.set.firstCall.args expect(peerId.bytes).to.eql(localPeer.bytes) - expect(multiaddrs).to.eql([listeningAddr]) + expect(multiaddrs).to.eql(listenMaddrs) const [peerId2, protocols] = remoteIdentify.peerStore.protoBook.set.firstCall.args expect(peerId2.bytes).to.eql(localPeer.bytes) expect(protocols).to.eql(Array.from(localProtocols)) @@ -245,6 +406,8 @@ describe('Identify', () => { peerId }) + await libp2p.start() + sinon.spy(libp2p.identifyService, 'identify') const peerStoreSpySet = sinon.spy(libp2p.peerStore.addressBook, 'set') const peerStoreSpyAdd = sinon.spy(libp2p.peerStore.addressBook, 'add') @@ -268,6 +431,8 @@ describe('Identify', () => { peerId }) + await libp2p.start() + sinon.spy(libp2p.identifyService, 'identify') sinon.spy(libp2p.identifyService, 'push')