Skip to content
This repository has been archived by the owner on Feb 12, 2024. It is now read-only.

Commit

Permalink
fix: add onError to pubsub.subscribe types (#3706)
Browse files Browse the repository at this point in the history
This was missed in #3468.  Also runs the type checker on the pubsub http client
tests to ensure typing is correct.
  • Loading branch information
achingbrain authored Jun 4, 2021
1 parent 7cf404c commit d910aea
Show file tree
Hide file tree
Showing 7 changed files with 44 additions and 21 deletions.
10 changes: 9 additions & 1 deletion packages/ipfs-core-types/src/pubsub/index.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ export interface API<OptionExtension = {}> {
* console.log(`subscribed to ${topic}`)
* ```
*/
subscribe: (topic: string, handler: MessageHandlerFn, options?: AbortOptions & OptionExtension) => Promise<void>
subscribe: (topic: string, handler: MessageHandlerFn, options?: SubscribeOptions & OptionExtension) => Promise<void>

/**
* Unsubscribes from a pubsub topic
Expand Down Expand Up @@ -81,4 +81,12 @@ export interface Message {
topicIDs: string[]
}

export interface SubscribeOptions extends AbortOptions {
/**
* A callback to receive an error if one occurs during processing
* subscription messages. Only supported by ipfs-http-client.
*/
onError?: (err: Error) => void
}

export type MessageHandlerFn = (message: Message) => void
1 change: 1 addition & 0 deletions packages/ipfs-http-client/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@
"it-concat": "^2.0.0",
"it-first": "^1.0.4",
"nock": "^13.0.2",
"p-defer": "^3.0.0",
"rimraf": "^3.0.2"
},
"engines": {
Expand Down
1 change: 1 addition & 0 deletions packages/ipfs-http-client/test/commands.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ const f = require('./utils/factory')()
describe('.commands', function () {
this.timeout(60 * 1000)

/** @type {import('ipfs-core-types').IPFS} */
let ipfs

before(async () => {
Expand Down
11 changes: 10 additions & 1 deletion packages/ipfs-http-client/test/node/agent.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,11 @@ const { expect } = require('aegir/utils/chai')
const ipfsClient = require('../../src').create
const delay = require('delay')

/**
* @typedef {import('http').IncomingMessage} IncomingMessage
*
* @param {(message: IncomingMessage) => Promise<any>} handler
*/
function startServer (handler) {
return new Promise((resolve) => {
// spin up a test http server to inspect the requests made by the library
Expand All @@ -20,15 +25,18 @@ function startServer (handler) {
})

server.listen(0, () => {
const addressInfo = server.address()

resolve({
port: server.address().port,
port: addressInfo && (typeof addressInfo === 'string' ? addressInfo : addressInfo.port),
close: () => server.close()
})
})
})
}

describe('agent', function () {
/** @type {import('http').Agent} */
let agent

before(() => {
Expand All @@ -40,6 +48,7 @@ describe('agent', function () {
})

it('restricts the number of concurrent connections', async () => {
/** @type {((arg: any) => void)[]} */
const responses = []

const server = await startServer(() => {
Expand Down
36 changes: 18 additions & 18 deletions packages/ipfs-http-client/test/pubsub.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,17 @@

const { expect } = require('aegir/utils/chai')
const { AbortController } = require('native-abort-controller')
const uint8ArrayFromString = require('uint8arrays/from-string')
const defer = require('p-defer')

const f = require('./utils/factory')()

describe('.pubsub', function () {
this.timeout(20 * 1000)
describe('.subscribe', () => {
/** @type {import('ipfs-core-types').IPFS} */
let ipfs
/** @type {any} */
let ctl

beforeEach(async function () {
Expand All @@ -27,8 +31,7 @@ describe('.pubsub', function () {
it('.onError when connection is closed', async () => {
const topic = 'gossipboom'
let messageCount = 0
let onError
const error = new Promise(resolve => { onError = resolve })
const onError = defer()

await ipfs.pubsub.subscribe(topic, message => {
messageCount++
Expand All @@ -38,47 +41,44 @@ describe('.pubsub', function () {
ctl.stop().catch()
}
}, {
onError
onError: onError.resolve
})

await ipfs.pubsub.publish(topic, 'hello')
await ipfs.pubsub.publish(topic, 'bye')
await ipfs.pubsub.publish(topic, uint8ArrayFromString('hello'))
await ipfs.pubsub.publish(topic, uint8ArrayFromString('bye'))

await expect(error).to.eventually.be.fulfilled().and.to.be.instanceOf(Error)
await expect(onError.promise).to.eventually.be.fulfilled().and.to.be.instanceOf(Error)
})

it('does not call onError when aborted', async () => {
const controller = new AbortController()
const topic = 'gossipabort'
const messages = []
let onError
let onReceived

const received = new Promise(resolve => { onReceived = resolve })
const error = new Promise(resolve => { onError = resolve })
const onError = defer()
const onReceived = defer()

await ipfs.pubsub.subscribe(topic, message => {
messages.push(message)
if (messages.length === 2) {
onReceived()
onReceived.resolve()
}
}, {
onError,
onError: onError.resolve,
signal: controller.signal
})

await ipfs.pubsub.publish(topic, 'hello')
await ipfs.pubsub.publish(topic, 'bye')
await ipfs.pubsub.publish(topic, uint8ArrayFromString('hello'))
await ipfs.pubsub.publish(topic, uint8ArrayFromString('bye'))

await received
await onReceived.promise
controller.abort()

// Stop the daemon
await ctl.stop()
// Just to make sure no error is caused by above line
setTimeout(onError, 200, 'aborted')
setTimeout(onError.resolve, 200, 'aborted')

await expect(error).to.eventually.be.fulfilled().and.to.equal('aborted')
await expect(onError.promise).to.eventually.be.fulfilled().and.to.equal('aborted')
})
})
})
2 changes: 2 additions & 0 deletions packages/ipfs-http-client/test/utils/factory.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
'use strict'

// @ts-ignore no types
const { createFactory } = require('ipfsd-ctl')
const merge = require('merge-options')
const { isNode } = require('ipfs-utils/src/env')
Expand All @@ -13,6 +14,7 @@ const commonOptions = {

const commonOverrides = {
go: {
// @ts-ignore go-ipfs has no types
ipfsBin: isNode ? require('go-ipfs').path() : undefined
}
}
Expand Down
4 changes: 3 additions & 1 deletion packages/ipfs-http-client/tsconfig.json
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@
"outDir": "dist"
},
"include": [
"src"
"src",
"test/utils/factory.js",
"test/pubsub.spec.js"
],
"references": [
{
Expand Down

0 comments on commit d910aea

Please sign in to comment.