Skip to content
This repository has been archived by the owner on Mar 10, 2020. It is now read-only.

Commit

Permalink
fix: make pubsub.unsubscribe async and alter pubsub.subscribe signature
Browse files Browse the repository at this point in the history
BREAKING CHANGE: pubsub.unsubscribe is now async and argument order for pubsub.subscribe has changed

License: MIT
Signed-off-by: Alan Shaw <alan@tableflip.io>
  • Loading branch information
alanshaw authored and daviddias committed May 12, 2018
1 parent 6e8f491 commit b98f8f3
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 18 deletions.
38 changes: 27 additions & 11 deletions src/pubsub.js
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,13 @@ module.exports = (arg) => {
const subscriptions = {}
ps.id = Math.random()
return {
subscribe: (topic, options, handler, callback) => {
subscribe: (topic, handler, options, callback) => {
const defaultOptions = {
discover: false
}

if (typeof options === 'function') {
callback = handler
handler = options
callback = options
options = defaultOptions
}

Expand All @@ -39,14 +38,15 @@ module.exports = (arg) => {
if (!callback) {
return Promise.reject(NotSupportedError())
}
return callback(NotSupportedError())

return process.nextTick(() => callback(NotSupportedError()))
}

// promisify doesn't work as we always pass a
// function as last argument (`handler`)
if (!callback) {
return new Promise((resolve, reject) => {
subscribe(topic, options, handler, (err) => {
subscribe(topic, handler, options, (err) => {
if (err) {
return reject(err)
}
Expand All @@ -55,24 +55,40 @@ module.exports = (arg) => {
})
}

subscribe(topic, options, handler, callback)
subscribe(topic, handler, options, callback)
},
unsubscribe: (topic, handler) => {
unsubscribe: (topic, handler, callback) => {
if (!isNode) {
throw NotSupportedError()
if (!callback) {
return Promise.reject(NotSupportedError())
}

return process.nextTick(() => callback(NotSupportedError()))
}

if (ps.listenerCount(topic) === 0 || !subscriptions[topic]) {
throw new Error(`Not subscribed to '${topic}'`)
const err = new Error(`Not subscribed to '${topic}'`)

if (!callback) {
return Promise.reject(err)
}

return process.nextTick(() => callback(err))
}

ps.removeListener(topic, handler)

// Drop the request once we are actualy done
// Drop the request once we are actually done
if (ps.listenerCount(topic) === 0) {
subscriptions[topic].abort()
subscriptions[topic] = null
}

if (!callback) {
return Promise.resolve()
}

process.nextTick(() => callback())
},
publish: promisify((topic, data, callback) => {
if (!isNode) {
Expand Down Expand Up @@ -118,7 +134,7 @@ module.exports = (arg) => {
}
}

function subscribe (topic, options, handler, callback) {
function subscribe (topic, handler, options, callback) {
ps.on(topic, handler)

if (subscriptions[topic]) {
Expand Down
11 changes: 4 additions & 7 deletions test/pubsub-in-browser.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ describe('.pubsub is not supported in the browser, yet!', function () {
describe('.subscribe', () => {
const handler = () => {}
it('throws an error if called in the browser', (done) => {
ipfs.pubsub.subscribe(topic, {}, handler, (err, topics) => {
ipfs.pubsub.subscribe(topic, handler, {}, (err, topics) => {
expect(err).to.exist()
expect(err.message).to.equal(expectedError)
done()
Expand Down Expand Up @@ -115,7 +115,7 @@ describe('.pubsub is not supported in the browser, yet!', function () {
describe('.subscribe', () => {
const handler = () => {}
it('throws an error if called in the browser', (done) => {
ipfs.pubsub.subscribe(topic, {}, handler)
ipfs.pubsub.subscribe(topic, handler, {})
.catch((err) => {
expect(err).to.exist()
expect(err.message).to.equal(expectedError)
Expand Down Expand Up @@ -148,14 +148,11 @@ describe('.pubsub is not supported in the browser, yet!', function () {

describe('.unsubscribe', () => {
it('throws an error if called in the browser', (done) => {
try {
ipfs.pubsub.unsubscribe()
done('unsubscribe() didn\'t throw an error')
} catch (err) {
ipfs.pubsub.unsubscribe('test', () => {}, (err) => {
expect(err).to.exist()
expect(err.message).to.equal(expectedError)
done()
}
})
})
})
})
Expand Down

0 comments on commit b98f8f3

Please sign in to comment.