Skip to content
This repository has been archived by the owner on Feb 12, 2024. It is now read-only.

Commit

Permalink
chore: simplify streaming http response code and use instances of pub…
Browse files Browse the repository at this point in the history
…sub tracker instead of a singleton (#3719)

Simplifies deps, pass peer id as string instead of cid.
  • Loading branch information
achingbrain authored Jun 18, 2021
1 parent b5470d4 commit 9e48da3
Show file tree
Hide file tree
Showing 21 changed files with 295 additions and 341 deletions.
9 changes: 2 additions & 7 deletions packages/interface-ipfs-core/src/pubsub/unsubscribe.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
const { isBrowser, isWebWorker, isElectronRenderer } = require('ipfs-utils/src/env')
const { getTopic } = require('./utils')
const { getDescribe, getIt, expect } = require('../utils/mocha')
const delay = require('delay')

/** @typedef { import("ipfsd-ctl/src/factory") } Factory */
/**
Expand Down Expand Up @@ -41,9 +40,7 @@ module.exports = (common, options) => {
await ipfs.pubsub.unsubscribe(someTopic, handlers[i])
}

await delay(100)
const topics = await ipfs.pubsub.ls()
expect(topics).to.eql([])
return expect(ipfs.pubsub.ls()).to.eventually.eql([])
})

it(`should subscribe ${count} handlers and unsubscribe once with no reference to the handlers`, async () => {
Expand All @@ -53,9 +50,7 @@ module.exports = (common, options) => {
}
await ipfs.pubsub.unsubscribe(someTopic)

await delay(100)
const topics = await ipfs.pubsub.ls()
expect(topics).to.eql([])
return expect(ipfs.pubsub.ls()).to.eventually.eql([])
})
})
}
6 changes: 4 additions & 2 deletions packages/ipfs-core-types/src/repo/index.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,16 @@ export interface GCOptions extends AbortOptions {
}

export interface GCError {
err: Error
err: Error,
cid?: never
}

export interface GCSuccess {
err?: never,
cid: CID
}

export type GCResult = GCSuccess | GCError
export type GCResult = GCSuccess | GCError

export interface StatResult {
numObjects: BigInt
Expand Down
20 changes: 13 additions & 7 deletions packages/ipfs-http-client/src/pubsub/index.js
Original file line number Diff line number Diff line change
@@ -1,12 +1,18 @@
'use strict'

const SubscriptionTracker = require('./subscription-tracker')

/**
* @param {import('../types').Options} config
*/
module.exports = config => ({
ls: require('./ls')(config),
peers: require('./peers')(config),
publish: require('./publish')(config),
subscribe: require('./subscribe')(config),
unsubscribe: require('./unsubscribe')(config)
})
module.exports = config => {
const subscriptionTracker = new SubscriptionTracker()

return {
ls: require('./ls')(config),
peers: require('./peers')(config),
publish: require('./publish')(config),
subscribe: require('./subscribe')(config, subscriptionTracker),
unsubscribe: require('./unsubscribe')(config, subscriptionTracker)
}
}
118 changes: 61 additions & 57 deletions packages/ipfs-http-client/src/pubsub/subscribe.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
const uint8ArrayFromString = require('uint8arrays/from-string')
const uint8ArrayToString = require('uint8arrays/to-string')
const log = require('debug')('ipfs-http-client:pubsub:subscribe')
const SubscriptionTracker = require('./subscription-tracker')
const configure = require('../lib/configure')
const toUrlSearchParams = require('../lib/to-url-search-params')

Expand All @@ -12,70 +11,75 @@ const toUrlSearchParams = require('../lib/to-url-search-params')
* @typedef {import('ipfs-core-types/src/pubsub').Message} Message
* @typedef {(err: Error, fatal: boolean, msg?: Message) => void} ErrorHandlerFn
* @typedef {import('ipfs-core-types/src/pubsub').API<HTTPClientExtraOptions & { onError?: ErrorHandlerFn }>} PubsubAPI
* @typedef {import('../types').Options} Options
*/

module.exports = configure((api, options) => {
const subsTracker = SubscriptionTracker.singleton()

/**
* @type {PubsubAPI["subscribe"]}
*/
async function subscribe (topic, handler, options = {}) { // eslint-disable-line require-await
options.signal = subsTracker.subscribe(topic, handler, options.signal)

/** @type {(value?: any) => void} */
let done
/** @type {(error: Error) => void} */
let fail

const result = new Promise((resolve, reject) => {
done = resolve
fail = reject
})

// In Firefox, the initial call to fetch does not resolve until some data
// is received. If this doesn't happen within 1 second assume success
const ffWorkaround = setTimeout(() => done(), 1000)

// Do this async to not block Firefox
setTimeout(() => {
api.post('pubsub/sub', {
timeout: options.timeout,
signal: options.signal,
searchParams: toUrlSearchParams({
arg: topic,
...options
}),
headers: options.headers
/**
* @param {Options} options
* @param {import('./subscription-tracker')} subsTracker
*/
module.exports = (options, subsTracker) => {
return configure((api) => {
/**
* @type {PubsubAPI["subscribe"]}
*/
async function subscribe (topic, handler, options = {}) { // eslint-disable-line require-await
options.signal = subsTracker.subscribe(topic, handler, options.signal)

/** @type {(value?: any) => void} */
let done
/** @type {(error: Error) => void} */
let fail

const result = new Promise((resolve, reject) => {
done = resolve
fail = reject
})
.catch((err) => {
// Initial subscribe fail, ensure we clean up
subsTracker.unsubscribe(topic, handler)

fail(err)
// In Firefox, the initial call to fetch does not resolve until some data
// is received. If this doesn't happen within 1 second assume success
const ffWorkaround = setTimeout(() => done(), 1000)

// Do this async to not block Firefox
setTimeout(() => {
api.post('pubsub/sub', {
timeout: options.timeout,
signal: options.signal,
searchParams: toUrlSearchParams({
arg: topic,
...options
}),
headers: options.headers
})
.then((response) => {
clearTimeout(ffWorkaround)

if (!response) {
// if there was no response, the subscribe failed
return
}

readMessages(response, {
onMessage: handler,
onEnd: () => subsTracker.unsubscribe(topic, handler),
onError: options.onError
.catch((err) => {
// Initial subscribe fail, ensure we clean up
subsTracker.unsubscribe(topic, handler)

fail(err)
})
.then((response) => {
clearTimeout(ffWorkaround)

done()
})
}, 0)
if (!response) {
// if there was no response, the subscribe failed
return
}

return result
}
return subscribe
})
readMessages(response, {
onMessage: handler,
onEnd: () => subsTracker.unsubscribe(topic, handler),
onError: options.onError
})

done()
})
}, 0)

return result
}
return subscribe
})(options)
}

/**
* @param {import('ipfs-utils/src/types').ExtendedResponse} response
Expand Down
15 changes: 4 additions & 11 deletions packages/ipfs-http-client/src/pubsub/subscription-tracker.js
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,6 @@ class SubscriptionTracker {
this._subs = new Map()
}

static singleton () {
if (SubscriptionTracker.instance) return SubscriptionTracker.instance
SubscriptionTracker.instance = new SubscriptionTracker()
return SubscriptionTracker.instance
}

/**
* @param {string} topic
* @param {MessageHandlerFn} handler
Expand Down Expand Up @@ -63,13 +57,12 @@ class SubscriptionTracker {
unsubs = subs
}

if (!(this._subs.get(topic) || []).length) {
this._subs.delete(topic)
}

unsubs.forEach(s => s.controller.abort())
}
}

/**
* @type {SubscriptionTracker | null}
*/
SubscriptionTracker.instance = null

module.exports = SubscriptionTracker
10 changes: 4 additions & 6 deletions packages/ipfs-http-client/src/pubsub/unsubscribe.js
Original file line number Diff line number Diff line change
@@ -1,18 +1,16 @@
'use strict'

const SubscriptionTracker = require('./subscription-tracker')

/**
* @typedef {import('../types').HTTPClientExtraOptions} HTTPClientExtraOptions
* @typedef {import('ipfs-core-types/src/pubsub').API<HTTPClientExtraOptions>} PubsubAPI
* @typedef {import('../types').Options} Options
*/

/**
* @param {import('../types').Options} config
* @param {Options} options
* @param {import('./subscription-tracker')} subsTracker
*/
module.exports = config => {
const subsTracker = SubscriptionTracker.singleton()

module.exports = (options, subsTracker) => {
/**
* @type {PubsubAPI["unsubscribe"]}
*/
Expand Down
4 changes: 2 additions & 2 deletions packages/ipfs-http-client/src/refs/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ const toUrlSearchParams = require('../lib/to-url-search-params')
* @typedef {import('ipfs-core-types/src/refs').API<HTTPClientExtraOptions>} RefsAPI
*/

module.exports = configure((api, options) => {
module.exports = configure((api, opts) => {
/**
* @type {RefsAPI["refs"]}
*/
Expand All @@ -34,6 +34,6 @@ module.exports = configure((api, options) => {
}

return Object.assign(refs, {
local: require('./local')(options)
local: require('./local')(opts)
})
})
7 changes: 4 additions & 3 deletions packages/ipfs-http-server/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -47,14 +47,16 @@
"ipld-dag-pb": "^0.22.1",
"it-all": "^1.0.4",
"it-drain": "^1.0.3",
"it-filter": "^1.0.2",
"it-first": "^1.0.4",
"it-last": "^1.0.4",
"it-map": "^1.0.4",
"it-merge": "^1.0.1",
"it-multipart": "^2.0.0",
"it-pipe": "^1.1.0",
"it-pushable": "^1.4.0",
"it-reduce": "^1.0.5",
"it-tar": "^3.0.0",
"it-to-stream": "^1.0.0",
"iterable-ndjson": "^1.1.0",
"joi": "^17.2.1",
"just-safe-set": "^2.2.1",
"multiaddr": "^9.0.1",
Expand All @@ -64,7 +66,6 @@
"native-abort-controller": "^1.0.3",
"parse-duration": "^1.0.0",
"stream-to-it": "^0.2.2",
"streaming-iterables": "^5.0.2",
"uint8arrays": "^2.1.3",
"uri-to-multiaddr": "^5.0.0"
},
Expand Down
9 changes: 4 additions & 5 deletions packages/ipfs-http-server/src/api/resources/block.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,7 @@ const Boom = require('@hapi/boom')
const { cidToString } = require('ipfs-core-utils/src/cid')
const all = require('it-all')
const { pipe } = require('it-pipe')
const { map } = require('streaming-iterables')
// @ts-ignore no types
const ndjson = require('iterable-ndjson')
const map = require('it-map')
const streamResponse = require('../../utils/stream-response')

exports.get = {
Expand Down Expand Up @@ -234,8 +232,9 @@ exports.rm = {
timeout,
signal
}),
map(({ cid, error }) => ({ Hash: cidToString(cid, { base: cidBase }), Error: error ? error.message : undefined })),
ndjson.stringify
async function * (source) {
yield * map(source, ({ cid, error }) => ({ Hash: cidToString(cid, { base: cidBase }), Error: error ? error.message : undefined }))
}
))
}
}
Expand Down
Loading

0 comments on commit 9e48da3

Please sign in to comment.