diff --git a/README.md b/README.md index 2b155ed7d7..850697af7f 100644 --- a/README.md +++ b/README.md @@ -119,6 +119,7 @@ const MPLEX = require('libp2p-mplex') const SECIO = require('libp2p-secio') const MulticastDNS = require('libp2p-mdns') const DHT = require('libp2p-kad-dht') +const GossipSub = require('gossipsub') const defaultsDeep = require('@nodeutils/defaults-deep') const Protector = require('libp2p-pnet') const DelegatedPeerRouter = require('libp2p-delegated-peer-routing') @@ -154,7 +155,8 @@ class Node extends Libp2p { peerDiscovery: [ MulticastDNS ], - dht: DHT // DHT enables PeerRouting, ContentRouting and DHT itself components + dht: DHT, // DHT enables PeerRouting, ContentRouting and DHT itself components + pubsub: GossipSub }, // libp2p config options (typically found on a config.json) @@ -187,9 +189,8 @@ class Node extends Libp2p { timeout: 10e3 } }, - // Enable/Disable Experimental features - EXPERIMENTAL: { // Experimental features ("behind a flag") - pubsub: false + pubsub: { + enabled: true } } } diff --git a/package.json b/package.json index f21f4923a8..d82d74bf71 100644 --- a/package.json +++ b/package.json @@ -48,7 +48,6 @@ "err-code": "^1.1.2", "fsm-event": "^2.1.0", "libp2p-connection-manager": "^0.1.0", - "libp2p-floodsub": "^0.16.1", "libp2p-ping": "^0.8.5", "libp2p-switch": "^0.42.12", "libp2p-websockets": "^0.12.2", @@ -73,6 +72,8 @@ "libp2p-circuit": "^0.3.7", "libp2p-delegated-content-routing": "^0.2.2", "libp2p-delegated-peer-routing": "^0.2.2", + "libp2p-floodsub": "~0.17.0", + "libp2p-gossipsub": "vasco-santos/gossipsub-js#feat/fallback-to-floodsub", "libp2p-kad-dht": "^0.15.2", "libp2p-mdns": "^0.12.3", "libp2p-mplex": "^0.8.4", @@ -83,6 +84,7 @@ "libp2p-websocket-star": "~0.10.2", "libp2p-websocket-star-rendezvous": "~0.3.0", "lodash.times": "^4.3.2", + "merge-options": "^1.0.1", "nock": "^10.0.6", "pull-goodbye": "0.0.2", "pull-mplex": "^0.1.2", diff --git a/src/config.js b/src/config.js index 6b718038c9..f3fef2e866 100644 --- a/src/config.js +++ b/src/config.js @@ -19,6 +19,7 @@ const modulesSchema = s({ connProtector: s.union(['undefined', s.interface({ protect: 'function' })]), contentRouting: optional(list(['object'])), dht: optional(s('null|function|object')), + pubsub: optional(s('null|function|object')), peerDiscovery: optional(list([s('object|function')])), peerRouting: optional(list(['object'])), streamMuxer: optional(list([s('object|function')])), @@ -59,12 +60,10 @@ const configSchema = s({ timeout: 10e3 } }), - // Experimental config - EXPERIMENTAL: s({ - pubsub: 'boolean' - }, { - // Experimental defaults - pubsub: false + // Pubsub config + pubsub: s('object?', { + // DHT defaults + enabled: false }) }, {}) diff --git a/src/index.js b/src/index.js index 826ed51ccd..7fa6e36eb0 100644 --- a/src/index.js +++ b/src/index.js @@ -121,8 +121,11 @@ class Libp2p extends EventEmitter { }) } - // enable/disable pubsub - if (this._config.EXPERIMENTAL.pubsub) { + // start pubsub + if (this._config.pubsub.enabled) { + const Pubsub = this._modules.pubsub + + this._pubsub = new Pubsub(this) this.pubsub = pubsub(this) } @@ -395,8 +398,8 @@ class Libp2p extends EventEmitter { } }, (cb) => { - if (this._floodSub) { - return this._floodSub.start(cb) + if (this._pubsub) { + return this._pubsub.start(cb) } cb() }, @@ -434,8 +437,8 @@ class Libp2p extends EventEmitter { ) }, (cb) => { - if (this._floodSub) { - return this._floodSub.stop(cb) + if (this._pubsub) { + return this._pubsub.stop(cb) } cb() }, diff --git a/src/pubsub.js b/src/pubsub.js index 0706e5d471..c1a21945de 100644 --- a/src/pubsub.js +++ b/src/pubsub.js @@ -2,14 +2,11 @@ const nextTick = require('async/nextTick') const { messages, codes } = require('./errors') -const FloodSub = require('libp2p-floodsub') const errCode = require('err-code') module.exports = (node) => { - const floodSub = new FloodSub(node) - - node._floodSub = floodSub + const pubsub = node._pubsub return { subscribe: (topic, options, handler, callback) => { @@ -19,16 +16,16 @@ module.exports = (node) => { options = {} } - if (!node.isStarted() && !floodSub.started) { + if (!node.isStarted() && !pubsub.started) { return nextTick(callback, errCode(new Error(messages.NOT_STARTED_YET), codes.PUBSUB_NOT_STARTED)) } function subscribe (cb) { - if (floodSub.listenerCount(topic) === 0) { - floodSub.subscribe(topic) + if (pubsub.listenerCount(topic) === 0) { + pubsub.subscribe(topic) } - floodSub.on(topic, handler) + pubsub.on(topic, handler) nextTick(cb) } @@ -36,17 +33,17 @@ module.exports = (node) => { }, unsubscribe: (topic, handler, callback) => { - if (!node.isStarted() && !floodSub.started) { + if (!node.isStarted() && !pubsub.started) { return nextTick(callback, errCode(new Error(messages.NOT_STARTED_YET), codes.PUBSUB_NOT_STARTED)) } if (!handler && !callback) { - floodSub.removeAllListeners(topic) + pubsub.removeAllListeners(topic) } else { - floodSub.removeListener(topic, handler) + pubsub.removeListener(topic, handler) } - if (floodSub.listenerCount(topic) === 0) { - floodSub.unsubscribe(topic) + if (pubsub.listenerCount(topic) === 0) { + pubsub.unsubscribe(topic) } if (typeof callback === 'function') { @@ -55,7 +52,7 @@ module.exports = (node) => { }, publish: (topic, data, callback) => { - if (!node.isStarted() && !floodSub.started) { + if (!node.isStarted() && !pubsub.started) { return nextTick(callback, errCode(new Error(messages.NOT_STARTED_YET), codes.PUBSUB_NOT_STARTED)) } @@ -63,21 +60,21 @@ module.exports = (node) => { return nextTick(callback, errCode(new Error('data must be a Buffer'), 'ERR_DATA_IS_NOT_A_BUFFER')) } - floodSub.publish(topic, data, callback) + pubsub.publish(topic, data, callback) }, ls: (callback) => { - if (!node.isStarted() && !floodSub.started) { + if (!node.isStarted() && !pubsub.started) { return nextTick(callback, errCode(new Error(messages.NOT_STARTED_YET), codes.PUBSUB_NOT_STARTED)) } - const subscriptions = Array.from(floodSub.subscriptions) + const subscriptions = Array.from(pubsub.subscriptions) nextTick(() => callback(null, subscriptions)) }, peers: (topic, callback) => { - if (!node.isStarted() && !floodSub.started) { + if (!node.isStarted() && !pubsub.started) { return nextTick(callback, errCode(new Error(messages.NOT_STARTED_YET), codes.PUBSUB_NOT_STARTED)) } @@ -86,7 +83,7 @@ module.exports = (node) => { topic = null } - const peers = Array.from(floodSub.peers.values()) + const peers = Array.from(pubsub.peers.values()) .filter((peer) => topic ? peer.topics.has(topic) : true) .map((peer) => peer.info.id.toB58String()) @@ -94,7 +91,7 @@ module.exports = (node) => { }, setMaxListeners (n) { - return floodSub.setMaxListeners(n) + return pubsub.setMaxListeners(n) } } } diff --git a/test/config.spec.js b/test/config.spec.js index c28f08f685..b957d7a9d4 100644 --- a/test/config.spec.js +++ b/test/config.spec.js @@ -82,8 +82,8 @@ describe('configuration', () => { peerDiscovery: { autoDial: true }, - EXPERIMENTAL: { - pubsub: false + pubsub: { + enabled: false }, dht: { kBucketSize: 20, @@ -144,8 +144,8 @@ describe('configuration', () => { enabled: true } }, - EXPERIMENTAL: { - pubsub: false + pubsub: { + enabled: false }, dht: { kBucketSize: 20, @@ -269,8 +269,8 @@ describe('configuration', () => { dht: DHT }, config: { - EXPERIMENTAL: { - pubsub: false + pubsub: { + enabled: false }, peerDiscovery: { autoDial: true diff --git a/test/create.spec.js b/test/create.spec.js index 7b9d24162e..3e931e5357 100644 --- a/test/create.spec.js +++ b/test/create.spec.js @@ -19,8 +19,8 @@ describe('libp2p creation', () => { it('should be able to start and stop successfully', (done) => { createNode([], { config: { - EXPERIMENTAL: { - pubsub: true + pubsub: { + enabled: true }, dht: { enabled: true @@ -32,7 +32,7 @@ describe('libp2p creation', () => { let sw = node._switch let cm = node.connectionManager let dht = node._dht - let pub = node._floodSub + let pub = node._pubsub sinon.spy(sw, 'start') sinon.spy(cm, 'start') @@ -77,13 +77,13 @@ describe('libp2p creation', () => { it('should not create disabled modules', (done) => { createNode([], { config: { - EXPERIMENTAL: { - pubsub: false + pubsub: { + enabled: false } } }, (err, node) => { expect(err).to.not.exist() - expect(node._floodSub).to.not.exist() + expect(node._pubsub).to.not.exist() done() }) }) diff --git a/test/pubsub.node.js b/test/pubsub.node.js index f3795c3590..8c043b85db 100644 --- a/test/pubsub.node.js +++ b/test/pubsub.node.js @@ -11,23 +11,31 @@ const parallel = require('async/parallel') const series = require('async/series') const _times = require('lodash.times') +const Floodsub = require('libp2p-floodsub') +const mergeOptions = require('merge-options') + const { codes } = require('../src/errors') const createNode = require('./utils/create-node') -function startTwo (callback) { +function startTwo (options, callback) { + if (typeof options === 'function') { + callback = options + options = {} + } + const tasks = _times(2, () => (cb) => { - createNode('/ip4/0.0.0.0/tcp/0', { + createNode('/ip4/0.0.0.0/tcp/0', mergeOptions({ config: { peerDiscovery: { mdns: { enabled: false } }, - EXPERIMENTAL: { - pubsub: true + pubsub: { + enabled: true } } - }, (err, node) => { + }, options), (err, node) => { expect(err).to.not.exist() node.start((err) => cb(err, node)) }) @@ -47,13 +55,8 @@ function stopTwo (nodes, callback) { ], callback) } -// There is a vast test suite on PubSub through js-ipfs -// https://github.com/ipfs/interface-ipfs-core/blob/master/js/src/pubsub.js -// and libp2p-floodsub itself -// https://github.com/libp2p/js-libp2p-floodsub/tree/master/test -// TODO: consider if all or some of those should come here describe('.pubsub', () => { - describe('.pubsub on (default)', (done) => { + describe('.pubsub on (default)', () => { it('start two nodes and send one message, then unsubscribe', (done) => { // Check the final series error, and the publish handler expect(2).checks(done) @@ -145,9 +148,133 @@ describe('.pubsub', () => { enabled: false } }, - EXPERIMENTAL: { - pubsub: true + pubsub: { + enabled: true + } + } + }, (err, node) => { + expect(err).to.not.exist() + + node.start((err) => { + expect(err).to.not.exist() + + node.pubsub.publish('pubsub', 'datastr', (err) => { + expect(err).to.exist() + expect(err.code).to.equal('ERR_DATA_IS_NOT_A_BUFFER') + + done() + }) + }) + }) + }) + }) + + describe('.pubsub on using floodsub', () => { + it('start two nodes and send one message, then unsubscribe', (done) => { + // Check the final series error, and the publish handler + expect(2).checks(done) + + let nodes + const data = Buffer.from('test') + const handler = (msg) => { + // verify the data is correct and mark the expect + expect(msg.data).to.eql(data).mark() + } + + series([ + // Start the nodes + (cb) => startTwo({ + modules: { + pubsub: Floodsub + } + }, (err, _nodes) => { + nodes = _nodes + cb(err) + }), + // subscribe on the first + (cb) => nodes[0].pubsub.subscribe('pubsub', handler, cb), + // Wait a moment before publishing + (cb) => setTimeout(cb, 500), + // publish on the second + (cb) => nodes[1].pubsub.publish('pubsub', data, cb), + // ls subscripts + (cb) => nodes[1].pubsub.ls(cb), + // get subscribed peers + (cb) => nodes[1].pubsub.peers('pubsub', cb), + // Wait a moment before unsubscribing + (cb) => setTimeout(cb, 500), + // unsubscribe on the first + (cb) => nodes[0].pubsub.unsubscribe('pubsub', handler, cb), + // Stop both nodes + (cb) => stopTwo(nodes, cb) + ], (err) => { + // Verify there was no error, and mark the expect + expect(err).to.not.exist().mark() + }) + }) + it('start two nodes and send one message, then unsubscribe without handler', (done) => { + // Check the final series error, and the publish handler + expect(3).checks(done) + + let nodes + const data = Buffer.from('test') + const handler = (msg) => { + // verify the data is correct and mark the expect + expect(msg.data).to.eql(data).mark() + } + + series([ + // Start the nodes + (cb) => startTwo({ + modules: { + pubsub: Floodsub + } + }, (err, _nodes) => { + nodes = _nodes + cb(err) + }), + // subscribe on the first + (cb) => nodes[0].pubsub.subscribe('pubsub', handler, cb), + // Wait a moment before publishing + (cb) => setTimeout(cb, 500), + // publish on the second + (cb) => nodes[1].pubsub.publish('pubsub', data, cb), + // Wait a moment before unsubscribing + (cb) => setTimeout(cb, 500), + // unsubscribe on the first + (cb) => { + nodes[0].pubsub.unsubscribe('pubsub') + // Wait a moment to make sure the ubsubscribe-from-all worked + setTimeout(cb, 500) + }, + // Verify unsubscribed + (cb) => { + nodes[0].pubsub.ls((err, topics) => { + expect(topics.length).to.eql(0).mark() + cb(err) + }) + }, + // Stop both nodes + (cb) => stopTwo(nodes, cb) + ], (err) => { + // Verify there was no error, and mark the expect + expect(err).to.not.exist().mark() + }) + }) + it('publish should fail if data is not a buffer', (done) => { + createNode('/ip4/0.0.0.0/tcp/0', { + config: { + peerDiscovery: { + mdns: { + enabled: false + } + }, + pubsub: { + enabled: true } + }, + modules: { + pubsub: Floodsub } }, (err, node) => { expect(err).to.not.exist() @@ -174,9 +301,6 @@ describe('.pubsub', () => { mdns: { enabled: false } - }, - EXPERIMENTAL: { - pubsub: false } } }, (err, node) => { @@ -198,8 +322,8 @@ describe('.pubsub', () => { enabled: false } }, - EXPERIMENTAL: { - pubsub: true + pubsub: { + enabled: true } } }, (err, node) => { diff --git a/test/utils/bundle-browser.js b/test/utils/bundle-browser.js index 5e9b3fecd2..a5140da92a 100644 --- a/test/utils/bundle-browser.js +++ b/test/utils/bundle-browser.js @@ -8,6 +8,7 @@ const SPDY = require('libp2p-spdy') const MPLEX = require('libp2p-mplex') const PULLMPLEX = require('pull-mplex') const KadDHT = require('libp2p-kad-dht') +const GossipSub = require('libp2p-gossipsub') const SECIO = require('libp2p-secio') const defaultsDeep = require('@nodeutils/defaults-deep') const libp2p = require('../..') @@ -57,7 +58,8 @@ class Node extends libp2p { wsStar.discovery, Bootstrap ], - dht: KadDHT + dht: KadDHT, + pubsub: GossipSub }, config: { peerDiscovery: { @@ -88,8 +90,8 @@ class Node extends libp2p { }, enabled: false }, - EXPERIMENTAL: { - pubsub: false + pubsub: { + enabled: false } } } diff --git a/test/utils/bundle-nodejs.js b/test/utils/bundle-nodejs.js index d4900dd50a..842224cac8 100644 --- a/test/utils/bundle-nodejs.js +++ b/test/utils/bundle-nodejs.js @@ -6,6 +6,7 @@ const WS = require('libp2p-websockets') const Bootstrap = require('libp2p-bootstrap') const SPDY = require('libp2p-spdy') const KadDHT = require('libp2p-kad-dht') +const GossipSub = require('libp2p-gossipsub') const MPLEX = require('libp2p-mplex') const PULLMPLEX = require('pull-mplex') const SECIO = require('libp2p-secio') @@ -52,7 +53,8 @@ class Node extends libp2p { MulticastDNS, Bootstrap ], - dht: KadDHT + dht: KadDHT, + pubsub: GossipSub }, config: { peerDiscovery: { @@ -81,8 +83,8 @@ class Node extends libp2p { }, enabled: true }, - EXPERIMENTAL: { - pubsub: false + pubsub: { + enabled: false } } }