Skip to content
This repository has been archived by the owner on Feb 12, 2024. It is now read-only.

Commit

Permalink
fix(pubsub): multibase in pubsub http rpc (#3922)
Browse files Browse the repository at this point in the history
This PR  aims to restore interop with go-ipfs by applying the same changes as in ipfs/kubo#8183 

TLDR is that we clean up and unify the API. 

BREAKING CHANGE: We had to make breaking changes to `pubsub` commands sent over HTTP RPC  to fix data corruption caused by topic names and payload bytes that included `\n`. More details in ipfs/kubo#7939 and ipfs/kubo#8183
  • Loading branch information
lidel authored Dec 15, 2021
1 parent 33f1034 commit 6eeaca4
Show file tree
Hide file tree
Showing 23 changed files with 159 additions and 82 deletions.
12 changes: 6 additions & 6 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -331,9 +331,9 @@ jobs:
- name: ipfs browser exchange files
repo: https://github.com/ipfs-examples/js-ipfs-browser-exchange-files.git
deps: ipfs-core@$PWD/packages/ipfs-core/dist,ipfs@$PWD/packages/ipfs/dist,ipfs-core-types@$PWD/packages/ipfs-core-types/dist,ipfs-http-client@$PWD/packages/ipfs-http-client/dist
- name: ipfs browser ipns publish
repo: https://github.com/ipfs-examples/js-ipfs-browser-ipns-publish.git
deps: ipfs-core@$PWD/packages/ipfs-core/dist,ipfs-http-client@$PWD/packages/ipfs-http-client/dist
#- name: ipfs browser ipns publish TODO: re-enable after example bumped to go-ipfs 0.11 and ipfs-http-client from https://github.com/ipfs/js-ipfs/pull/3922
# repo: https://github.com/ipfs-examples/js-ipfs-browser-ipns-publish.git
# deps: ipfs-core@$PWD/packages/ipfs-core/dist,ipfs-http-client@$PWD/packages/ipfs-http-client/dist
- name: ipfs browser mfs
repo: https://github.com/ipfs-examples/js-ipfs-browser-mfs.git
deps: ipfs-core@$PWD/packages/ipfs-core/dist
Expand Down Expand Up @@ -373,9 +373,9 @@ jobs:
- name: ipfs custom libp2p
repo: https://github.com/ipfs-examples/js-ipfs-custom-libp2p.git
deps: ipfs-core@$PWD/packages/ipfs-core/dist
- name: ipfs-http-client browser pubsub
repo: https://github.com/ipfs-examples/js-ipfs-http-client-browser-pubsub.git
deps: ipfs-http-client@$PWD/packages/ipfs-http-client/dist,ipfs@$PWD/packages/ipfs/dist
#- name: ipfs-http-client browser pubsub TODO: re-enable after example bumped to go-ipfs 0.11 and ipfs-http-client from https://github.com/ipfs/js-ipfs/pull/3922
# repo: https://github.com/ipfs-examples/js-ipfs-http-client-browser-pubsub.git
# deps: ipfs-http-client@$PWD/packages/ipfs-http-client/dist,ipfs@$PWD/packages/ipfs/dist
- name: ipfs-http-client bundle webpack
repo: https://github.com/ipfs-examples/js-ipfs-http-client-bundle-webpack.git
deps: ipfs-http-client@$PWD/packages/ipfs-http-client/dist,ipfs@$PWD/packages/ipfs/dist
Expand Down
6 changes: 3 additions & 3 deletions packages/interface-ipfs-core/src/add.js
Original file line number Diff line number Diff line change
Expand Up @@ -463,9 +463,9 @@ export function testAdd (factory, options) {
sharding: true
},
config: {
// enable sharding for go
Experimental: {
ShardingEnabled: true
// enable sharding for go with automatic threshold dropped to the minimum so it shards everything
Internal: {
UnixFSShardingSizeThreshold: '1B'
}
}
}
Expand Down
6 changes: 3 additions & 3 deletions packages/interface-ipfs-core/src/files/cp.js
Original file line number Diff line number Diff line change
Expand Up @@ -358,9 +358,9 @@ export function testCp (factory, options) {
sharding: true
},
config: {
// enable sharding for go
Experimental: {
ShardingEnabled: true
// enable sharding for go with automatic threshold dropped to the minimum so it shards everything
Internal: {
UnixFSShardingSizeThreshold: '1B'
}
}
}
Expand Down
6 changes: 3 additions & 3 deletions packages/interface-ipfs-core/src/files/ls.js
Original file line number Diff line number Diff line change
Expand Up @@ -174,9 +174,9 @@ export function testLs (factory, options) {
sharding: true
},
config: {
// enable sharding for go
Experimental: {
ShardingEnabled: true
// enable sharding for go with automatic threshold dropped to the minimum so it shards everything
Internal: {
UnixFSShardingSizeThreshold: '1B'
}
}
}
Expand Down
6 changes: 3 additions & 3 deletions packages/interface-ipfs-core/src/files/mkdir.js
Original file line number Diff line number Diff line change
Expand Up @@ -235,9 +235,9 @@ export function testMkdir (factory, options) {
sharding: true
},
config: {
// enable sharding for go
Experimental: {
ShardingEnabled: true
// enable sharding for go with automatic threshold dropped to the minimum so it shards everything
Internal: {
UnixFSShardingSizeThreshold: '1B'
}
}
}
Expand Down
6 changes: 3 additions & 3 deletions packages/interface-ipfs-core/src/files/mv.js
Original file line number Diff line number Diff line change
Expand Up @@ -120,9 +120,9 @@ export function testMv (factory, options) {
sharding: true
},
config: {
// enable sharding for go
Experimental: {
ShardingEnabled: true
// enable sharding for go with automatic threshold dropped to the minimum so it shards everything
Internal: {
UnixFSShardingSizeThreshold: '1B'
}
}
}
Expand Down
6 changes: 3 additions & 3 deletions packages/interface-ipfs-core/src/files/read.js
Original file line number Diff line number Diff line change
Expand Up @@ -124,9 +124,9 @@ export function testRead (factory, options) {
sharding: true
},
config: {
// enable sharding for go
Experimental: {
ShardingEnabled: true
// enable sharding for go with automatic threshold dropped to the minimum so it shards everything
Internal: {
UnixFSShardingSizeThreshold: '1B'
}
}
}
Expand Down
6 changes: 3 additions & 3 deletions packages/interface-ipfs-core/src/files/rm.js
Original file line number Diff line number Diff line change
Expand Up @@ -143,9 +143,9 @@ export function testRm (factory, options) {
sharding: true
},
config: {
// enable sharding for go
Experimental: {
ShardingEnabled: true
// enable sharding for go with automatic threshold dropped to the minimum so it shards everything
Internal: {
UnixFSShardingSizeThreshold: '1B'
}
}
}
Expand Down
6 changes: 3 additions & 3 deletions packages/interface-ipfs-core/src/files/stat.js
Original file line number Diff line number Diff line change
Expand Up @@ -382,9 +382,9 @@ export function testStat (factory, options) {
sharding: true
},
config: {
// enable sharding for go
Experimental: {
ShardingEnabled: true
// enable sharding for go with automatic threshold dropped to the minimum so it shards everything
Internal: {
UnixFSShardingSizeThreshold: '1B'
}
}
}
Expand Down
6 changes: 3 additions & 3 deletions packages/interface-ipfs-core/src/files/write.js
Original file line number Diff line number Diff line change
Expand Up @@ -661,9 +661,9 @@ export function testWrite (factory, options) {
sharding: true
},
config: {
// enable sharding for go
Experimental: {
ShardingEnabled: true
// enable sharding for go with automatic threshold dropped to the minimum so it shards everything
Internal: {
UnixFSShardingSizeThreshold: '1B'
}
}
}
Expand Down
22 changes: 11 additions & 11 deletions packages/ipfs-cli/test/pubsub.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ describe('pubsub', () => {
timeout: undefined
}

it('should list toic peers', async () => {
it('should list topic peers', async () => {
const subName = 'sub-name'
const peer = 'peer-id'

Expand All @@ -79,7 +79,7 @@ describe('pubsub', () => {
expect(out).to.equal(`${peer}\n`)
})

it('should list toic peers with a timeout', async () => {
it('should list topic peers with a timeout', async () => {
const subName = 'sub-name'
const peer = 'peer-id'

Expand All @@ -101,19 +101,19 @@ describe('pubsub', () => {
}

it('should publish message', async () => {
const subName = 'sub-name'
const data = 'data'
const subName = 'sub-name-1'
const data = 'data\r\nfirst\nZażółć gęślą jaźń😇'

await cli(`pubsub pub ${subName} ${data}`, { ipfs })
await cli(`pubsub pub ${subName} "${data}"`, { ipfs })

expect(ipfs.pubsub.publish.calledWith(subName, uint8ArrayFromString(data), defaultOptions)).to.be.true()
})

it('should publish message with timeout', async () => {
const subName = 'sub-name'
const data = 'data'
const subName = 'sub-name-2'
const data = 'data\r\nsecond\nZażółć gęślą jaźń😇'

await cli(`pubsub pub ${subName} ${data} --timeout=1s`, { ipfs })
await cli(`pubsub pub ${subName} "${data}" --timeout=1s`, { ipfs })

expect(ipfs.pubsub.publish.calledWith(subName, uint8ArrayFromString(data), {
...defaultOptions,
Expand All @@ -128,17 +128,17 @@ describe('pubsub', () => {
}

it('should subscribe', async () => {
const subName = 'sub-name'
const subName = 'sub\nname'

await cli(`pubsub sub ${subName}`, { ipfs })
await cli(`pubsub sub "${subName}"`, { ipfs })

expect(ipfs.pubsub.subscribe.calledWith(subName, sinon.match.func, defaultOptions)).to.be.true()
})

it('should subscribe with a timeout', async () => {
const subName = 'sub-name'

await cli(`pubsub sub ${subName} --timeout=1s`, { ipfs })
await cli(`pubsub sub "${subName}" --timeout=1s`, { ipfs })

expect(ipfs.pubsub.subscribe.calledWith(subName, sinon.match.func, {
...defaultOptions,
Expand Down
2 changes: 1 addition & 1 deletion packages/ipfs-core/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@
"@types/rimraf": "^3.0.1",
"aegir": "^36.0.1",
"delay": "^5.0.0",
"go-ipfs": "0.10.0",
"go-ipfs": "0.11.0",
"interface-blockstore-tests": "^2.0.1",
"interface-ipfs-core": "^0.152.2",
"ipfsd-ctl": "^10.0.4",
Expand Down
2 changes: 1 addition & 1 deletion packages/ipfs-http-client/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@
"devDependencies": {
"aegir": "^36.0.1",
"delay": "^5.0.0",
"go-ipfs": "0.10.0",
"go-ipfs": "0.11.0",
"ipfsd-ctl": "^10.0.4",
"it-all": "^1.0.4",
"it-first": "^1.0.4",
Expand Down
11 changes: 10 additions & 1 deletion packages/ipfs-http-client/src/files/rm.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { configure } from '../lib/configure.js'
import { toUrlSearchParams } from '../lib/to-url-search-params.js'
import HTTP from 'ipfs-utils/src/http.js'

/**
* @typedef {import('../types').HTTPClientExtraOptions} HTTPClientExtraOptions
Expand All @@ -20,7 +21,15 @@ export const createRm = configure(api => {
headers: options.headers
})

await res.text()
const body = await res.text()
// we don't expect text body to be ever present
// (if so, it means an error such as https://github.com/ipfs/go-ipfs/issues/8606)
if (body !== '') {
/** @type {Error} */
const error = new HTTP.HTTPError(res)
error.message = body
throw error
}
}
return rm
})
41 changes: 41 additions & 0 deletions packages/ipfs-http-client/src/lib/http-rpc-wire-format.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
import { fromString as uint8ArrayFromString } from 'uint8arrays/from-string'
import { toString as uint8ArrayToString } from 'uint8arrays/to-string'
import { base64url } from 'multiformats/bases/base64'

/* HTTP RPC:
* - wraps binary data in multibase. base64url is used to avoid issues
* when a binary data is passed as search param in URL.
* Historical context: https://github.com/ipfs/go-ipfs/issues/7939
* Multibase wrapping introduced in: https://github.com/ipfs/go-ipfs/pull/8183
*/

/**
* @param {Array<string>} strings
* @returns {Array<string>} strings
*/
const rpcArrayToTextArray = strings => {
if (Array.isArray(strings)) {
return strings.map(rpcToText)
}
return strings
}

/**
* @param {string} mb
* @returns {string}
*/
const rpcToText = mb => uint8ArrayToString(rpcToBytes(mb))

/**
* @param {string} mb
* @returns {Uint8Array}
*/
const rpcToBytes = mb => base64url.decode(mb)

/**
* @param {string} text
* @returns {string}
*/
const textToUrlSafeRpc = text => base64url.encode(uint8ArrayFromString(text))

export { rpcArrayToTextArray, rpcToText, rpcToBytes, textToUrlSafeRpc }
3 changes: 2 additions & 1 deletion packages/ipfs-http-client/src/pubsub/ls.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { configure } from '../lib/configure.js'
import { toUrlSearchParams } from '../lib/to-url-search-params.js'
import { rpcArrayToTextArray } from '../lib/http-rpc-wire-format.js'

/**
* @typedef {import('../types').HTTPClientExtraOptions} HTTPClientExtraOptions
Expand All @@ -17,7 +18,7 @@ export const createLs = configure(api => {
headers: options.headers
})).json()

return Strings || []
return rpcArrayToTextArray(Strings) || []
}
return ls
})
3 changes: 2 additions & 1 deletion packages/ipfs-http-client/src/pubsub/peers.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { configure } from '../lib/configure.js'
import { toUrlSearchParams } from '../lib/to-url-search-params.js'
import { textToUrlSafeRpc } from '../lib/http-rpc-wire-format.js'

/**
* @typedef {import('../types').HTTPClientExtraOptions} HTTPClientExtraOptions
Expand All @@ -14,7 +15,7 @@ export const createPeers = configure(api => {
const res = await api.post('pubsub/peers', {
signal: options.signal,
searchParams: toUrlSearchParams({
arg: topic,
arg: textToUrlSafeRpc(topic),
...options
}),
headers: options.headers
Expand Down
3 changes: 2 additions & 1 deletion packages/ipfs-http-client/src/pubsub/publish.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import { configure } from '../lib/configure.js'
import { toUrlSearchParams } from '../lib/to-url-search-params.js'
import { multipartRequest } from 'ipfs-core-utils/multipart-request'
import { abortSignal } from '../lib/abort-signal.js'
import { textToUrlSafeRpc } from '../lib/http-rpc-wire-format.js'
import { AbortController } from 'native-abort-controller'

/**
Expand All @@ -15,7 +16,7 @@ export const createPublish = configure(api => {
*/
async function publish (topic, data, options = {}) {
const searchParams = toUrlSearchParams({
arg: topic,
arg: textToUrlSafeRpc(topic),
...options
})

Expand Down
13 changes: 6 additions & 7 deletions packages/ipfs-http-client/src/pubsub/subscribe.js
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
import { fromString as uint8ArrayFromString } from 'uint8arrays/from-string'
import { toString as uint8ArrayToString } from 'uint8arrays/to-string'
import debug from 'debug'
import { configure } from '../lib/configure.js'
import { toUrlSearchParams } from '../lib/to-url-search-params.js'
import { textToUrlSafeRpc, rpcArrayToTextArray, rpcToBytes } from '../lib/http-rpc-wire-format.js'
const log = debug('ipfs-http-client:pubsub:subscribe')

/**
Expand Down Expand Up @@ -43,7 +42,7 @@ export const createSubscribe = (options, subsTracker) => {
api.post('pubsub/sub', {
signal: options.signal,
searchParams: toUrlSearchParams({
arg: topic,
arg: textToUrlSafeRpc(topic),
...options
}),
headers: options.headers
Expand Down Expand Up @@ -95,10 +94,10 @@ async function readMessages (response, { onMessage, onEnd, onError }) {
}

onMessage({
from: uint8ArrayToString(uint8ArrayFromString(msg.from, 'base64pad'), 'base58btc'),
data: uint8ArrayFromString(msg.data, 'base64pad'),
seqno: uint8ArrayFromString(msg.seqno, 'base64pad'),
topicIDs: msg.topicIDs
from: msg.from,
data: rpcToBytes(msg.data),
seqno: rpcToBytes(msg.seqno),
topicIDs: rpcArrayToTextArray(msg.topicIDs)
})
} catch (/** @type {any} */ err) {
err.message = `Failed to parse pubsub message: ${err.message}`
Expand Down
2 changes: 1 addition & 1 deletion packages/ipfs-http-client/test/utils/factory.js
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ const commonOptions = {

const commonOverrides = {
go: {
ipfsBin: isNode ? path() : undefined
ipfsBin: isNode ? (process.env.IPFS_GO_EXEC || path()) : undefined
}
}

Expand Down
Loading

0 comments on commit 6eeaca4

Please sign in to comment.