From 0b7326a855ec266b26eef24f2e41fe9113582089 Mon Sep 17 00:00:00 2001 From: Marcin Rataj Date: Wed, 15 Dec 2021 13:46:48 +0100 Subject: [PATCH] fix(pubsub): multibase in pubsub http rpc (#3922) This PR aims to restore interop with go-ipfs by applying the same changes as in https://github.com/ipfs/go-ipfs/pull/8183 TLDR is that we clean up and unify the API. BREAKING CHANGE: We had to make breaking changes to `pubsub` commands sent over HTTP RPC to fix data corruption caused by topic names and payload bytes that included `\n`. More details in https://github.com/ipfs/go-ipfs/issues/7939 and https://github.com/ipfs/go-ipfs/pull/8183 --- package.json | 2 +- src/files/rm.js | 11 ++++++++- src/lib/http-rpc-wire-format.js | 41 +++++++++++++++++++++++++++++++++ src/pubsub/ls.js | 3 ++- src/pubsub/peers.js | 3 ++- src/pubsub/publish.js | 3 ++- src/pubsub/subscribe.js | 13 +++++------ test/utils/factory.js | 2 +- 8 files changed, 65 insertions(+), 13 deletions(-) create mode 100644 src/lib/http-rpc-wire-format.js diff --git a/package.json b/package.json index 32384042a..e019f3983 100644 --- a/package.json +++ b/package.json @@ -77,7 +77,7 @@ "devDependencies": { "aegir": "^36.0.1", "delay": "^5.0.0", - "go-ipfs": "0.10.0", + "go-ipfs": "0.11.0", "ipfsd-ctl": "^10.0.4", "it-all": "^1.0.4", "it-first": "^1.0.4", diff --git a/src/files/rm.js b/src/files/rm.js index 8f4ec4581..03aa688a2 100644 --- a/src/files/rm.js +++ b/src/files/rm.js @@ -1,5 +1,6 @@ import { configure } from '../lib/configure.js' import { toUrlSearchParams } from '../lib/to-url-search-params.js' +import HTTP from 'ipfs-utils/src/http.js' /** * @typedef {import('../types').HTTPClientExtraOptions} HTTPClientExtraOptions @@ -20,7 +21,15 @@ export const createRm = configure(api => { headers: options.headers }) - await res.text() + const body = await res.text() + // we don't expect text body to be ever present + // (if so, it means an error such as https://github.com/ipfs/go-ipfs/issues/8606) + if (body !== '') { + /** @type {Error} */ + const error = new HTTP.HTTPError(res) + error.message = body + throw error + } } return rm }) diff --git a/src/lib/http-rpc-wire-format.js b/src/lib/http-rpc-wire-format.js new file mode 100644 index 000000000..6e28747d1 --- /dev/null +++ b/src/lib/http-rpc-wire-format.js @@ -0,0 +1,41 @@ +import { fromString as uint8ArrayFromString } from 'uint8arrays/from-string' +import { toString as uint8ArrayToString } from 'uint8arrays/to-string' +import { base64url } from 'multiformats/bases/base64' + +/* HTTP RPC: + * - wraps binary data in multibase. base64url is used to avoid issues + * when a binary data is passed as search param in URL. + * Historical context: https://github.com/ipfs/go-ipfs/issues/7939 + * Multibase wrapping introduced in: https://github.com/ipfs/go-ipfs/pull/8183 + */ + +/** + * @param {Array} strings + * @returns {Array} strings + */ +const rpcArrayToTextArray = strings => { + if (Array.isArray(strings)) { + return strings.map(rpcToText) + } + return strings +} + +/** + * @param {string} mb + * @returns {string} + */ +const rpcToText = mb => uint8ArrayToString(rpcToBytes(mb)) + +/** + * @param {string} mb + * @returns {Uint8Array} + */ +const rpcToBytes = mb => base64url.decode(mb) + +/** + * @param {string} text + * @returns {string} + */ +const textToUrlSafeRpc = text => base64url.encode(uint8ArrayFromString(text)) + +export { rpcArrayToTextArray, rpcToText, rpcToBytes, textToUrlSafeRpc } diff --git a/src/pubsub/ls.js b/src/pubsub/ls.js index 920e986c6..94fb4227a 100644 --- a/src/pubsub/ls.js +++ b/src/pubsub/ls.js @@ -1,5 +1,6 @@ import { configure } from '../lib/configure.js' import { toUrlSearchParams } from '../lib/to-url-search-params.js' +import { rpcArrayToTextArray } from '../lib/http-rpc-wire-format.js' /** * @typedef {import('../types').HTTPClientExtraOptions} HTTPClientExtraOptions @@ -17,7 +18,7 @@ export const createLs = configure(api => { headers: options.headers })).json() - return Strings || [] + return rpcArrayToTextArray(Strings) || [] } return ls }) diff --git a/src/pubsub/peers.js b/src/pubsub/peers.js index 33a0009f5..a7cfff6c0 100644 --- a/src/pubsub/peers.js +++ b/src/pubsub/peers.js @@ -1,5 +1,6 @@ import { configure } from '../lib/configure.js' import { toUrlSearchParams } from '../lib/to-url-search-params.js' +import { textToUrlSafeRpc } from '../lib/http-rpc-wire-format.js' /** * @typedef {import('../types').HTTPClientExtraOptions} HTTPClientExtraOptions @@ -14,7 +15,7 @@ export const createPeers = configure(api => { const res = await api.post('pubsub/peers', { signal: options.signal, searchParams: toUrlSearchParams({ - arg: topic, + arg: textToUrlSafeRpc(topic), ...options }), headers: options.headers diff --git a/src/pubsub/publish.js b/src/pubsub/publish.js index 5c51dca64..b393e4889 100644 --- a/src/pubsub/publish.js +++ b/src/pubsub/publish.js @@ -2,6 +2,7 @@ import { configure } from '../lib/configure.js' import { toUrlSearchParams } from '../lib/to-url-search-params.js' import { multipartRequest } from 'ipfs-core-utils/multipart-request' import { abortSignal } from '../lib/abort-signal.js' +import { textToUrlSafeRpc } from '../lib/http-rpc-wire-format.js' import { AbortController } from 'native-abort-controller' /** @@ -15,7 +16,7 @@ export const createPublish = configure(api => { */ async function publish (topic, data, options = {}) { const searchParams = toUrlSearchParams({ - arg: topic, + arg: textToUrlSafeRpc(topic), ...options }) diff --git a/src/pubsub/subscribe.js b/src/pubsub/subscribe.js index 7ae5b97a9..468206ead 100644 --- a/src/pubsub/subscribe.js +++ b/src/pubsub/subscribe.js @@ -1,8 +1,7 @@ -import { fromString as uint8ArrayFromString } from 'uint8arrays/from-string' -import { toString as uint8ArrayToString } from 'uint8arrays/to-string' import debug from 'debug' import { configure } from '../lib/configure.js' import { toUrlSearchParams } from '../lib/to-url-search-params.js' +import { textToUrlSafeRpc, rpcArrayToTextArray, rpcToBytes } from '../lib/http-rpc-wire-format.js' const log = debug('ipfs-http-client:pubsub:subscribe') /** @@ -43,7 +42,7 @@ export const createSubscribe = (options, subsTracker) => { api.post('pubsub/sub', { signal: options.signal, searchParams: toUrlSearchParams({ - arg: topic, + arg: textToUrlSafeRpc(topic), ...options }), headers: options.headers @@ -95,10 +94,10 @@ async function readMessages (response, { onMessage, onEnd, onError }) { } onMessage({ - from: uint8ArrayToString(uint8ArrayFromString(msg.from, 'base64pad'), 'base58btc'), - data: uint8ArrayFromString(msg.data, 'base64pad'), - seqno: uint8ArrayFromString(msg.seqno, 'base64pad'), - topicIDs: msg.topicIDs + from: msg.from, + data: rpcToBytes(msg.data), + seqno: rpcToBytes(msg.seqno), + topicIDs: rpcArrayToTextArray(msg.topicIDs) }) } catch (/** @type {any} */ err) { err.message = `Failed to parse pubsub message: ${err.message}` diff --git a/test/utils/factory.js b/test/utils/factory.js index 012137867..401b791e1 100644 --- a/test/utils/factory.js +++ b/test/utils/factory.js @@ -16,7 +16,7 @@ const commonOptions = { const commonOverrides = { go: { - ipfsBin: isNode ? path() : undefined + ipfsBin: isNode ? (process.env.IPFS_GO_EXEC || path()) : undefined } }