Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: integrate gossipsub by default #19

Merged
merged 8 commits into from
Aug 12, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,10 @@
"cids": "~0.6.0",
"debug": "^4.1.1",
"length-prefixed-stream": "github:jacobheun/length-prefixed-stream#v2.0.0-rc.1",
"libp2p": "~0.25.2",
"libp2p": "~0.26.0",
"libp2p-bootstrap": "~0.9.7",
"libp2p-floodsub": "~0.17.0",
"libp2p-gossipsub": "~0.0.4",
"libp2p-kad-dht": "~0.15.2",
"libp2p-secio": "~0.11.1",
"libp2p-tcp": "~0.13.0",
Expand Down
5 changes: 5 additions & 0 deletions src/cli/bin.js
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,11 @@ const main = async (processArgs) => {
type: 'boolean',
default: false
})
.option('pubsubRouter', {
desc: 'Specifies the pubsub router implementation',
type: 'string',
default: 'gossipsub'
})
.fail((msg, err, yargs) => {
if (err) {
throw err // preserve stack
Expand Down
6 changes: 3 additions & 3 deletions src/daemon.js
Original file line number Diff line number Diff line change
Expand Up @@ -256,7 +256,7 @@ class Daemon {
async handlePubsubRequest ({ pubsub }, enc) {
switch (pubsub.type) {
case PSRequest.Type.GET_TOPICS: {
const topics = await this.libp2p.pubsub.getTopics()
const topics = await this.libp2p.pubsub.ls()

await new Promise((resolve) => {
enc.write(
Expand All @@ -283,7 +283,7 @@ class Daemon {
case PSRequest.Type.SUBSCRIBE: {
const topic = pubsub.topic

await this.libp2p.pubsub.subscribe(topic, {}, async (msg) => {
await this.libp2p.pubsub.subscribe(topic, async (msg) => {
await new Promise((resolve) => {
enc.write(PSMessage.encode({
from: msg.from && Buffer.from(msg.from),
Expand All @@ -294,7 +294,7 @@ class Daemon {
key: msg.key
}), resolve)
})
})
}, {})

await new Promise((resolve) => {
enc.write(OkResponse(), resolve)
Expand Down
157 changes: 46 additions & 111 deletions src/libp2p.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ const Bootstrap = require('libp2p-bootstrap')
const MPLEX = require('pull-mplex')
const SECIO = require('libp2p-secio')
const KadDHT = require('libp2p-kad-dht')
const FloodSub = require('libp2p-floodsub')
const GossipSub = require('libp2p-gossipsub')
const pullToStream = require('pull-stream-to-stream')
const PeerBook = require('peer-book')
const PeerInfo = require('peer-info')
Expand Down Expand Up @@ -105,59 +107,6 @@ class ContentRouting {
}
}

class Pubsub {
/**
* @param {Libp2p} libp2p The libp2p instance to use
*/
constructor (libp2p) {
this.libp2p = libp2p
}

/**
* Subscribe to a pubsub topic
* @param {string} topic
* @param {Object} options
* @param {function(msg)} handler handle received messages
* @returns { Promise<void>}
*/
subscribe (topic, options, handler) {
return new Promise((resolve, reject) => {
this.libp2p._pubsub.subscribe(topic, options, handler, (err) => {
if (err) return reject(err)
resolve()
})
})
}

/**
* Publish data in the context of a topic
* @param {string} topic
* @param {Buffer} data
* @returns {Promise<void>}
*/
publish (topic, data) {
return new Promise((resolve, reject) => {
this.libp2p._pubsub.publish(topic, data, (err) => {
if (err) return reject(err)
resolve()
})
})
}

/**
* Get the list of subscriptions the peer is subscribed to.
* @returns {Promise<Array<string>>}
*/
getTopics () {
return new Promise((resolve, reject) => {
this.libp2p._pubsub.ls((err, topics) => {
if (err) return reject(err)
resolve(topics)
})
})
}
}

class DHT {
/**
* @param {Libp2p} libp2p The libp2p instance to use
Expand Down Expand Up @@ -253,9 +202,7 @@ class DaemonLibp2p extends Libp2p {
constructor (libp2pOpts, { announceAddrs }) {
super(libp2pOpts)
this.announceAddrs = announceAddrs
this.needsPullStream = libp2pOpts.config.EXPERIMENTAL.pubsub
this._pubsub = this.pubsub
this.pubsub = new Pubsub(this)
this.needsPullStream = libp2pOpts.config.pubsub.enabled
}
get contentRouting () {
return this._contentRouting
Expand All @@ -278,76 +225,60 @@ class DaemonLibp2p extends Libp2p {

/**
* Starts the libp2p node
* NOTE: This is currently promisified internally by libp2p
*
* @returns {Promise<void>}
* @param {function(Error)} callback
*/
start () {
return new Promise((resolve, reject) => {
super.start((err) => {
if (err) return reject(err)
start (callback) {
super.start((err) => {
if (err) return callback(err)

// replace with announce addrs until libp2p supports this directly
if (this.announceAddrs.length > 0) {
this.peerInfo.multiaddrs.clear()
this.announceAddrs.forEach(addr => {
this.peerInfo.multiaddrs.add(addr)
})
}

// replace with announce addrs until libp2p supports this directly
if (this.announceAddrs.length > 0) {
this.peerInfo.multiaddrs.clear()
this.announceAddrs.forEach(addr => {
this.peerInfo.multiaddrs.add(addr)
})
// temporary removal of "/ipfs/..." from multiaddrs
// this will be solved in: https://github.com/libp2p/js-libp2p/issues/323
this.peerInfo.multiaddrs.toArray().forEach(m => {
let ma
try {
ma = m.decapsulate('ipfs')
} catch (_) {
ma = m
}

// temporary removal of "/ipfs/..." from multiaddrs
// this will be solved in: https://github.com/libp2p/js-libp2p/issues/323
this.peerInfo.multiaddrs.toArray().forEach(m => {
let ma
try {
ma = m.decapsulate('ipfs')
} catch (_) {
ma = m
}

this.peerInfo.multiaddrs.replace(m, ma)
})

resolve()
this.peerInfo.multiaddrs.replace(m, ma)
})
})
}

/**
* Stops the libp2p node
*
* @returns {Promise<void>}
*/
stop () {
return new Promise((resolve, reject) => {
super.stop((err) => {
if (err) return reject(err)
resolve()
})
callback()
})
}

/**
* Dials the given peer on protocol. The promise will resolve with the connection
* NOTE: This is currently promisified internally by libp2p
*
* @param {PeerInfo} peerInfo
* @param {string} protocol
* @returns {Promise<Connection>}
* @param {function(Error, Connection)} callback
*/
dial (peerInfo, protocol) {
return new Promise((resolve, reject) => {
this.dialProtocol(peerInfo, protocol, (err, conn) => {
if (err) return reject(err)
if (!conn) return resolve()
dial (peerInfo, protocol, callback) {
this.dialProtocol(peerInfo, protocol, (err, conn) => {
if (err) return callback(err)
if (!conn) return callback()

conn.getPeerInfo((err, peerInfo) => {
if (err) return reject(err)
conn.getPeerInfo((err, peerInfo) => {
if (err) return callback(err)

// Convert the pull stream to an iterable node stream
const connection = pullToStream(conn)
connection.peerInfo = peerInfo
// Convert the pull stream to an iterable node stream
const connection = pullToStream(conn)
connection.peerInfo = peerInfo

resolve(connection)
})
callback(null, connection)
})
})
}
Expand Down Expand Up @@ -384,6 +315,8 @@ class DaemonLibp2p extends Libp2p {
* @param {string} opts.id
* @param {string} opts.bootstrapPeers
* @param {string} opts.hostAddrs
* @param {boolean} opts.pubsub
* @param {string} opts.pubsubRouter
* @returns {Libp2p}
*/
const createLibp2p = async ({
Expand All @@ -395,7 +328,8 @@ const createLibp2p = async ({
connMgrLo,
connMgrHi,
id,
pubsub
pubsub,
pubsubRouter
} = {}) => {
const peerInfo = await getPeerInfo(id)
const peerBook = new PeerBook()
Expand Down Expand Up @@ -430,7 +364,8 @@ const createLibp2p = async ({
peerDiscovery: [
Bootstrap
],
dht: KadDHT
dht: KadDHT,
pubsub: pubsubRouter === 'floodsub' ? GossipSub : FloodSub
},
config: {
peerDiscovery: {
Expand All @@ -451,8 +386,8 @@ const createLibp2p = async ({
enabled: dht,
kBucketSize: 20
},
EXPERIMENTAL: {
pubsub: pubsub
pubsub: {
enabled: Boolean(pubsub)
}
}
}, {
Expand Down
1 change: 1 addition & 0 deletions test/daemon/config.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ describe('configuration', () => {
})

await daemon.start()

const addrs = daemon.libp2p.peerInfo.multiaddrs.toArray()
expect(addrs).to.eql([
ma('/dns/ipfs.io')
Expand Down
Loading