Skip to content
This repository has been archived by the owner on Feb 12, 2024. It is now read-only.

Commit

Permalink
fix(pubsub): new wire format in http rpc
Browse files Browse the repository at this point in the history
  • Loading branch information
lidel committed Oct 22, 2021
1 parent afeb20d commit 1dcac76
Show file tree
Hide file tree
Showing 8 changed files with 97 additions and 44 deletions.
22 changes: 11 additions & 11 deletions packages/ipfs-cli/test/pubsub.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ describe('pubsub', () => {
timeout: undefined
}

it('should list toic peers', async () => {
it('should list topic peers', async () => {
const subName = 'sub-name'
const peer = 'peer-id'

Expand All @@ -79,7 +79,7 @@ describe('pubsub', () => {
expect(out).to.equal(`${peer}\n`)
})

it('should list toic peers with a timeout', async () => {
it('should list topic peers with a timeout', async () => {
const subName = 'sub-name'
const peer = 'peer-id'

Expand All @@ -101,19 +101,19 @@ describe('pubsub', () => {
}

it('should publish message', async () => {
const subName = 'sub-name'
const data = 'data'
const subName = 'sub-name-1'
const data = 'data\r\nfirst\nZażółć gęślą jaźń😇'

await cli(`pubsub pub ${subName} ${data}`, { ipfs })
await cli(`pubsub pub ${subName} "${data}"`, { ipfs })

expect(ipfs.pubsub.publish.calledWith(subName, uint8ArrayFromString(data), defaultOptions)).to.be.true()
})

it('should publish message with timeout', async () => {
const subName = 'sub-name'
const data = 'data'
const subName = 'sub-name-2'
const data = 'data\r\nsecond\nZażółć gęślą jaźń😇'

await cli(`pubsub pub ${subName} ${data} --timeout=1s`, { ipfs })
await cli(`pubsub pub ${subName} "${data}" --timeout=1s`, { ipfs })

expect(ipfs.pubsub.publish.calledWith(subName, uint8ArrayFromString(data), {
...defaultOptions,
Expand All @@ -128,17 +128,17 @@ describe('pubsub', () => {
}

it('should subscribe', async () => {
const subName = 'sub-name'
const subName = 'sub\nname'

await cli(`pubsub sub ${subName}`, { ipfs })
await cli(`pubsub sub "${subName}"`, { ipfs })

expect(ipfs.pubsub.subscribe.calledWith(subName, sinon.match.func, defaultOptions)).to.be.true()
})

it('should subscribe with a timeout', async () => {
const subName = 'sub-name'

await cli(`pubsub sub ${subName} --timeout=1s`, { ipfs })
await cli(`pubsub sub "${subName}" --timeout=1s`, { ipfs })

expect(ipfs.pubsub.subscribe.calledWith(subName, sinon.match.func, {
...defaultOptions,
Expand Down
41 changes: 41 additions & 0 deletions packages/ipfs-http-client/src/lib/http-rpc-wire-format.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
import { fromString as uint8ArrayFromString } from 'uint8arrays/from-string'
import { toString as uint8ArrayToString } from 'uint8arrays/to-string'
import { base64url } from 'multiformats/bases/base64'

/* HTTP RPC:
* - wraps binary data in multibase. base64url is used to avoid issues
* when a binary data is passed as search param in URL.
* Historical context: https://github.com/ipfs/go-ipfs/issues/7939
* Multibase wrapping introduced in: https://github.com/ipfs/go-ipfs/pull/8183
*/

/**
* @param {Array<string>} strings
* @returns {Array<string>} strings
*/
const rpcArrayToTextArray = strings => {
if (Array.isArray(strings)) {
return strings.map(rpcToText)
}
return strings
}

/**
* @param {string} mb
* @returns {string}
*/
const rpcToText = mb => uint8ArrayToString(rpcToBytes(mb))

/**
* @param {string} mb
* @returns {Uint8Array}
*/
const rpcToBytes = mb => base64url.decode(mb)

/**
* @param {string} text
* @returns {string}
*/
const textToUrlSafeRpc = text => base64url.encode(uint8ArrayFromString(text))

export { rpcArrayToTextArray, rpcToText, rpcToBytes, textToUrlSafeRpc }
4 changes: 2 additions & 2 deletions packages/ipfs-http-client/src/pubsub/ls.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { configure } from '../lib/configure.js'
import { toUrlSearchParams } from '../lib/to-url-search-params.js'
import { rpcArrayToTextArray } from '../lib/http-rpc-wire-format.js'

/**
* @typedef {import('../types').HTTPClientExtraOptions} HTTPClientExtraOptions
Expand All @@ -17,8 +18,7 @@ export const createLs = configure(api => {
headers: options.headers
})).json()

// TODO: unwrap topic names from multibase
return Strings || []
return rpcArrayToTextArray(Strings) || []
}
return ls
})
3 changes: 2 additions & 1 deletion packages/ipfs-http-client/src/pubsub/peers.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { configure } from '../lib/configure.js'
import { toUrlSearchParams } from '../lib/to-url-search-params.js'
import { textToUrlSafeRpc } from '../lib/http-rpc-wire-format.js'

/**
* @typedef {import('../types').HTTPClientExtraOptions} HTTPClientExtraOptions
Expand All @@ -14,7 +15,7 @@ export const createPeers = configure(api => {
const res = await api.post('pubsub/peers', {
signal: options.signal,
searchParams: toUrlSearchParams({
arg: topic,
arg: textToUrlSafeRpc(topic),
...options
}),
headers: options.headers
Expand Down
4 changes: 2 additions & 2 deletions packages/ipfs-http-client/src/pubsub/publish.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import { configure } from '../lib/configure.js'
import { toUrlSearchParams } from '../lib/to-url-search-params.js'
import { multipartRequest } from 'ipfs-core-utils/multipart-request'
import { abortSignal } from '../lib/abort-signal.js'
import { textToUrlSafeRpc } from '../lib/http-rpc-wire-format.js'
import { AbortController } from 'native-abort-controller'

/**
Expand All @@ -14,9 +15,8 @@ export const createPublish = configure(api => {
* @type {PubsubAPI["publish"]}
*/
async function publish (topic, data, options = {}) {
// TODO: wrap topic in multibase
const searchParams = toUrlSearchParams({
arg: topic,
arg: textToUrlSafeRpc(topic),
...options
})

Expand Down
16 changes: 6 additions & 10 deletions packages/ipfs-http-client/src/pubsub/subscribe.js
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
import { fromString as uint8ArrayFromString } from 'uint8arrays/from-string'
import { toString as uint8ArrayToString } from 'uint8arrays/to-string'
import debug from 'debug'
import { configure } from '../lib/configure.js'
import { toUrlSearchParams } from '../lib/to-url-search-params.js'
import { textToUrlSafeRpc, rpcArrayToTextArray, rpcToBytes } from '../lib/http-rpc-wire-format.js'
const log = debug('ipfs-http-client:pubsub:subscribe')

/**
Expand Down Expand Up @@ -39,12 +38,11 @@ export const createSubscribe = (options, subsTracker) => {
// is received. If this doesn't happen within 1 second assume success
const ffWorkaround = setTimeout(() => done(), 1000)

// TODO: wrap topic in multibase
// Do this async to not block Firefox
api.post('pubsub/sub', {
signal: options.signal,
searchParams: toUrlSearchParams({
arg: topic,
arg: textToUrlSafeRpc(topic),
...options
}),
headers: options.headers
Expand Down Expand Up @@ -95,13 +93,11 @@ async function readMessages (response, { onMessage, onEnd, onError }) {
continue
}

// TODO: multibase data, seqno and topics
// TODO: parse string and get peerid bytes using libp2p lib
onMessage({
from: uint8ArrayToString(uint8ArrayFromString(msg.from, 'base64pad'), 'base58btc'),
data: uint8ArrayFromString(msg.data, 'base64pad'),
seqno: uint8ArrayFromString(msg.seqno, 'base64pad'),
topicIDs: msg.topicIDs
from: msg.from,
data: rpcToBytes(msg.data),
seqno: rpcToBytes(msg.seqno),
topicIDs: rpcArrayToTextArray(msg.topicIDs)
})
} catch (/** @type {any} */ err) {
err.message = `Failed to parse pubsub message: ${err.message}`
Expand Down
2 changes: 1 addition & 1 deletion packages/ipfs-http-client/test/utils/factory.js
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ const commonOptions = {

const commonOverrides = {
go: {
ipfsBin: isNode ? path() : undefined
ipfsBin: isNode ? (process.env.IPFS_GO_EXEC || path()) : undefined
}
}

Expand Down
49 changes: 32 additions & 17 deletions packages/ipfs-http-server/src/api/resources/pubsub.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,22 @@ import { fromString as uint8ArrayFromString } from 'uint8arrays/from-string'
import { toString as uint8ArrayToString } from 'uint8arrays/to-string'
import { streamResponse } from '../../utils/stream-response.js'
import pushable from 'it-pushable'
import { base64url } from 'multiformats/bases/base64'

const preDecodeTopicFromHttpRpc = {
assign: 'topic',
/**
* @param {import('../../types').Request} request
* @param {import('@hapi/hapi').ResponseToolkit} _h
*/
method: async (request, _h) => {
try {
return uint8ArrayToString(base64url.decode(request.query.topic))
} catch (/** @type {any} */ err) {
throw Boom.boomify(err, { message: `Failed to decode topic from HTTP RPC form ${request.query.topic}` })
}
}
}

export const subscribeResource = {
options: {
Expand All @@ -24,7 +40,8 @@ export const subscribeResource = {
override: true,
ignoreUndefined: true
})
}
},
pre: [preDecodeTopicFromHttpRpc]
},
/**
* @param {import('../../types').Request} request
Expand All @@ -40,8 +57,8 @@ export const subscribeResource = {
ipfs
}
},
query: {
topic
pre: {
topic // decoded version created by preDecodeTopicFromHttpRpc
}
} = request

Expand All @@ -56,13 +73,11 @@ export const subscribeResource = {
* @type {import('ipfs-core-types/src/pubsub').MessageHandlerFn}
*/
const handler = (msg) => {
// TODO: data, seqno and topicIDs in multibase
// TODO: from should use canonical toString from peerid libp2p lib
output.push({
from: uint8ArrayToString(uint8ArrayFromString(msg.from, 'base58btc'), 'base64pad'),
data: uint8ArrayToString(msg.data, 'base64pad'),
seqno: uint8ArrayToString(msg.seqno, 'base64pad'),
topicIDs: msg.topicIDs
from: msg.from, // TODO: switch to PeerId.parse(msg.from).toString() when go-ipfs defaults to CIDv1
data: base64url.encode(msg.data),
seqno: base64url.encode(msg.seqno),
topicIDs: msg.topicIDs.map(t => base64url.encode(uint8ArrayFromString(t)))
})
}

Expand Down Expand Up @@ -92,7 +107,7 @@ export const publishResource = {
parse: false,
output: 'stream'
},
pre: [{
pre: [preDecodeTopicFromHttpRpc, {
assign: 'data',
/**
* @param {import('../../types').Request} request
Expand Down Expand Up @@ -149,16 +164,15 @@ export const publishResource = {
}
},
pre: {
topic,
data
},
query: {
topic,
timeout
}
} = request

try {
// TODO: unwrap topic from multibase?
await ipfs.pubsub.publish(topic, data, {
signal,
timeout
Expand Down Expand Up @@ -212,8 +226,7 @@ export const lsResource = {
throw Boom.boomify(err, { message: 'Failed to list subscriptions' })
}

// TODO: multibase topic names in Strings array
return h.response({ Strings: subscriptions })
return h.response({ Strings: subscriptions.map(s => base64url.encode(uint8ArrayFromString(s))) })
}
}

Expand All @@ -232,7 +245,8 @@ export const peersResource = {
override: true,
ignoreUndefined: true
})
}
},
pre: [preDecodeTopicFromHttpRpc]
},
/**
* @param {import('../../types').Request} request
Expand All @@ -248,15 +262,16 @@ export const peersResource = {
ipfs
}
},
pre: {
topic
},
query: {
topic,
timeout
}
} = request

let peers
try {
// TODO: unwrap topic from multibase
peers = await ipfs.pubsub.peers(topic, {
signal,
timeout
Expand Down

0 comments on commit 1dcac76

Please sign in to comment.