diff --git a/package.json b/package.json index 408a62f6..624a7108 100644 --- a/package.json +++ b/package.json @@ -39,7 +39,7 @@ "err-code": "^2.0.0", "it-length-prefixed": "^3.0.0", "it-pipe": "^1.0.1", - "libp2p-pubsub": "~0.4.2", + "libp2p-pubsub": "~0.4.5", "p-map": "^4.0.0", "peer-id": "~0.13.3", "peer-info": "~0.17.0", diff --git a/test/2-nodes.spec.js b/test/2-nodes.spec.js index 19b310e5..06086aa1 100644 --- a/test/2-nodes.spec.js +++ b/test/2-nodes.spec.js @@ -299,10 +299,8 @@ describe('2 nodes', () => { } = await createGossipsubNodes(2, true)) }) - after(() => Promise.all(nodes.map((n) => n.stop()))) - - it('existing subscriptions are sent upon peer connection', async function () { - this.timeout(5000) + // Make subscriptions prior to new nodes + before(() => { nodes[0].subscribe('Za') nodes[1].subscribe('Zb') @@ -310,17 +308,42 @@ describe('2 nodes', () => { expectSet(nodes[0].subscriptions, ['Za']) expect(nodes[1].peers.size).to.equal(0) expectSet(nodes[1].subscriptions, ['Zb']) + }) - // Connect nodes - const onConnect0 = registrarRecords[0][multicodec].onConnect - const onConnect1 = registrarRecords[1][multicodec].onConnect + after(() => Promise.all(nodes.map((n) => n.stop()))) - // Notice peers of connection - const [d0, d1] = ConnectionPair() - onConnect0(nodes[1].peerInfo, d0) - onConnect1(nodes[0].peerInfo, d1) + it('existing subscriptions are sent upon peer connection', async function () { + this.timeout(5000) + + const dial = async () => { + // Connect nodes + const onConnect0 = registrarRecords[0][multicodec].onConnect + const onConnect1 = registrarRecords[1][multicodec].onConnect + const handle0 = registrarRecords[0][multicodec].handler + const handle1 = registrarRecords[1][multicodec].handler + + // Notice peers of connection + const [d0, d1] = ConnectionPair() + await onConnect0(nodes[1].peerInfo, d0) + await handle1({ + protocol: multicodec, + stream: d1.stream, + connection: { + remotePeer: nodes[0].peerInfo.id + } + }) + await onConnect1(nodes[0].peerInfo, d1) + await handle0({ + protocol: multicodec, + stream: d0.stream, + connection: { + remotePeer: nodes[1].peerInfo.id + } + }) + } await Promise.all([ + dial(), new Promise((resolve) => nodes[0].once('pubsub:subscription-change', resolve)), new Promise((resolve) => nodes[1].once('pubsub:subscription-change', resolve)) ]) diff --git a/test/floodsub.spec.js b/test/floodsub.spec.js index 05fa6fd8..991b7d7f 100644 --- a/test/floodsub.spec.js +++ b/test/floodsub.spec.js @@ -112,11 +112,27 @@ describe('gossipsub fallbacks to floodsub', () => { const onConnectGs = registrarRecords[0][floodsubMulticodec].onConnect const onConnectFs = registrarRecords[1][floodsubMulticodec].onConnect + const handleGs = registrarRecords[0][floodsubMulticodec].handler + const handleFs = registrarRecords[1][floodsubMulticodec].handler // Notice peers of connection const [d0, d1] = ConnectionPair() - onConnectGs(nodeFs.peerInfo, d0) - onConnectFs(nodeGs.peerInfo, d1) + await onConnectGs(nodeFs.peerInfo, d0) + await handleFs({ + protocol: floodsubMulticodec, + stream: d1.stream, + connection: { + remotePeer: nodeGs.peerInfo.id + } + }) + await onConnectFs(nodeGs.peerInfo, d1) + await handleGs({ + protocol: floodsubMulticodec, + stream: d0.stream, + connection: { + remotePeer: nodeFs.peerInfo.id + } + }) }) after(async function () { @@ -167,11 +183,27 @@ describe('gossipsub fallbacks to floodsub', () => { const onConnectGs = registrarRecords[0][floodsubMulticodec].onConnect const onConnectFs = registrarRecords[1][floodsubMulticodec].onConnect + const handleGs = registrarRecords[0][floodsubMulticodec].handler + const handleFs = registrarRecords[1][floodsubMulticodec].handler // Notice peers of connection const [d0, d1] = ConnectionPair() - onConnectGs(nodeFs.peerInfo, d0) - onConnectFs(nodeGs.peerInfo, d1) + await onConnectGs(nodeFs.peerInfo, d0) + await handleFs({ + protocol: floodsubMulticodec, + stream: d1.stream, + connection: { + remotePeer: nodeGs.peerInfo.id + } + }) + await onConnectFs(nodeGs.peerInfo, d1) + await handleGs({ + protocol: floodsubMulticodec, + stream: d0.stream, + connection: { + remotePeer: nodeFs.peerInfo.id + } + }) nodeGs.subscribe(topic) nodeFs.subscribe(topic) @@ -288,11 +320,27 @@ describe('gossipsub fallbacks to floodsub', () => { const onConnectGs = registrarRecords[0][floodsubMulticodec].onConnect const onConnectFs = registrarRecords[1][floodsubMulticodec].onConnect + const handleGs = registrarRecords[0][floodsubMulticodec].handler + const handleFs = registrarRecords[1][floodsubMulticodec].handler // Notice peers of connection const [d0, d1] = ConnectionPair() await onConnectGs(nodeFs.peerInfo, d0) + await handleFs({ + protocol: floodsubMulticodec, + stream: d1.stream, + connection: { + remotePeer: nodeGs.peerInfo.id + } + }) await onConnectFs(nodeGs.peerInfo, d1) + await handleGs({ + protocol: floodsubMulticodec, + stream: d0.stream, + connection: { + remotePeer: nodeFs.peerInfo.id + } + }) nodeGs.subscribe(topic) nodeFs.subscribe(topic) diff --git a/test/gossip.js b/test/gossip.js index 410a8eca..341bba94 100644 --- a/test/gossip.js +++ b/test/gossip.js @@ -32,7 +32,7 @@ describe('gossip', () => { // add subscriptions to each node nodes.forEach((n) => n.subscribe(topic)) - connectGossipsubNodes(nodes, registrarRecords, multicodec) + await connectGossipsubNodes(nodes, registrarRecords, multicodec) await new Promise((resolve) => setTimeout(resolve, 1000)) @@ -69,7 +69,7 @@ describe('gossip', () => { nodes.forEach((n) => n.subscribe(topic)) // every node connected to every other - connectGossipsubNodes(nodes, registrarRecords, multicodec) + await connectGossipsubNodes(nodes, registrarRecords, multicodec) await new Promise((resolve) => setTimeout(resolve, 500)) // await mesh rebalancing await Promise.all(nodes.map((n) => new Promise((resolve) => n.once('gossipsub:heartbeat', resolve)))) diff --git a/test/mesh.spec.js b/test/mesh.spec.js index f6c40440..f7e0cecc 100644 --- a/test/mesh.spec.js +++ b/test/mesh.spec.js @@ -35,15 +35,31 @@ describe('mesh overlay', () => { // connect N (< GossipsubD) nodes to node0 const N = 4 const onConnect0 = registrarRecords[0][multicodec].onConnect + const handle0 = registrarRecords[0][multicodec].handler for (let i = nodes.length; i > nodes.length - N; i--) { const n = i - 1 const onConnectN = registrarRecords[n][multicodec].onConnect + const handleN = registrarRecords[n][multicodec].handler // Notice peers of connection const [d0, d1] = ConnectionPair() - onConnect0(nodes[n].peerInfo, d0) - onConnectN(nodes[0].peerInfo, d1) + await onConnect0(nodes[n].peerInfo, d0) + await handleN({ + protocol: multicodec, + stream: d1.stream, + connection: { + remotePeer: nodes[0].peerInfo.id + } + }) + await onConnectN(nodes[0].peerInfo, d1) + await handle0({ + protocol: multicodec, + stream: d0.stream, + connection: { + remotePeer: nodes[n].peerInfo.id + } + }) } // await mesh rebalancing diff --git a/test/multiple-nodes.spec.js b/test/multiple-nodes.spec.js index 9cdf60f8..13d68721 100644 --- a/test/multiple-nodes.spec.js +++ b/test/multiple-nodes.spec.js @@ -38,15 +38,46 @@ describe('multiple nodes (more than 2)', () => { const onConnectA = registrarRecords[0][multicodec].onConnect const onConnectB = registrarRecords[1][multicodec].onConnect const onConnectC = registrarRecords[2][multicodec].onConnect + const handleA = registrarRecords[0][multicodec].handler + const handleB = registrarRecords[1][multicodec].handler + const handleC = registrarRecords[2][multicodec].handler // Notice peers of connection const [d0, d1] = ConnectionPair() - onConnectA(b.peerInfo, d0) - onConnectB(a.peerInfo, d1) + await onConnectA(b.peerInfo, d0) + await handleB({ + protocol: multicodec, + stream: d1.stream, + connection: { + remotePeer: a.peerInfo.id + } + }) + await onConnectB(a.peerInfo, d1) + await handleA({ + protocol: multicodec, + stream: d0.stream, + connection: { + remotePeer: b.peerInfo.id + } + }) const [d2, d3] = ConnectionPair() - onConnectB(c.peerInfo, d2) - onConnectC(b.peerInfo, d3) + await onConnectB(c.peerInfo, d2) + await handleC({ + protocol: multicodec, + stream: d3.stream, + connection: { + remotePeer: b.peerInfo.id + } + }) + await onConnectC(b.peerInfo, d3) + await handleB({ + protocol: multicodec, + stream: d2.stream, + connection: { + remotePeer: c.peerInfo.id + } + }) }) after(() => Promise.all(nodes.map((n) => n.stop()))) @@ -103,15 +134,46 @@ describe('multiple nodes (more than 2)', () => { const onConnectA = registrarRecords[0][multicodec].onConnect const onConnectB = registrarRecords[1][multicodec].onConnect const onConnectC = registrarRecords[2][multicodec].onConnect + const handleA = registrarRecords[0][multicodec].handler + const handleB = registrarRecords[1][multicodec].handler + const handleC = registrarRecords[2][multicodec].handler // Notice peers of connection const [d0, d1] = ConnectionPair() - onConnectA(b.peerInfo, d0) - onConnectB(a.peerInfo, d1) + await onConnectA(b.peerInfo, d0) + await handleB({ + protocol: multicodec, + stream: d1.stream, + connection: { + remotePeer: a.peerInfo.id + } + }) + await onConnectB(a.peerInfo, d1) + await handleA({ + protocol: multicodec, + stream: d0.stream, + connection: { + remotePeer: b.peerInfo.id + } + }) const [d2, d3] = ConnectionPair() - onConnectB(c.peerInfo, d2) - onConnectC(b.peerInfo, d3) + await onConnectB(c.peerInfo, d2) + await handleC({ + protocol: multicodec, + stream: d3.stream, + connection: { + remotePeer: b.peerInfo.id + } + }) + await onConnectC(b.peerInfo, d3) + await handleB({ + protocol: multicodec, + stream: d2.stream, + connection: { + remotePeer: c.peerInfo.id + } + }) a.subscribe(topic) b.subscribe(topic) @@ -198,15 +260,46 @@ describe('multiple nodes (more than 2)', () => { const onConnectA = registrarRecords[0][multicodec].onConnect const onConnectB = registrarRecords[1][multicodec].onConnect const onConnectC = registrarRecords[2][multicodec].onConnect + const handleA = registrarRecords[0][multicodec].handler + const handleB = registrarRecords[1][multicodec].handler + const handleC = registrarRecords[2][multicodec].handler // Notice peers of connection const [d0, d1] = ConnectionPair() - onConnectA(b.peerInfo, d0) - onConnectB(a.peerInfo, d1) + await onConnectA(b.peerInfo, d0) + await handleB({ + protocol: multicodec, + stream: d1.stream, + connection: { + remotePeer: a.peerInfo.id + } + }) + await onConnectB(a.peerInfo, d1) + await handleA({ + protocol: multicodec, + stream: d0.stream, + connection: { + remotePeer: b.peerInfo.id + } + }) const [d2, d3] = ConnectionPair() - onConnectB(c.peerInfo, d2) - onConnectC(b.peerInfo, d3) + await onConnectB(c.peerInfo, d2) + await handleC({ + protocol: multicodec, + stream: d3.stream, + connection: { + remotePeer: b.peerInfo.id + } + }) + await onConnectC(b.peerInfo, d3) + await handleB({ + protocol: multicodec, + stream: d2.stream, + connection: { + remotePeer: c.peerInfo.id + } + }) a.subscribe(topic) b.subscribe(topic) @@ -263,23 +356,84 @@ describe('multiple nodes (more than 2)', () => { const onConnectC = registrarRecords[2][multicodec].onConnect const onConnectD = registrarRecords[3][multicodec].onConnect const onConnectE = registrarRecords[4][multicodec].onConnect + const handleA = registrarRecords[0][multicodec].handler + const handleB = registrarRecords[1][multicodec].handler + const handleC = registrarRecords[2][multicodec].handler + const handleD = registrarRecords[3][multicodec].handler + const handleE = registrarRecords[4][multicodec].handler // Notice peers of connection const [d0, d1] = ConnectionPair() - onConnectA(b.peerInfo, d0) - onConnectB(a.peerInfo, d1) + await onConnectA(b.peerInfo, d0) + await handleB({ + protocol: multicodec, + stream: d1.stream, + connection: { + remotePeer: a.peerInfo.id + } + }) + await onConnectB(a.peerInfo, d1) + await handleA({ + protocol: multicodec, + stream: d0.stream, + connection: { + remotePeer: b.peerInfo.id + } + }) const [d2, d3] = ConnectionPair() - onConnectB(c.peerInfo, d2) - onConnectC(b.peerInfo, d3) + await onConnectB(c.peerInfo, d2) + await handleC({ + protocol: multicodec, + stream: d3.stream, + connection: { + remotePeer: b.peerInfo.id + } + }) + await onConnectC(b.peerInfo, d3) + await handleB({ + protocol: multicodec, + stream: d2.stream, + connection: { + remotePeer: c.peerInfo.id + } + }) const [d4, d5] = ConnectionPair() - onConnectC(d.peerInfo, d4) - onConnectD(c.peerInfo, d5) + await onConnectC(d.peerInfo, d4) + await handleD({ + protocol: multicodec, + stream: d5.stream, + connection: { + remotePeer: c.peerInfo.id + } + }) + await onConnectD(c.peerInfo, d5) + await handleC({ + protocol: multicodec, + stream: d4.stream, + connection: { + remotePeer: d.peerInfo.id + } + }) const [d6, d7] = ConnectionPair() - onConnectD(e.peerInfo, d6) - onConnectE(d.peerInfo, d7) + await onConnectD(e.peerInfo, d6) + await handleE({ + protocol: multicodec, + stream: d7.stream, + connection: { + remotePeer: d.peerInfo.id + } + }) + await onConnectE(d.peerInfo, d7) + await handleD({ + protocol: multicodec, + stream: d6.stream, + connection: { + remotePeer: e.peerInfo.id + } + }) a.subscribe(topic) b.subscribe(topic) diff --git a/test/pubsub.spec.js b/test/pubsub.spec.js index 30e4926b..cbf3b9ae 100644 --- a/test/pubsub.spec.js +++ b/test/pubsub.spec.js @@ -6,12 +6,10 @@ const chai = require('chai') chai.use(require('dirty-chai')) const expect = chai.expect const sinon = require('sinon') -const pWaitFor = require('p-wait-for') const { utils } = require('libp2p-pubsub') const { createGossipsub, - createPeerInfo, mockRegistrar } = require('./utils') @@ -122,34 +120,4 @@ describe('Pubsub', () => { }, 500)) }) }) - - describe('process', () => { - it('should disconnect peer on stream error', async () => { - sinon.spy(gossipsub, '_onPeerDisconnected') - - const peerInfo = await createPeerInfo() - const mockConn = { - newStream () { - return { - stream: { - sink: async source => { - for await (const _ of source) { // eslint-disable-line no-unused-vars - // mock stream just swallows any data sent - } - }, - source: (async function * () { // eslint-disable-line require-yield - // throw in a bit - await new Promise(resolve => setTimeout(resolve, 100)) - throw new Error('boom') - })() - } - } - } - } - - gossipsub._onPeerConnected(peerInfo, mockConn) - - await pWaitFor(() => gossipsub._onPeerDisconnected.calledWith(peerInfo), { timeout: 1000 }) - }) - }) }) diff --git a/test/utils/index.js b/test/utils/index.js index 489bdeb5..5f07929b 100644 --- a/test/utils/index.js +++ b/test/utils/index.js @@ -59,17 +59,33 @@ const createGossipsubNodes = async (n, shouldStart, options) => { exports.createGossipsubNodes = createGossipsubNodes -const connectGossipsubNodes = (nodes, registrarRecords, multicodec) => { +const connectGossipsubNodes = async (nodes, registrarRecords, multicodec) => { // connect all nodes for (let i = 0; i < nodes.length; i++) { for (let j = i + 1; j < nodes.length; j++) { const onConnectI = registrarRecords[i][multicodec].onConnect const onConnectJ = registrarRecords[j][multicodec].onConnect + const handleI = registrarRecords[i][multicodec].handler + const handleJ = registrarRecords[j][multicodec].handler // Notice peers of connection const [d0, d1] = ConnectionPair() - onConnectI(nodes[j].peerInfo, d0) - onConnectJ(nodes[i].peerInfo, d1) + await onConnectI(nodes[j].peerInfo, d0) + await handleJ({ + protocol: multicodec, + stream: d1.stream, + connection: { + remotePeer: nodes[i].peerInfo.id + } + }) + await onConnectJ(nodes[i].peerInfo, d1) + await handleI({ + protocol: multicodec, + stream: d0.stream, + connection: { + remotePeer: nodes[j].peerInfo.id + } + }) } }