Skip to content
This repository has been archived by the owner on Mar 10, 2020. It is now read-only.

Commit

Permalink
feat(pubsub): Add pubsub api (#493)
Browse files Browse the repository at this point in the history
* feat(pubsub): Add pubsub api
  • Loading branch information
dignifiedquire authored and daviddias committed Mar 30, 2017
1 parent 820150c commit d2eb925
Show file tree
Hide file tree
Showing 9 changed files with 466 additions and 2 deletions.
24 changes: 24 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,30 @@ $ ipfs config --json API.HTTPHeaders.Access-Control-Allow-Methods "[\"PUT\", \"P

> `js-ipfs-api` follows the spec defined by [`interface-ipfs-core`](https://github.com/ipfs/interface-ipfs-core), which concerns the interface to expect from IPFS implementations. This interface is a currently active endeavor. You can use it today to consult the methods available.
#### Caveats

##### Pubsub

**Currently, the [PubSub API only works in Node.js envinroment](https://github.com/ipfs/js-ipfs-api/issues/518)**

We currently don't support pubsub when run in the browser, and we test it with separate set of tests to make sure if it's being used in the browser, pubsub errors.

More info: https://github.com/ipfs/js-ipfs-api/issues/518

This means:
- You can use pubsub from js-ipfs-api in Node.js
- You can use pubsub from js-ipfs-api in Electron
(when js-ipfs-api is ran in the main process of Electron)
- You can't use pubsub from js-ipfs-api in the browser
- You can't use pubsub from js-ipfs-api in Electron's
renderer process
- You can use pubsub from js-ipfs in the browsers
- You can use pubsub from js-ipfs in Node.js
- You can use pubsub from js-ipfs in Electron
(in both the main process and the renderer process)
- See https://github.com/ipfs/js-ipfs for details on
pubsub in js-ipfs

##### [bitswap]()

- [`ipfs.bitswap.wantlist()`]()
Expand Down
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@
"eslint-plugin-react": "^6.10.3",
"gulp": "^3.9.1",
"hapi": "^16.1.0",
"interface-ipfs-core": "~0.26.1",
"interface-ipfs-core": "~0.26.2",
"ipfsd-ctl": "~0.20.0",
"pre-commit": "^1.2.2",
"socket.io": "^1.7.3",
Expand Down
162 changes: 162 additions & 0 deletions src/api/pubsub.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,162 @@
'use strict'

const promisify = require('promisify-es6')
const EventEmitter = require('events')
const eos = require('end-of-stream')
const isNode = require('detect-node')
const PubsubMessageStream = require('../pubsub-message-stream')
const stringlistToArray = require('../stringlist-to-array')

const NotSupportedError = () => new Error('pubsub is currently not supported when run in the browser')

/* Public API */
module.exports = (send) => {
/* Internal subscriptions state and functions */
const ps = new EventEmitter()
const subscriptions = {}
ps.id = Math.random()
return {
subscribe: (topic, options, handler, callback) => {
const defaultOptions = {
discover: false
}

if (typeof options === 'function') {
callback = handler
handler = options
options = defaultOptions
}

if (!options) {
options = defaultOptions
}

// Throw an error if ran in the browsers
if (!isNode) {
if (!callback) {
return Promise.reject(NotSupportedError())
}
return callback(NotSupportedError())
}

// promisify doesn't work as we always pass a
// function as last argument (`handler`)
if (!callback) {
return new Promise((resolve, reject) => {
subscribe(topic, options, handler, (err) => {
if (err) {
return reject(err)
}
resolve()
})
})
}

subscribe(topic, options, handler, callback)
},
unsubscribe: (topic, handler) => {
if (!isNode) {
throw NotSupportedError()
}

if (ps.listenerCount(topic) === 0 || !subscriptions[topic]) {
throw new Error(`Not subscribed to '${topic}'`)
}

ps.removeListener(topic, handler)

// Drop the request once we are actualy done
if (ps.listenerCount(topic) === 0) {
subscriptions[topic].abort()
subscriptions[topic] = null
}
},
publish: promisify((topic, data, callback) => {
if (!isNode) {
return callback(NotSupportedError())
}

if (!Buffer.isBuffer(data)) {
return callback(new Error('data must be a Buffer'))
}

const request = {
path: 'pubsub/pub',
args: [topic, data]
}

send(request, callback)
}),
ls: promisify((callback) => {
if (!isNode) {
return callback(NotSupportedError())
}

const request = {
path: 'pubsub/ls'
}

send.andTransform(request, stringlistToArray, callback)
}),
peers: promisify((topic, callback) => {
if (!isNode) {
return callback(NotSupportedError())
}

const request = {
path: 'pubsub/peers',
args: [topic]
}

send.andTransform(request, stringlistToArray, callback)
}),
setMaxListeners (n) {
return ps.setMaxListeners(n)
}
}

function subscribe (topic, options, handler, callback) {
ps.on(topic, handler)
if (subscriptions[topic]) {
return callback()
}

// Request params
const request = {
path: 'pubsub/sub',
args: [topic],
qs: {
discover: options.discover
}
}

// Start the request and transform the response
// stream to Pubsub messages stream
subscriptions[topic] = send.andTransform(request, PubsubMessageStream.from, (err, stream) => {
if (err) {
subscriptions[topic] = null
ps.removeListener(topic, handler)
return callback(err)
}

stream.on('data', (msg) => {
ps.emit(topic, msg)
})

stream.on('error', (err) => {
ps.emit('error', err)
})

eos(stream, (err) => {
if (err) {
ps.emit('error', err)
}

subscriptions[topic] = null
ps.removeListener(topic, handler)
})

callback()
})
}
}
1 change: 1 addition & 0 deletions src/load-commands.js
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ function requireCommands () {
refs: require('./api/refs'),
repo: require('./api/repo'),
swarm: require('./api/swarm'),
pubsub: require('./api/pubsub'),
update: require('./api/update'),
version: require('./api/version')
}
Expand Down
33 changes: 33 additions & 0 deletions src/pubsub-message-stream.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
'use strict'

const TransformStream = require('readable-stream').Transform
const PubsubMessage = require('./pubsub-message-utils')

class PubsubMessageStream extends TransformStream {
constructor (options) {
const opts = Object.assign(options || {}, { objectMode: true })
super(opts)
}

static from (inputStream, callback) {
let outputStream = inputStream.pipe(new PubsubMessageStream())
inputStream.on('end', () => outputStream.emit('end'))
callback(null, outputStream)
}

_transform (obj, enc, callback) {
let msg
try {
msg = PubsubMessage.deserialize(obj, 'base64')
} catch (err) {
// Not a valid pubsub message
// go-ipfs returns '{}' as the very first object atm, we skip that
return callback()
}

this.push(msg)
callback()
}
}

module.exports = PubsubMessageStream
39 changes: 39 additions & 0 deletions src/pubsub-message-utils.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
'use strict'

const bs58 = require('bs58')

module.exports = {
deserialize (data, enc) {
enc = enc ? enc.toLowerCase() : 'json'

if (enc === 'json') {
return deserializeFromJson(data)
} else if (enc === 'base64') {
return deserializeFromBase64(data)
}

throw new Error(`Unsupported encoding: '${enc}'`)
}
}

function deserializeFromJson (data) {
const json = JSON.parse(data)
return deserializeFromBase64(json)
}

function deserializeFromBase64 (obj) {
if (!isPubsubMessage(obj)) {
throw new Error(`Not a pubsub message`)
}

return {
from: bs58.encode(new Buffer(obj.from, 'base64')).toString(),
seqno: new Buffer(obj.seqno, 'base64'),
data: new Buffer(obj.data, 'base64'),
topicCIDs: obj.topicIDs || obj.topicCIDs
}
}

function isPubsubMessage (obj) {
return obj && obj.from && obj.seqno && obj.data && (obj.topicIDs || obj.topicCIDs)
}
23 changes: 23 additions & 0 deletions test/interface/pubsub.spec.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
/* eslint-env mocha */

'use strict'

const test = require('interface-ipfs-core')
const FactoryClient = require('../ipfs-factory/client')
const isNode = require('detect-node')

if (isNode) {
let fc

const common = {
setup: function (callback) {
fc = new FactoryClient()
callback(null, fc)
},
teardown: function (callback) {
fc.dismantle(callback)
}
}

test.pubsub(common)
}
2 changes: 1 addition & 1 deletion test/ipfs-factory/daemon-spawner.js
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ function spawnEphemeralNode (callback) {
node.setConfig(configKey, configVal, cb)
}, cb)
},
(cb) => node.startDaemon(cb)
(cb) => node.startDaemon(['--enable-pubsub-experiment'], cb)
], (err) => callback(err, node))
})
}
Loading

0 comments on commit d2eb925

Please sign in to comment.