diff --git a/package.json b/package.json index 721add1..bf6d964 100644 --- a/package.json +++ b/package.json @@ -65,4 +65,4 @@ "Gavin McDermott ", "Haad " ] -} \ No newline at end of file +} diff --git a/src/dial-floodsub.js b/src/dial-floodsub.js index b98b475..d426599 100644 --- a/src/dial-floodsub.js +++ b/src/dial-floodsub.js @@ -1,6 +1,7 @@ 'use strict' const config = require('./config') +const pb = require('./message') const log = config.log const multicodec = config.multicodec const stream = require('stream') @@ -50,8 +51,19 @@ module.exports = (libp2pNode, peerSet, subscriptions) => { conn ) + // Immediately send my own subscriptions to the newly established conn if (subscriptions.length > 0) { - // TODO send my subscriptions through the new conn + const subopts = subscriptions.map((topic) => { + return { + subscribe: true, + topicCID: topic + } + }) + const rpc = pb.rpc.RPC.encode({ + subscriptions: subopts + }) + + peerSet[idB58Str].stream.write(rpc) } } } diff --git a/test/2-nodes.js b/test/2-nodes.js index 06d766f..9480b99 100644 --- a/test/2-nodes.js +++ b/test/2-nodes.js @@ -1,4 +1,5 @@ /* eslint-env mocha */ +/* eslint max-nested-callbacks: ["error", 5] */ 'use strict' const expect = require('chai').expect @@ -8,7 +9,6 @@ const multiaddr = require('multiaddr') const Node = require('libp2p-ipfs-nodejs') const parallel = require('async/parallel') const series = require('async/series') - const _times = require('lodash.times') const _values = require('lodash.values') @@ -20,176 +20,268 @@ describe('basics', () => { let psA let psB - before((done) => { - series([ - (cb) => { - PeerId.create((err, idA) => { - expect(err).to.not.exist - PeerInfo.create(idA, (err, peerA) => { + describe('no existing pubsub config', () => { + before((done) => { + series([ + (cb) => { + PeerId.create((err, idA) => { expect(err).to.not.exist - peerA.multiaddr.add(multiaddr('/ip4/127.0.0.1/tcp/0')) - nodeA = new Node(peerA) - cb() + PeerInfo.create(idA, (err, peerA) => { + expect(err).to.not.exist + peerA.multiaddr.add(multiaddr('/ip4/127.0.0.1/tcp/0')) + nodeA = new Node(peerA) + cb() + }) }) - }) - }, - (cb) => { - PeerId.create((err, idB) => { - expect(err).to.not.exist - PeerInfo.create(idB, (err, peerB) => { + }, + (cb) => { + PeerId.create((err, idB) => { expect(err).to.not.exist - peerB.multiaddr.add(multiaddr('/ip4/127.0.0.1/tcp/0')) - nodeB = new Node(peerB) - cb() + PeerInfo.create(idB, (err, peerB) => { + expect(err).to.not.exist + peerB.multiaddr.add(multiaddr('/ip4/127.0.0.1/tcp/0')) + nodeB = new Node(peerB) + cb() + }) }) - }) - }, - (cb) => { - parallel([ - (cb) => { - nodeA.start(cb) - }, - (cb) => { - nodeB.start(cb) - } - ], cb) - } - ], done) - }) + }, + (cb) => { + parallel([ + (cb) => { + nodeA.start(cb) + }, + (cb) => { + nodeB.start(cb) + } + ], cb) + } + ], done) + }) - after((done) => { - parallel([ - (cb) => { - nodeA.stop(cb) - }, - (cb) => { - nodeB.stop(cb) - } - ], done) - }) + after((done) => { + parallel([ + (cb) => { + nodeA.stop(cb) + }, + (cb) => { + nodeB.stop(cb) + } + ], done) + }) - it('Mount the pubsub protocol', (done) => { - parallel([ - (cb) => { - psA = new PSG(nodeA) - setTimeout(() => { - expect(psA.getPeerSet()).to.eql({}) - expect(psA.getSubscriptions()).to.eql([]) - cb() - }, 50) - }, - (cb) => { - psB = new PSG(nodeB) + it('Mount the pubsub protocol', (done) => { + parallel([ + (cb) => { + psA = new PSG(nodeA) + setTimeout(() => { + expect(psA.getPeerSet()).to.eql({}) + expect(psA.getSubscriptions()).to.eql([]) + cb() + }, 50) + }, + (cb) => { + psB = new PSG(nodeB) + setTimeout(() => { + expect(psB.getPeerSet()).to.eql({}) + expect(psB.getSubscriptions()).to.eql([]) + cb() + }, 50) + } + ], done) + }) + + it('Dial from nodeA to nodeB', (done) => { + nodeA.dialByPeerInfo(nodeB.peerInfo, (err) => { + expect(err).to.not.exist setTimeout(() => { - expect(psB.getPeerSet()).to.eql({}) - expect(psB.getSubscriptions()).to.eql([]) - cb() - }, 50) - } - ], done) - }) + expect(Object.keys(psA.getPeerSet()).length).to.equal(1) + expect(Object.keys(psB.getPeerSet()).length).to.equal(1) + done() + }, 250) + }) + }) - it('Dial from nodeA to nodeB', (done) => { - nodeA.dialByPeerInfo(nodeB.peerInfo, (err) => { - expect(err).to.not.exist + it('Subscribe to a topic:Z in nodeA', (done) => { + psA.subscribe('Z') setTimeout(() => { - expect(Object.keys(psA.getPeerSet()).length).to.equal(1) - expect(Object.keys(psB.getPeerSet()).length).to.equal(1) + expect(psA.getSubscriptions()).to.eql(['Z']) + const peersB = _values(psB.getPeerSet()) + expect(peersB.length).to.equal(1) + expect(peersB[0].topics).to.eql(['Z']) done() - }, 250) + }, 100) }) - }) - it('Subscribe to a topic:Z in nodeA', (done) => { - psA.subscribe('Z') - setTimeout(() => { - expect(psA.getSubscriptions()).to.eql(['Z']) - const peersB = _values(psB.getPeerSet()) - expect(peersB.length).to.equal(1) - expect(peersB[0].topics).to.eql(['Z']) - done() - }, 100) - }) + it('Publish to a topic:Z in nodeA', (done) => { + psB.once('Z', shouldNotHappen) + + function shouldNotHappen (msg) { expect.fail() } - it('Publish to a topic:Z in nodeA', (done) => { - psB.once('Z', shouldNotHappen) + psA.once('Z', (msg) => { + expect(msg.toString()).to.equal('hey') + psB.removeListener('Z', shouldNotHappen) + done() + }) - function shouldNotHappen (msg) { expect.fail() } + psB.once('Z', shouldNotHappen) - psA.once('Z', (msg) => { - expect(msg.toString()).to.equal('hey') - psB.removeListener('Z', shouldNotHappen) - done() + psA.publish('Z', new Buffer('hey')) }) - psB.once('Z', shouldNotHappen) + it('Publish to a topic:Z in nodeB', (done) => { + psB.once('Z', shouldNotHappen) - psA.publish('Z', new Buffer('hey')) - }) + psA.once('Z', (msg) => { + psA.once('Z', shouldNotHappen) + expect(msg.toString()).to.equal('banana') + setTimeout(() => { + psA.removeListener('Z', shouldNotHappen) + psB.removeListener('Z', shouldNotHappen) + done() + }, 100) + }) - it('Publish to a topic:Z in nodeB', (done) => { - psB.once('Z', shouldNotHappen) + psB.once('Z', shouldNotHappen) - psA.once('Z', (msg) => { - psA.once('Z', shouldNotHappen) - expect(msg.toString()).to.equal('banana') - setTimeout(() => { - psA.removeListener('Z', shouldNotHappen) - psB.removeListener('Z', shouldNotHappen) - done() - }, 100) + psB.publish('Z', new Buffer('banana')) }) - psB.once('Z', shouldNotHappen) + it('Publish 10 msg to a topic:Z in nodeB', (done) => { + let counter = 0 - psB.publish('Z', new Buffer('banana')) - }) + psB.once('Z', shouldNotHappen) + + psA.on('Z', receivedMsg) - it('Publish 10 msg to a topic:Z in nodeB', (done) => { - let counter = 0 + function receivedMsg (msg) { + expect(msg.toString()).to.equal('banana') - psB.once('Z', shouldNotHappen) + if (++counter === 10) { + psA.removeListener('Z', receivedMsg) + done() + } + } - psA.on('Z', receivedMsg) + _times(10, () => { + psB.publish('Z', new Buffer('banana')) + }) + }) - function receivedMsg (msg) { - expect(msg.toString()).to.equal('banana') + it('Unsubscribe from topic:Z in nodeA', (done) => { + psA.unsubscribe('Z') + expect(psA.getSubscriptions()).to.eql([]) - if (++counter === 10) { - psA.removeListener('Z', receivedMsg) + setTimeout(() => { + const peersB = _values(psB.getPeerSet()) + expect(peersB.length).to.equal(1) + expect(peersB[0].topics).to.eql([]) done() - } - } + }, 100) + }) + + it('Publish to a topic:Z in nodeA nodeB', (done) => { + psA.once('Z', shouldNotHappen) + psB.once('Z', shouldNotHappen) + + setTimeout(() => { + psA.removeListener('Z', shouldNotHappen) + psB.removeListener('Z', shouldNotHappen) + done() + }, 100) - _times(10, () => { psB.publish('Z', new Buffer('banana')) + psA.publish('Z', new Buffer('banana')) }) }) - it('Unsubscribe from topic:Z in nodeA', (done) => { - psA.unsubscribe('Z') - expect(psA.getSubscriptions()).to.eql([]) + describe('existing pubsub config', () => { + before((done) => { + series([ + (cb) => { + PeerId.create((err, idA) => { + expect(err).to.not.exist + PeerInfo.create(idA, (err, peerA) => { + expect(err).to.not.exist + peerA.multiaddr.add(multiaddr('/ip4/127.0.0.1/tcp/0')) + nodeA = new Node(peerA) + cb() + }) + }) + }, + (cb) => { + PeerId.create((err, idB) => { + expect(err).to.not.exist + PeerInfo.create(idB, (err, peerB) => { + expect(err).to.not.exist + peerB.multiaddr.add(multiaddr('/ip4/127.0.0.1/tcp/0')) + nodeB = new Node(peerB) + cb() + }) + }) + }, + (cb) => { + parallel([ + (cb) => { + nodeA.start(cb) + }, + (cb) => { + nodeB.start(cb) + } + ], cb) + }, + (cb) => { + psA = new PSG(nodeA) + psA.subscribe('Za') + setTimeout(() => { + expect(psA.getPeerSet()).to.eql({}) + expect(psA.getSubscriptions()).to.eql(['Za']) + cb() + }, 50) + }, + (cb) => { + psB = new PSG(nodeB) + psB.subscribe('Zb') + setTimeout(() => { + expect(psB.getPeerSet()).to.eql({}) + expect(psB.getSubscriptions()).to.eql(['Zb']) + cb() + }, 50) + } + ], done) + }) - setTimeout(() => { - const peersB = _values(psB.getPeerSet()) - expect(peersB.length).to.equal(1) - expect(peersB[0].topics).to.eql([]) - done() - }, 100) - }) + after((done) => { + parallel([ + (cb) => { + nodeA.stop(cb) + }, + (cb) => { + nodeB.stop(cb) + } + ], done) + }) - it('Publish to a topic:Z in nodeA nodeB', (done) => { - psA.once('Z', shouldNotHappen) - psB.once('Z', shouldNotHappen) + it('Existing subscriptions are sent upon peer connection', (done) => { + nodeA.dialByPeerInfo(nodeB.peerInfo, (err) => { + expect(err).to.not.exist + setTimeout(() => { + expect(Object.keys(psA.getPeerSet()).length).to.equal(1) + expect(Object.keys(psB.getPeerSet()).length).to.equal(1) + + expect(psA.getSubscriptions()).to.eql(['Za']) + const peersB = _values(psB.getPeerSet()) + expect(peersB.length).to.equal(1) + expect(peersB[0].topics).to.eql(['Za']) - setTimeout(() => { - psA.removeListener('Z', shouldNotHappen) - psB.removeListener('Z', shouldNotHappen) - done() - }, 100) + expect(psB.getSubscriptions()).to.eql(['Zb']) + const peersA = _values(psA.getPeerSet()) + expect(peersA.length).to.equal(1) + expect(peersA[0].topics).to.eql(['Zb']) - psB.publish('Z', new Buffer('banana')) - psA.publish('Z', new Buffer('banana')) + done() + }, 250) + }) + }) }) })