From 8de89e2082fd5c89a56a28a73da42ae22f03f4d3 Mon Sep 17 00:00:00 2001 From: Gavin McDermott Date: Fri, 9 Dec 2016 21:29:09 -0800 Subject: [PATCH 1/3] fix: pass subscriptions immediately on new peer connection --- src/dial-floodsub.js | 14 +++++++++++++- 1 file changed, 13 insertions(+), 1 deletion(-) diff --git a/src/dial-floodsub.js b/src/dial-floodsub.js index b98b475..d426599 100644 --- a/src/dial-floodsub.js +++ b/src/dial-floodsub.js @@ -1,6 +1,7 @@ 'use strict' const config = require('./config') +const pb = require('./message') const log = config.log const multicodec = config.multicodec const stream = require('stream') @@ -50,8 +51,19 @@ module.exports = (libp2pNode, peerSet, subscriptions) => { conn ) + // Immediately send my own subscriptions to the newly established conn if (subscriptions.length > 0) { - // TODO send my subscriptions through the new conn + const subopts = subscriptions.map((topic) => { + return { + subscribe: true, + topicCID: topic + } + }) + const rpc = pb.rpc.RPC.encode({ + subscriptions: subopts + }) + + peerSet[idB58Str].stream.write(rpc) } } } From b09cec82598f41542ba06d110ed0276c53fd9df0 Mon Sep 17 00:00:00 2001 From: Gavin McDermott Date: Sat, 10 Dec 2016 14:09:17 -0800 Subject: [PATCH 2/3] test: add test to ensure subs are passed on connection --- package.json | 5 +- test/2-nodes.js | 297 +++++++++++++++++++++++++++++------------------- 2 files changed, 186 insertions(+), 116 deletions(-) diff --git a/package.json b/package.json index 088b292..52e363b 100644 --- a/package.json +++ b/package.json @@ -51,7 +51,8 @@ "peer-info": "^0.7.1", "pre-commit": "^1.1.3", "run-parallel": "^1.1.6", - "run-series": "^1.1.4" + "run-series": "^1.1.4", + "run-waterfall": "1.1.3" }, "dependencies": { "debug": "^2.2.0", @@ -65,4 +66,4 @@ "contributors": [ "David Dias " ] -} \ No newline at end of file +} diff --git a/test/2-nodes.js b/test/2-nodes.js index 872a687..903fee4 100644 --- a/test/2-nodes.js +++ b/test/2-nodes.js @@ -7,6 +7,7 @@ const PeerInfo = require('peer-info') const multiaddr = require('multiaddr') const libp2pIPFS = require('libp2p-ipfs') const parallel = require('run-parallel') +const waterfall = require('run-waterfall') const _times = require('lodash.times') const _values = require('lodash.values') @@ -18,152 +19,220 @@ describe('basics', () => { let psA let psB - before((done) => { - const idA = PeerId.create() - const peerA = new PeerInfo(idA) - peerA.multiaddr.add(multiaddr('/ip4/127.0.0.1/tcp/0')) - nodeA = new libp2pIPFS.Node(peerA) - - const idB = PeerId.create() - const peerB = new PeerInfo(idB) - peerB.multiaddr.add(multiaddr('/ip4/127.0.0.1/tcp/0')) - nodeB = new libp2pIPFS.Node(peerB) - - parallel([ - nodeA.start, - nodeB.start - ], done) - }) + describe('no existing pubsub config', () => { + before((done) => { + const idA = PeerId.create() + const peerA = new PeerInfo(idA) + peerA.multiaddr.add(multiaddr('/ip4/127.0.0.1/tcp/0')) + nodeA = new libp2pIPFS.Node(peerA) + + const idB = PeerId.create() + const peerB = new PeerInfo(idB) + peerB.multiaddr.add(multiaddr('/ip4/127.0.0.1/tcp/0')) + nodeB = new libp2pIPFS.Node(peerB) + + parallel([ + nodeA.start, + nodeB.start + ], done) + }) - after((done) => { - parallel([ - nodeA.stop, - nodeB.stop - ], done) - }) + after((done) => { + parallel([ + nodeA.stop, + nodeB.stop + ], done) + }) - it('Mount the pubsub protocol', (done) => { - parallel([ - (cb) => { - psA = new PSG(nodeA) - setTimeout(() => { - expect(psA.getPeerSet()).to.eql({}) - expect(psA.getSubscriptions()).to.eql([]) - cb() - }, 50) - }, - (cb) => { - psB = new PSG(nodeB) + it('Mount the pubsub protocol', (done) => { + parallel([ + (cb) => { + psA = new PSG(nodeA) + setTimeout(() => { + expect(psA.getPeerSet()).to.eql({}) + expect(psA.getSubscriptions()).to.eql([]) + cb() + }, 50) + }, + (cb) => { + psB = new PSG(nodeB) + setTimeout(() => { + expect(psB.getPeerSet()).to.eql({}) + expect(psB.getSubscriptions()).to.eql([]) + cb() + }, 50) + } + ], done) + }) + + it('Dial from nodeA to nodeB', (done) => { + nodeA.dialByPeerInfo(nodeB.peerInfo, (err) => { + expect(err).to.not.exist setTimeout(() => { - expect(psB.getPeerSet()).to.eql({}) - expect(psB.getSubscriptions()).to.eql([]) - cb() - }, 50) - } - ], done) - }) + expect(Object.keys(psA.getPeerSet()).length).to.equal(1) + expect(Object.keys(psB.getPeerSet()).length).to.equal(1) + done() + }, 250) + }) + }) - it('Dial from nodeA to nodeB', (done) => { - nodeA.dialByPeerInfo(nodeB.peerInfo, (err) => { - expect(err).to.not.exist + it('Subscribe to a topic:Z in nodeA', (done) => { + psA.subscribe('Z') setTimeout(() => { - expect(Object.keys(psA.getPeerSet()).length).to.equal(1) - expect(Object.keys(psB.getPeerSet()).length).to.equal(1) + expect(psA.getSubscriptions()).to.eql(['Z']) + const peersB = _values(psB.getPeerSet()) + expect(peersB.length).to.equal(1) + expect(peersB[0].topics).to.eql(['Z']) done() - }, 250) + }, 100) }) - }) - it('Subscribe to a topic:Z in nodeA', (done) => { - psA.subscribe('Z') - setTimeout(() => { - expect(psA.getSubscriptions()).to.eql(['Z']) - const peersB = _values(psB.getPeerSet()) - expect(peersB.length).to.equal(1) - expect(peersB[0].topics).to.eql(['Z']) - done() - }, 100) - }) + it('Publish to a topic:Z in nodeA', (done) => { + psB.once('Z', shouldNotHappen) + + function shouldNotHappen (msg) { expect.fail() } - it('Publish to a topic:Z in nodeA', (done) => { - psB.once('Z', shouldNotHappen) + psA.once('Z', (msg) => { + expect(msg.toString()).to.equal('hey') + psB.removeListener('Z', shouldNotHappen) + done() + }) - function shouldNotHappen (msg) { expect.fail() } + psB.once('Z', shouldNotHappen) - psA.once('Z', (msg) => { - expect(msg.toString()).to.equal('hey') - psB.removeListener('Z', shouldNotHappen) - done() + psA.publish('Z', new Buffer('hey')) }) - psB.once('Z', shouldNotHappen) + it('Publish to a topic:Z in nodeB', (done) => { + psB.once('Z', shouldNotHappen) - psA.publish('Z', new Buffer('hey')) - }) + psA.once('Z', (msg) => { + psA.once('Z', shouldNotHappen) + expect(msg.toString()).to.equal('banana') + setTimeout(() => { + psA.removeListener('Z', shouldNotHappen) + psB.removeListener('Z', shouldNotHappen) + done() + }, 100) + }) - it('Publish to a topic:Z in nodeB', (done) => { - psB.once('Z', shouldNotHappen) + psB.once('Z', shouldNotHappen) - psA.once('Z', (msg) => { - psA.once('Z', shouldNotHappen) - expect(msg.toString()).to.equal('banana') - setTimeout(() => { - psA.removeListener('Z', shouldNotHappen) - psB.removeListener('Z', shouldNotHappen) - done() - }, 100) + psB.publish('Z', new Buffer('banana')) }) - psB.once('Z', shouldNotHappen) + it('Publish 10 msg to a topic:Z in nodeB', (done) => { + let counter = 0 - psB.publish('Z', new Buffer('banana')) - }) + psB.once('Z', shouldNotHappen) - it('Publish 10 msg to a topic:Z in nodeB', (done) => { - let counter = 0 + psA.on('Z', receivedMsg) - psB.once('Z', shouldNotHappen) + function receivedMsg (msg) { + expect(msg.toString()).to.equal('banana') - psA.on('Z', receivedMsg) + if (++counter === 10) { + psA.removeListener('Z', receivedMsg) + done() + } + } - function receivedMsg (msg) { - expect(msg.toString()).to.equal('banana') + _times(10, () => { + psB.publish('Z', new Buffer('banana')) + }) + }) - if (++counter === 10) { - psA.removeListener('Z', receivedMsg) - done() - } - } + it('Unsubscribe from topic:Z in nodeA', (done) => { + psA.unsubscribe('Z') + expect(psA.getSubscriptions()).to.eql([]) - _times(10, () => { - psB.publish('Z', new Buffer('banana')) + setTimeout(() => { + const peersB = _values(psB.getPeerSet()) + expect(peersB.length).to.equal(1) + expect(peersB[0].topics).to.eql([]) + done() + }, 100) }) - }) - it('Unsubscribe from topic:Z in nodeA', (done) => { - psA.unsubscribe('Z') - expect(psA.getSubscriptions()).to.eql([]) + it('Publish to a topic:Z in nodeA nodeB', (done) => { + psA.once('Z', shouldNotHappen) + psB.once('Z', shouldNotHappen) + + setTimeout(() => { + psA.removeListener('Z', shouldNotHappen) + psB.removeListener('Z', shouldNotHappen) + done() + }, 100) - setTimeout(() => { - const peersB = _values(psB.getPeerSet()) - expect(peersB.length).to.equal(1) - expect(peersB[0].topics).to.eql([]) - done() - }, 100) + psB.publish('Z', new Buffer('banana')) + psA.publish('Z', new Buffer('banana')) + }) }) - it('Publish to a topic:Z in nodeA nodeB', (done) => { - psA.once('Z', shouldNotHappen) - psB.once('Z', shouldNotHappen) + describe('existing pubsub config', () => { + before((done) => { + const idA = PeerId.create() + const peerA = new PeerInfo(idA) + peerA.multiaddr.add(multiaddr('/ip4/127.0.0.1/tcp/0')) + nodeA = new libp2pIPFS.Node(peerA) + + const idB = PeerId.create() + const peerB = new PeerInfo(idB) + peerB.multiaddr.add(multiaddr('/ip4/127.0.0.1/tcp/0')) + nodeB = new libp2pIPFS.Node(peerB) + + waterfall([ + nodeA.start, + nodeB.start, + (cb) => { + psA = new PSG(nodeA) + psA.subscribe('Za') + setTimeout(() => { + expect(psA.getPeerSet()).to.eql({}) + expect(psA.getSubscriptions()).to.eql(['Za']) + cb() + }, 50) + }, + (cb) => { + psB = new PSG(nodeB) + psB.subscribe('Zb') + setTimeout(() => { + expect(psB.getPeerSet()).to.eql({}) + expect(psB.getSubscriptions()).to.eql(['Zb']) + cb() + }, 50) + } + ], done) + }) - setTimeout(() => { - psA.removeListener('Z', shouldNotHappen) - psB.removeListener('Z', shouldNotHappen) - done() - }, 100) + after((done) => { + parallel([ + nodeA.stop, + nodeB.stop + ], done) + }) - psB.publish('Z', new Buffer('banana')) - psA.publish('Z', new Buffer('banana')) + it('Existing subscriptions are sent upon peer connection', (done) => { + nodeA.dialByPeerInfo(nodeB.peerInfo, (err) => { + expect(err).to.not.exist + setTimeout(() => { + expect(Object.keys(psA.getPeerSet()).length).to.equal(1) + expect(Object.keys(psB.getPeerSet()).length).to.equal(1) + + expect(psA.getSubscriptions()).to.eql(['Za']) + const peersB = _values(psB.getPeerSet()) + expect(peersB.length).to.equal(1) + expect(peersB[0].topics).to.eql(['Za']) + + expect(psB.getSubscriptions()).to.eql(['Zb']) + const peersA = _values(psA.getPeerSet()) + expect(peersA.length).to.equal(1) + expect(peersA[0].topics).to.eql(['Zb']) + + done() + }, 250) + }) + }) }) }) From eb9d72b0cfef57f09aee17a6bc9d5f87b7e56b2a Mon Sep 17 00:00:00 2001 From: Gavin McDermott Date: Sat, 10 Dec 2016 14:29:00 -0800 Subject: [PATCH 3/3] test: remove .only --- test/2-nodes.js | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/test/2-nodes.js b/test/2-nodes.js index 0502a5b..9480b99 100644 --- a/test/2-nodes.js +++ b/test/2-nodes.js @@ -1,4 +1,5 @@ /* eslint-env mocha */ +/* eslint max-nested-callbacks: ["error", 5] */ 'use strict' const expect = require('chai').expect @@ -13,7 +14,7 @@ const _values = require('lodash.values') const PSG = require('../src') -describe.only('basics', () => { +describe('basics', () => { let nodeA let nodeB let psA