Skip to content

Commit

Permalink
feat: integrate gossipsub by default
Browse files Browse the repository at this point in the history
BREAKING CHANGE: new configuration for deciding the implementation of pubsub to be used.
In this context, the experimental flags were also removed.
  • Loading branch information
vasco-santos committed Jul 9, 2019
1 parent 10811e9 commit e238d43
Show file tree
Hide file tree
Showing 10 changed files with 203 additions and 73 deletions.
9 changes: 5 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@ const MPLEX = require('libp2p-mplex')
const SECIO = require('libp2p-secio')
const MulticastDNS = require('libp2p-mdns')
const DHT = require('libp2p-kad-dht')
const GossipSub = require('gossipsub')
const defaultsDeep = require('@nodeutils/defaults-deep')
const Protector = require('libp2p-pnet')
const DelegatedPeerRouter = require('libp2p-delegated-peer-routing')
Expand Down Expand Up @@ -154,7 +155,8 @@ class Node extends Libp2p {
peerDiscovery: [
MulticastDNS
],
dht: DHT // DHT enables PeerRouting, ContentRouting and DHT itself components
dht: DHT, // DHT enables PeerRouting, ContentRouting and DHT itself components
pubsub: GossipSub
},

// libp2p config options (typically found on a config.json)
Expand Down Expand Up @@ -187,9 +189,8 @@ class Node extends Libp2p {
timeout: 10e3
}
},
// Enable/Disable Experimental features
EXPERIMENTAL: { // Experimental features ("behind a flag")
pubsub: false
pubsub: {
enabled: true
}
}
}
Expand Down
4 changes: 3 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@
"err-code": "^1.1.2",
"fsm-event": "^2.1.0",
"libp2p-connection-manager": "^0.1.0",
"libp2p-floodsub": "^0.16.1",
"libp2p-ping": "^0.8.5",
"libp2p-switch": "^0.42.12",
"libp2p-websockets": "^0.12.2",
Expand All @@ -73,6 +72,8 @@
"libp2p-circuit": "^0.3.7",
"libp2p-delegated-content-routing": "^0.2.2",
"libp2p-delegated-peer-routing": "^0.2.2",
"libp2p-floodsub": "~0.17.0",
"libp2p-gossipsub": "vasco-santos/gossipsub-js#feat/fallback-to-floodsub",
"libp2p-kad-dht": "^0.15.2",
"libp2p-mdns": "^0.12.3",
"libp2p-mplex": "^0.8.4",
Expand All @@ -83,6 +84,7 @@
"libp2p-websocket-star": "~0.10.2",
"libp2p-websocket-star-rendezvous": "~0.3.0",
"lodash.times": "^4.3.2",
"merge-options": "^1.0.1",
"nock": "^10.0.6",
"pull-goodbye": "0.0.2",
"pull-mplex": "^0.1.2",
Expand Down
11 changes: 5 additions & 6 deletions src/config.js
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ const modulesSchema = s({
connProtector: s.union(['undefined', s.interface({ protect: 'function' })]),
contentRouting: optional(list(['object'])),
dht: optional(s('null|function|object')),
pubsub: optional(s('null|function|object')),
peerDiscovery: optional(list([s('object|function')])),
peerRouting: optional(list(['object'])),
streamMuxer: optional(list([s('object|function')])),
Expand Down Expand Up @@ -59,12 +60,10 @@ const configSchema = s({
timeout: 10e3
}
}),
// Experimental config
EXPERIMENTAL: s({
pubsub: 'boolean'
}, {
// Experimental defaults
pubsub: false
// Pubsub config
pubsub: s('object?', {
// DHT defaults
enabled: false
})
}, {})

Expand Down
15 changes: 9 additions & 6 deletions src/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -121,8 +121,11 @@ class Libp2p extends EventEmitter {
})
}

// enable/disable pubsub
if (this._config.EXPERIMENTAL.pubsub) {
// start pubsub
if (this._config.pubsub.enabled) {
const Pubsub = this._modules.pubsub

this._pubsub = new Pubsub(this)
this.pubsub = pubsub(this)
}

Expand Down Expand Up @@ -395,8 +398,8 @@ class Libp2p extends EventEmitter {
}
},
(cb) => {
if (this._floodSub) {
return this._floodSub.start(cb)
if (this._pubsub) {
return this._pubsub.start(cb)
}
cb()
},
Expand Down Expand Up @@ -434,8 +437,8 @@ class Libp2p extends EventEmitter {
)
},
(cb) => {
if (this._floodSub) {
return this._floodSub.stop(cb)
if (this._pubsub) {
return this._pubsub.stop(cb)
}
cb()
},
Expand Down
37 changes: 17 additions & 20 deletions src/pubsub.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,11 @@

const nextTick = require('async/nextTick')
const { messages, codes } = require('./errors')
const FloodSub = require('libp2p-floodsub')

const errCode = require('err-code')

module.exports = (node) => {
const floodSub = new FloodSub(node)

node._floodSub = floodSub
const pubsub = node._pubsub

return {
subscribe: (topic, options, handler, callback) => {
Expand All @@ -19,34 +16,34 @@ module.exports = (node) => {
options = {}
}

if (!node.isStarted() && !floodSub.started) {
if (!node.isStarted() && !pubsub.started) {
return nextTick(callback, errCode(new Error(messages.NOT_STARTED_YET), codes.PUBSUB_NOT_STARTED))
}

function subscribe (cb) {
if (floodSub.listenerCount(topic) === 0) {
floodSub.subscribe(topic)
if (pubsub.listenerCount(topic) === 0) {
pubsub.subscribe(topic)
}

floodSub.on(topic, handler)
pubsub.on(topic, handler)
nextTick(cb)
}

subscribe(callback)
},

unsubscribe: (topic, handler, callback) => {
if (!node.isStarted() && !floodSub.started) {
if (!node.isStarted() && !pubsub.started) {
return nextTick(callback, errCode(new Error(messages.NOT_STARTED_YET), codes.PUBSUB_NOT_STARTED))
}
if (!handler && !callback) {
floodSub.removeAllListeners(topic)
pubsub.removeAllListeners(topic)
} else {
floodSub.removeListener(topic, handler)
pubsub.removeListener(topic, handler)
}

if (floodSub.listenerCount(topic) === 0) {
floodSub.unsubscribe(topic)
if (pubsub.listenerCount(topic) === 0) {
pubsub.unsubscribe(topic)
}

if (typeof callback === 'function') {
Expand All @@ -55,29 +52,29 @@ module.exports = (node) => {
},

publish: (topic, data, callback) => {
if (!node.isStarted() && !floodSub.started) {
if (!node.isStarted() && !pubsub.started) {
return nextTick(callback, errCode(new Error(messages.NOT_STARTED_YET), codes.PUBSUB_NOT_STARTED))
}

if (!Buffer.isBuffer(data)) {
return nextTick(callback, errCode(new Error('data must be a Buffer'), 'ERR_DATA_IS_NOT_A_BUFFER'))
}

floodSub.publish(topic, data, callback)
pubsub.publish(topic, data, callback)
},

ls: (callback) => {
if (!node.isStarted() && !floodSub.started) {
if (!node.isStarted() && !pubsub.started) {
return nextTick(callback, errCode(new Error(messages.NOT_STARTED_YET), codes.PUBSUB_NOT_STARTED))
}

const subscriptions = Array.from(floodSub.subscriptions)
const subscriptions = Array.from(pubsub.subscriptions)

nextTick(() => callback(null, subscriptions))
},

peers: (topic, callback) => {
if (!node.isStarted() && !floodSub.started) {
if (!node.isStarted() && !pubsub.started) {
return nextTick(callback, errCode(new Error(messages.NOT_STARTED_YET), codes.PUBSUB_NOT_STARTED))
}

Expand All @@ -86,15 +83,15 @@ module.exports = (node) => {
topic = null
}

const peers = Array.from(floodSub.peers.values())
const peers = Array.from(pubsub.peers.values())
.filter((peer) => topic ? peer.topics.has(topic) : true)
.map((peer) => peer.info.id.toB58String())

nextTick(() => callback(null, peers))
},

setMaxListeners (n) {
return floodSub.setMaxListeners(n)
return pubsub.setMaxListeners(n)
}
}
}
12 changes: 6 additions & 6 deletions test/config.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -82,8 +82,8 @@ describe('configuration', () => {
peerDiscovery: {
autoDial: true
},
EXPERIMENTAL: {
pubsub: false
pubsub: {
enabled: false
},
dht: {
kBucketSize: 20,
Expand Down Expand Up @@ -144,8 +144,8 @@ describe('configuration', () => {
enabled: true
}
},
EXPERIMENTAL: {
pubsub: false
pubsub: {
enabled: false
},
dht: {
kBucketSize: 20,
Expand Down Expand Up @@ -269,8 +269,8 @@ describe('configuration', () => {
dht: DHT
},
config: {
EXPERIMENTAL: {
pubsub: false
pubsub: {
enabled: false
},
peerDiscovery: {
autoDial: true
Expand Down
12 changes: 6 additions & 6 deletions test/create.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@ describe('libp2p creation', () => {
it('should be able to start and stop successfully', (done) => {
createNode([], {
config: {
EXPERIMENTAL: {
pubsub: true
pubsub: {
enabled: true
},
dht: {
enabled: true
Expand All @@ -32,7 +32,7 @@ describe('libp2p creation', () => {
let sw = node._switch
let cm = node.connectionManager
let dht = node._dht
let pub = node._floodSub
let pub = node._pubsub

sinon.spy(sw, 'start')
sinon.spy(cm, 'start')
Expand Down Expand Up @@ -77,13 +77,13 @@ describe('libp2p creation', () => {
it('should not create disabled modules', (done) => {
createNode([], {
config: {
EXPERIMENTAL: {
pubsub: false
pubsub: {
enabled: false
}
}
}, (err, node) => {
expect(err).to.not.exist()
expect(node._floodSub).to.not.exist()
expect(node._pubsub).to.not.exist()
done()
})
})
Expand Down
Loading

0 comments on commit e238d43

Please sign in to comment.