Skip to content
This repository has been archived by the owner on Feb 12, 2024. It is now read-only.

Commit

Permalink
feat: complete PubSub implementation
Browse files Browse the repository at this point in the history
* feat: PubSub Interop Tests and CLI+HTTP-API Implementation (#1081)

* test: enable pubsub tests

* fix: generate meaniful error when pubsub is called and not enabled

* test: enable pubsub for factory daemon

* test: enable pubsub tests

* fix: generate meaniful error when pubsub is called and not enabled

* test: enable pubsub for factory daemon

* fiix(pubsub-subscribe): stop HAPI gzip from buffering our streamed response

* test: fix spec/pubsub

* fix: lint errors

* test: tests js/go pubsub interop

* test: pubsub interop tests

* test: enable pubsub tests

* fix: generate meaniful error when pubsub is called and not enabled

* test: enable pubsub for factory daemon

* fiix(pubsub-subscribe): stop HAPI gzip from buffering our streamed response

* test: fix spec/pubsub

* fix: lint errors

* test: tests js/go pubsub interop

* test: pubsub interop tests

* test: more tests with different data types

Note that binary data from JS to GO fails

* HTTP API server: parsing query string as binary in pubsub publish

* HTTP API: pubsub: publish should fail gracefully when no argument is given

* chore: update deps

* chore: update deps

* last pass

* chore: update deps

* test: update interop tests

* trying to fix cli pubsub tests

* HTTP API server: pubsub pub buffer should have content

* making linter happier

* pubsub cli tests: higher timeout

* making the linter even happier

* tests: increasing some of the timeouts

* fix: test was wrong

* moar

* mais um pouco

* fix

* meh

* key size

* moar timeouts

* taaaaaaaaaaake ooooooonnn meeeeeeee

* unomas

* take it all

* fix

* moar

* almost there

* almost there part II

* almost there part III

* take out coverage from travis and pass it to circle

* small adj in the travis config
  • Loading branch information
daviddias authored Nov 23, 2017
1 parent 1c8ad75 commit ac95601
Show file tree
Hide file tree
Showing 43 changed files with 712 additions and 377 deletions.
6 changes: 0 additions & 6 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -7,22 +7,16 @@ matrix:
env: CXX=g++-4.8
- node_js: 8
env: CXX=g++-4.8
# - node_js: stable
# env: CXX=g++-4.8

script:
- npm run lint
- npm run test
- npm run coverage
- make test

before_script:
- export DISPLAY=:99.0
- sh -e /etc/init.d/xvfb start

after_success:
- npm run coverage-publish

addons:
firefox: 'latest'
apt:
Expand Down
4 changes: 4 additions & 0 deletions circle.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@ machine:
node:
version: stable

test:
post:
- npm run coverage -- --upload

dependencies:
pre:
- google-chrome --version
Expand Down
15 changes: 8 additions & 7 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
"test:unit:node:gateway": "aegir test -t node -f test/gateway/index.js",
"test:unit:node:cli": "aegir test -t node -f test/cli/index.js",
"test:unit:browser": "aegir test -t browser --no-cors",
"test:interop": "IPFS_TEST=interop aegir test -t node -t browser -f test/interop",
"test:interop": "IPFS_TEST=interop aegir test -t node -f test/interop",
"test:interop:node": "IPFS_TEST=interop aegir test -t node -f test/interop/node.js",
"test:interop:browser": "IPFS_TEST=interop aegir test -t browser -f test/interop/browser.js",
"test:bootstrapers": "IPFS_TEST=bootstrapers aegir test -t browser -f test/bootstrapers.js",
Expand Down Expand Up @@ -63,7 +63,7 @@
},
"homepage": "https://github.com/ipfs/js-ipfs#readme",
"devDependencies": {
"aegir": "^12.1.3",
"aegir": "^12.2.0",
"buffer-loader": "0.0.1",
"chai": "^4.1.2",
"delay": "^2.0.0",
Expand All @@ -75,8 +75,8 @@
"expose-loader": "^0.7.4",
"form-data": "^2.3.1",
"hat": "0.0.3",
"interface-ipfs-core": "~0.36.7",
"ipfsd-ctl": "~0.24.1",
"interface-ipfs-core": "~0.36.8",
"ipfsd-ctl": "~0.25.1",
"left-pad": "^1.2.0",
"lodash": "^4.17.4",
"mocha": "^4.0.1",
Expand All @@ -92,21 +92,22 @@
},
"dependencies": {
"async": "^2.6.0",
"binary-querystring": "~0.1.2",
"bl": "^1.2.1",
"boom": "^7.1.1",
"bs58": "^4.0.1",
"byteman": "^1.3.5",
"cids": "^0.5.2",
"debug": "^3.1.0",
"file-type": "^7.2.0",
"file-type": "^7.3.0",
"filesize": "^3.5.11",
"fsm-event": "^2.1.0",
"get-folder-size": "^1.0.0",
"glob": "^7.1.2",
"hapi": "^16.6.2",
"hapi-set-header": "^1.0.2",
"hoek": "^5.0.2",
"ipfs-api": "^17.1.0",
"ipfs-api": "^17.1.2",
"ipfs-bitswap": "~0.17.4",
"ipfs-block": "~0.6.1",
"ipfs-block-service": "~0.13.0",
Expand All @@ -120,7 +121,7 @@
"joi": "^13.0.2",
"libp2p": "~0.13.1",
"libp2p-circuit": "~0.1.4",
"libp2p-floodsub": "~0.12.1",
"libp2p-floodsub": "~0.13.0",
"libp2p-kad-dht": "~0.6.0",
"libp2p-mdns": "~0.9.1",
"libp2p-multiplex": "~0.5.0",
Expand Down
24 changes: 24 additions & 0 deletions src/core/components/no-floodsub.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
'use strict'

const EventEmitter = require('events')

function fail () {
throw new Error('The daemon must be run with \'--enable-pubsub-experiment\'')
}

class NoFloodSub extends EventEmitter {
constructor () {
super()

this.peers = new Map()
this.subscriptions = new Set()
}

start (callback) { callback() }
stop (callback) { callback() }
publish () { fail() }
subscribe () { fail() }
unsubscribe () { fail() }
}

module.exports = NoFloodSub
11 changes: 5 additions & 6 deletions src/core/components/start.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
const series = require('async/series')
const Bitswap = require('ipfs-bitswap')
const FloodSub = require('libp2p-floodsub')
const NoFloodSub = require('./no-floodsub')
const setImmediate = require('async/setImmediate')
const promisify = require('promisify-es6')

Expand Down Expand Up @@ -50,12 +51,10 @@ module.exports = (self) => {
self._bitswap.start()
self._blockService.setExchange(self._bitswap)

if (self._options.EXPERIMENTAL.pubsub) {
self._pubsub = new FloodSub(self._libp2pNode)
self._pubsub.start(done)
} else {
done()
}
self._pubsub = self._options.EXPERIMENTAL.pubsub
? new FloodSub(self._libp2pNode)
: new NoFloodSub()
self._pubsub.start(done)
})
})
}
8 changes: 1 addition & 7 deletions src/core/components/stop.js
Original file line number Diff line number Diff line change
Expand Up @@ -31,13 +31,7 @@ module.exports = (self) => {
self._bitswap.stop()

series([
(cb) => {
if (self._options.EXPERIMENTAL.pubsub) {
self._pubsub.stop(cb)
} else {
cb()
}
},
(cb) => self._pubsub.stop(cb),
(cb) => self.libp2p.stop(cb),
(cb) => self._repo.close(cb)
], done)
Expand Down
10 changes: 7 additions & 3 deletions src/http/api/resources/pubsub.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

const PassThrough = require('stream').PassThrough
const bs58 = require('bs58')
const binaryQueryString = require('binary-querystring')

exports = module.exports

Expand Down Expand Up @@ -48,6 +49,7 @@ exports.subscribe = {

reply(res)
.header('X-Chunked-Output', '1')
.header('content-encoding', 'identity') // stop gzip from buffering, see https://github.com/hapijs/hapi/issues/2975
.header('content-type', 'application/json')
})
}
Expand All @@ -57,19 +59,21 @@ exports.publish = {
handler: (request, reply) => {
const arg = request.query.arg
const topic = arg[0]
const buf = arg[1]

const rawArgs = binaryQueryString(request.url.search)
const buf = rawArgs.arg && rawArgs.arg[1]

const ipfs = request.server.app.ipfs

if (!topic) {
return reply(new Error('Missing topic'))
}

if (!buf) {
if (!buf || buf.length === 0) {
return reply(new Error('Missing buf'))
}

ipfs.pubsub.publish(topic, Buffer.from(String(buf)), (err) => {
ipfs.pubsub.publish(topic, buf, (err) => {
if (err) {
return reply(new Error(`Failed to publish to topic ${topic}: ${err}`))
}
Expand Down
57 changes: 30 additions & 27 deletions test/cli/bitswap.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,36 +4,39 @@
const expect = require('chai').expect
const runOn = require('../utils/on-and-off').on

describe('bitswap', () => runOn((thing) => {
let ipfs
const key = 'QmUBdnXXPyoDFXj3Hj39dNJ5VkN3QFRskXxcGaYFBB8CNR'
describe('bitswap', function () {
runOn((thing) => {
this.timeout(30000)
let ipfs
const key = 'QmUBdnXXPyoDFXj3Hj39dNJ5VkN3QFRskXxcGaYFBB8CNR'

before((done) => {
ipfs = thing.ipfs
ipfs('block get ' + key)
.then(() => {})
.catch(() => {})
setTimeout(done, 800)
})
before((done) => {
ipfs = thing.ipfs
ipfs('block get ' + key)
.then(() => {})
.catch(() => {})
setTimeout(done, 800)
})

it('wantlist', () => {
return ipfs('bitswap wantlist').then((out) => {
expect(out).to.eql(key + '\n')
it('wantlist', () => {
return ipfs('bitswap wantlist').then((out) => {
expect(out).to.eql(key + '\n')
})
})
})

it('stat', () => {
return ipfs('bitswap stat').then((out) => {
expect(out).to.be.eql([
'bitswap status',
' blocks received: 0',
' dup blocks received: 0',
' dup data received: 0B',
' wantlist [1 keys]',
` ${key}`,
' partners [0]',
' '
].join('\n') + '\n')
it('stat', () => {
return ipfs('bitswap stat').then((out) => {
expect(out).to.be.eql([
'bitswap status',
' blocks received: 0',
' dup blocks received: 0',
' dup data received: 0B',
' wantlist [1 keys]',
` ${key}`,
' partners [0]',
' '
].join('\n') + '\n')
})
})
})
}))
})
23 changes: 17 additions & 6 deletions test/cli/block.js
Original file line number Diff line number Diff line change
Expand Up @@ -11,32 +11,41 @@ describe('block', () => runOnAndOff((thing) => {
ipfs = thing.ipfs
})

it('put', () => {
it('put', function () {
this.timeout(40 * 1000)
return ipfs('block put test/fixtures/test-data/hello').then((out) => {
expect(out).to.eql('QmZjTnYw2TFhn9Nn7tjmPSoTBoY7YRkwPzwSrSbabY24Kp\n')
})
})

it('put with flags, format and mhtype', () => {
it('put with flags, format and mhtype', function () {
this.timeout(40 * 1000)

return ipfs('block put --format eth-block --mhtype keccak-256 test/fixtures/test-data/eth-block')
.then((out) =>
expect(out).to.eql('z43AaGF23fmvRnDP56Ub9WcJCfzSfqtmzNCCvmz5eudT8dtdCDS\n'))
})

it('get', () => {
it('get', function () {
this.timeout(40 * 1000)

return ipfs('block get QmZjTnYw2TFhn9Nn7tjmPSoTBoY7YRkwPzwSrSbabY24Kp')
.then((out) => expect(out).to.eql('hello world\n'))
})

it('get block from file without a final newline', () => {
it('get block from file without a final newline', function () {
this.timeout(40 * 1000)

return ipfs('block put test/fixtures/test-data/no-newline').then((out) => {
expect(out).to.eql('QmTwbQs4sGcCiPxV97SpbHS7QgmVg9SiKxcG1AcF1Ly2SL\n')
return ipfs('block get QmTwbQs4sGcCiPxV97SpbHS7QgmVg9SiKxcG1AcF1Ly2SL')
})
.then((out) => expect(out).to.eql('there is no newline at end of this file'))
})

it('stat', () => {
it('stat', function () {
this.timeout(40 * 1000)

return ipfs('block stat QmZjTnYw2TFhn9Nn7tjmPSoTBoY7YRkwPzwSrSbabY24Kp')
.then((out) => {
expect(out).to.eql([
Expand All @@ -46,7 +55,9 @@ describe('block', () => runOnAndOff((thing) => {
})
})

it.skip('rm', () => {
it.skip('rm', function () {
this.timeout(40 * 1000)

return ipfs('block rm QmZjTnYw2TFhn9Nn7tjmPSoTBoY7YRkwPzwSrSbabY24Kp')
.then((out) => {
expect(out).to.eql(
Expand Down
Loading

0 comments on commit ac95601

Please sign in to comment.