Skip to content

Commit

Permalink
feat: integrate gossipsub by default (#365)
Browse files Browse the repository at this point in the history
BREAKING CHANGE: new configuration for deciding the implementation of pubsub to be used.
In this context, the experimental flags were also removed. See the README for the latest usage.
  • Loading branch information
vasco-santos authored and jacobheun committed Jul 31, 2019
1 parent 65d5285 commit 791f39a
Show file tree
Hide file tree
Showing 10 changed files with 208 additions and 83 deletions.
9 changes: 5 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@ const MPLEX = require('libp2p-mplex')
const SECIO = require('libp2p-secio')
const MulticastDNS = require('libp2p-mdns')
const DHT = require('libp2p-kad-dht')
const GossipSub = require('libp2p-gossipsub')
const defaultsDeep = require('@nodeutils/defaults-deep')
const Protector = require('libp2p-pnet')
const DelegatedPeerRouter = require('libp2p-delegated-peer-routing')
Expand Down Expand Up @@ -154,7 +155,8 @@ class Node extends Libp2p {
peerDiscovery: [
MulticastDNS
],
dht: DHT // DHT enables PeerRouting, ContentRouting and DHT itself components
dht: DHT, // DHT enables PeerRouting, ContentRouting and DHT itself components
pubsub: GossipSub
},

// libp2p config options (typically found on a config.json)
Expand Down Expand Up @@ -187,9 +189,8 @@ class Node extends Libp2p {
timeout: 10e3
}
},
// Enable/Disable Experimental features
EXPERIMENTAL: { // Experimental features ("behind a flag")
pubsub: false
pubsub: {
enabled: true
}
}
}
Expand Down
4 changes: 3 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@
"err-code": "^1.1.2",
"fsm-event": "^2.1.0",
"libp2p-connection-manager": "^0.1.0",
"libp2p-floodsub": "^0.16.1",
"libp2p-ping": "^0.8.5",
"libp2p-switch": "^0.42.12",
"libp2p-websockets": "^0.12.2",
Expand All @@ -74,6 +73,8 @@
"libp2p-circuit": "^0.3.7",
"libp2p-delegated-content-routing": "^0.2.2",
"libp2p-delegated-peer-routing": "^0.2.2",
"libp2p-floodsub": "~0.17.0",
"libp2p-gossipsub": "~0.0.4",
"libp2p-kad-dht": "^0.15.3",
"libp2p-mdns": "^0.12.3",
"libp2p-mplex": "^0.8.4",
Expand All @@ -84,6 +85,7 @@
"libp2p-websocket-star": "~0.10.2",
"libp2p-websocket-star-rendezvous": "~0.3.0",
"lodash.times": "^4.3.2",
"merge-options": "^1.0.1",
"nock": "^10.0.6",
"pull-goodbye": "0.0.2",
"pull-mplex": "^0.1.2",
Expand Down
11 changes: 5 additions & 6 deletions src/config.js
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ const modulesSchema = s({
connProtector: s.union(['undefined', s.interface({ protect: 'function' })]),
contentRouting: optional(list(['object'])),
dht: optional(s('null|function|object')),
pubsub: optional(s('null|function|object')),
peerDiscovery: optional(list([s('object|function')])),
peerRouting: optional(list(['object'])),
streamMuxer: optional(list([s('object|function')])),
Expand Down Expand Up @@ -59,12 +60,10 @@ const configSchema = s({
timeout: 10e3
}
}),
// Experimental config
EXPERIMENTAL: s({
pubsub: 'boolean'
}, {
// Experimental defaults
pubsub: false
// Pubsub config
pubsub: s('object?', {
// DHT defaults
enabled: false
})
}, {})

Expand Down
14 changes: 7 additions & 7 deletions src/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -122,9 +122,9 @@ class Libp2p extends EventEmitter {
})
}

// enable/disable pubsub
if (this._config.EXPERIMENTAL.pubsub) {
this.pubsub = pubsub(this)
// start pubsub
if (this._modules.pubsub && this._config.pubsub.enabled !== false) {
this.pubsub = pubsub(this, this._modules.pubsub)
}

// Attach remaining APIs
Expand Down Expand Up @@ -403,8 +403,8 @@ class Libp2p extends EventEmitter {
}
},
(cb) => {
if (this._floodSub) {
return this._floodSub.start(cb)
if (this.pubsub) {
return this.pubsub.start(cb)
}
cb()
},
Expand Down Expand Up @@ -442,8 +442,8 @@ class Libp2p extends EventEmitter {
)
},
(cb) => {
if (this._floodSub) {
return this._floodSub.stop(cb)
if (this.pubsub) {
return this.pubsub.stop(cb)
}
cb()
},
Expand Down
51 changes: 27 additions & 24 deletions src/pubsub.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,12 @@

const nextTick = require('async/nextTick')
const { messages, codes } = require('./errors')
const FloodSub = require('libp2p-floodsub')
const promisify = require('promisify-es6')

const errCode = require('err-code')

module.exports = (node) => {
const floodSub = new FloodSub(node)

node._floodSub = floodSub
module.exports = (node, Pubsub) => {
const pubsub = new Pubsub(node, { emitSelf: true })

return {
/**
Expand Down Expand Up @@ -41,16 +38,16 @@ module.exports = (node) => {
options = {}
}

if (!node.isStarted() && !floodSub.started) {
if (!node.isStarted() && !pubsub.started) {
return nextTick(callback, errCode(new Error(messages.NOT_STARTED_YET), codes.PUBSUB_NOT_STARTED))
}

function subscribe (cb) {
if (floodSub.listenerCount(topic) === 0) {
floodSub.subscribe(topic)
if (pubsub.listenerCount(topic) === 0) {
pubsub.subscribe(topic)
}

floodSub.on(topic, handler)
pubsub.on(topic, handler)
nextTick(cb)
}

Expand Down Expand Up @@ -80,18 +77,18 @@ module.exports = (node) => {
* libp2p.unsubscribe(topic, handler, callback)
*/
unsubscribe: promisify((topic, handler, callback) => {
if (!node.isStarted() && !floodSub.started) {
if (!node.isStarted() && !pubsub.started) {
return nextTick(callback, errCode(new Error(messages.NOT_STARTED_YET), codes.PUBSUB_NOT_STARTED))
}

if (!handler) {
floodSub.removeAllListeners(topic)
pubsub.removeAllListeners(topic)
} else {
floodSub.removeListener(topic, handler)
pubsub.removeListener(topic, handler)
}

if (floodSub.listenerCount(topic) === 0) {
floodSub.unsubscribe(topic)
if (pubsub.listenerCount(topic) === 0) {
pubsub.unsubscribe(topic)
}

if (typeof callback === 'function') {
Expand All @@ -102,29 +99,31 @@ module.exports = (node) => {
}),

publish: promisify((topic, data, callback) => {
if (!node.isStarted() && !floodSub.started) {
if (!node.isStarted() && !pubsub.started) {
return nextTick(callback, errCode(new Error(messages.NOT_STARTED_YET), codes.PUBSUB_NOT_STARTED))
}

if (!Buffer.isBuffer(data)) {
return nextTick(callback, errCode(new Error('data must be a Buffer'), 'ERR_DATA_IS_NOT_A_BUFFER'))
try {
data = Buffer.from(data)
} catch (err) {
return nextTick(callback, errCode(new Error('data must be convertible to a Buffer'), 'ERR_DATA_IS_NOT_VALID'))
}

floodSub.publish(topic, data, callback)
pubsub.publish(topic, data, callback)
}),

ls: promisify((callback) => {
if (!node.isStarted() && !floodSub.started) {
if (!node.isStarted() && !pubsub.started) {
return nextTick(callback, errCode(new Error(messages.NOT_STARTED_YET), codes.PUBSUB_NOT_STARTED))
}

const subscriptions = Array.from(floodSub.subscriptions)
const subscriptions = Array.from(pubsub.subscriptions)

nextTick(() => callback(null, subscriptions))
}),

peers: promisify((topic, callback) => {
if (!node.isStarted() && !floodSub.started) {
if (!node.isStarted() && !pubsub.started) {
return nextTick(callback, errCode(new Error(messages.NOT_STARTED_YET), codes.PUBSUB_NOT_STARTED))
}

Expand All @@ -133,15 +132,19 @@ module.exports = (node) => {
topic = null
}

const peers = Array.from(floodSub.peers.values())
const peers = Array.from(pubsub.peers.values())
.filter((peer) => topic ? peer.topics.has(topic) : true)
.map((peer) => peer.info.id.toB58String())

nextTick(() => callback(null, peers))
}),

setMaxListeners (n) {
return floodSub.setMaxListeners(n)
}
return pubsub.setMaxListeners(n)
},

start: (cb) => pubsub.start(cb),

stop: (cb) => pubsub.stop(cb)
}
}
12 changes: 6 additions & 6 deletions test/config.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -82,8 +82,8 @@ describe('configuration', () => {
peerDiscovery: {
autoDial: true
},
EXPERIMENTAL: {
pubsub: false
pubsub: {
enabled: false
},
dht: {
kBucketSize: 20,
Expand Down Expand Up @@ -144,8 +144,8 @@ describe('configuration', () => {
enabled: true
}
},
EXPERIMENTAL: {
pubsub: false
pubsub: {
enabled: false
},
dht: {
kBucketSize: 20,
Expand Down Expand Up @@ -269,8 +269,8 @@ describe('configuration', () => {
dht: DHT
},
config: {
EXPERIMENTAL: {
pubsub: false
pubsub: {
enabled: false
},
peerDiscovery: {
autoDial: true
Expand Down
12 changes: 6 additions & 6 deletions test/create.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@ describe('libp2p creation', () => {
it('should be able to start and stop successfully', (done) => {
createNode([], {
config: {
EXPERIMENTAL: {
pubsub: true
pubsub: {
enabled: true
},
dht: {
enabled: true
Expand All @@ -32,7 +32,7 @@ describe('libp2p creation', () => {
const sw = node._switch
const cm = node.connectionManager
const dht = node._dht
const pub = node._floodSub
const pub = node.pubsub

sinon.spy(sw, 'start')
sinon.spy(cm, 'start')
Expand Down Expand Up @@ -77,13 +77,13 @@ describe('libp2p creation', () => {
it('should not create disabled modules', (done) => {
createNode([], {
config: {
EXPERIMENTAL: {
pubsub: false
pubsub: {
enabled: false
}
}
}, (err, node) => {
expect(err).to.not.exist()
expect(node._floodSub).to.not.exist()
expect(node._pubsub).to.not.exist()
done()
})
})
Expand Down
Loading

0 comments on commit 791f39a

Please sign in to comment.