diff --git a/package.json b/package.json index b47fa2e68..b9e66ee8b 100644 --- a/package.json +++ b/package.json @@ -72,16 +72,16 @@ }, "devDependencies": { "aegir": "^13.1.0", - "browser-process-platform": "^0.1.1", + "browser-process-platform": "~0.1.1", "chai": "^4.1.2", "cross-env": "^5.1.5", "dirty-chai": "^2.0.1", "eslint-plugin-react": "^7.8.1", - "go-ipfs-dep": "^0.4.14", + "go-ipfs-dep": "~0.4.15", "gulp": "^3.9.1", - "interface-ipfs-core": "~0.65.5", + "interface-ipfs-core": "~0.66.2", "ipfs": "~0.28.2", - "ipfsd-ctl": "~0.33.2", + "ipfsd-ctl": "~0.36.0", "pull-stream": "^3.6.8", "socket.io": "^2.1.0", "socket.io-client": "^2.1.0", diff --git a/src/ping-pull-stream.js b/src/ping-pull-stream.js index 9c18140e2..a7871faa3 100644 --- a/src/ping-pull-stream.js +++ b/src/ping-pull-stream.js @@ -2,7 +2,9 @@ const toPull = require('stream-to-pull-stream') const deferred = require('pull-defer') +const pump = require('pump') const moduleConfig = require('./utils/module-config') +const PingMessageStream = require('./utils/ping-message-stream') module.exports = (arg) => { const send = moduleConfig(arg) @@ -18,10 +20,13 @@ module.exports = (arg) => { qs: opts } const p = deferred.source() + const response = new PingMessageStream() send(request, (err, stream) => { if (err) { return p.abort(err) } - p.resolve(toPull.source(stream)) + + pump(stream, response) + p.resolve(toPull.source(response)) }) return p diff --git a/src/ping-readable-stream.js b/src/ping-readable-stream.js index 6281a44de..df4f70401 100644 --- a/src/ping-readable-stream.js +++ b/src/ping-readable-stream.js @@ -1,8 +1,8 @@ 'use strict' -const Stream = require('readable-stream') const pump = require('pump') const moduleConfig = require('./utils/module-config') +const PingMessageStream = require('./utils/ping-message-stream') module.exports = (arg) => { const send = moduleConfig(arg) @@ -17,17 +17,14 @@ module.exports = (arg) => { args: id, qs: opts } - // ndjson streams objects - const pt = new Stream.PassThrough({ - objectMode: true - }) - send(request, (err, stream) => { - if (err) { return pt.destroy(err) } + const response = new PingMessageStream() - pump(stream, pt) + send(request, (err, stream) => { + if (err) { return response.emit('error', err) } + pump(stream, response) }) - return pt + return response } } diff --git a/src/ping.js b/src/ping.js index 2682e9752..8b9081967 100644 --- a/src/ping.js +++ b/src/ping.js @@ -1,8 +1,10 @@ 'use strict' const promisify = require('promisify-es6') +const pump = require('pump') +const Writable = require('readable-stream').Writable const moduleConfig = require('./utils/module-config') -const streamToValue = require('./utils/stream-to-value') +const PingMessageStream = require('./utils/ping-message-stream') module.exports = (arg) => { const send = moduleConfig(arg) @@ -29,15 +31,23 @@ module.exports = (arg) => { } // Transform the response stream to a value: - // [{ Success: , Time: , Text: }] - const transform = (res, callback) => { - streamToValue(res, (err, res) => { - if (err) { - return callback(err) - } - - callback(null, res) - }) + // [{ success: , time: , text: }] + const transform = (stream, callback) => { + const messageConverter = new PingMessageStream() + const responses = [] + + pump( + stream, + messageConverter, + new Writable({ + objectMode: true, + write (chunk, enc, cb) { + responses.push(chunk) + cb() + } + }), + (err) => callback(err, responses) + ) } send.andTransform(request, transform, callback) diff --git a/src/utils/ping-message-converter.js b/src/utils/ping-message-converter.js new file mode 100644 index 000000000..79d9fc3be --- /dev/null +++ b/src/utils/ping-message-converter.js @@ -0,0 +1,23 @@ +'use strict' + +// Converts IPFS API ping messages to lowercase +// +// { +// Success: true, +// Text: 'foobar', +// Time: 0 +// } +// + +module.exports = function pingMessageConverter (obj) { + if (!isPingMessage(obj)) throw new Error('Invalid ping message received') + return { + success: obj.Success, + time: obj.Time, + text: obj.Text + } +} + +function isPingMessage (obj) { + return obj && typeof obj.Success === 'boolean' +} diff --git a/src/utils/ping-message-stream.js b/src/utils/ping-message-stream.js new file mode 100644 index 000000000..944e2d9cf --- /dev/null +++ b/src/utils/ping-message-stream.js @@ -0,0 +1,27 @@ +'use strict' + +const TransformStream = require('readable-stream').Transform +const pingMessageConverter = require('./ping-message-converter') + +class PingMessageStream extends TransformStream { + constructor (options) { + const opts = Object.assign(options || {}, { objectMode: true }) + super(opts) + } + + _transform (obj, enc, callback) { + try { + const msg = pingMessageConverter(obj) + this.push(msg) + + if (!msg.success) { + throw new Error(msg.text) + } + } catch (err) { + return callback(err) + } + callback() + } +} + +module.exports = PingMessageStream diff --git a/test/get.spec.js b/test/get.spec.js index a8a2adbf5..91d16f3fe 100644 --- a/test/get.spec.js +++ b/test/get.spec.js @@ -72,7 +72,7 @@ describe('.get (specific go-ipfs features)', function () { 'compression-level': 10 }, (err, files) => { expect(err).to.exist() - expect(err.toString()).to.equal('Error: Compression level must be between 1 and 9') + expect(err.toString()).to.equal('Error: compression level must be between 1 and 9') done() }) }) diff --git a/test/interface/ping.spec.js b/test/interface/ping.spec.js new file mode 100644 index 000000000..910bf3820 --- /dev/null +++ b/test/interface/ping.spec.js @@ -0,0 +1,32 @@ +/* eslint-env mocha */ +/* eslint max-nested-callbacks: ["error", 8] */ +'use strict' + +const test = require('interface-ipfs-core') +const parallel = require('async/parallel') + +const IPFSApi = require('../../src') +const f = require('../utils/factory') + +const nodes = [] +const common = { + setup: function (callback) { + callback(null, { + spawnNode: (cb) => { + f.spawn({ initOptions: { bits: 1024 } }, (err, _ipfsd) => { + if (err) { + return cb(err) + } + + nodes.push(_ipfsd) + cb(null, IPFSApi(_ipfsd.apiAddr)) + }) + } + }) + }, + teardown: function (callback) { + parallel(nodes.map((node) => (cb) => node.stop(cb)), callback) + } +} + +test.ping(common) diff --git a/test/ping.spec.js b/test/ping.spec.js index 49e96a8b0..a80f3a9d4 100644 --- a/test/ping.spec.js +++ b/test/ping.spec.js @@ -12,8 +12,14 @@ const parallel = require('async/parallel') const series = require('async/series') const IPFSApi = require('../src') +const PingMessageStream = require('../src/utils/ping-message-stream') const f = require('./utils/factory') +// Determine if a ping response object is a pong, or something else, like a status message +function isPong (pingResponse) { + return Boolean(pingResponse && pingResponse.time) +} + describe('.ping', function () { let ipfs let ipfsd @@ -75,12 +81,12 @@ describe('.ping', function () { ipfs.ping(otherId, (err, res) => { expect(err).to.not.exist() expect(res).to.be.an('array') - expect(res).to.have.lengthOf(3) + expect(res.filter(isPong)).to.have.lengthOf(1) res.forEach(packet => { - expect(packet).to.have.keys('Success', 'Time', 'Text') - expect(packet.Time).to.be.a('number') + expect(packet).to.have.keys('success', 'time', 'text') + expect(packet.time).to.be.a('number') }) - const resultMsg = res.find(packet => packet.Text.includes('Average latency')) + const resultMsg = res.find(packet => packet.text.includes('Average latency')) expect(resultMsg).to.exist() done() }) @@ -90,12 +96,12 @@ describe('.ping', function () { ipfs.ping(otherId, { count: 2 }, (err, res) => { expect(err).to.not.exist() expect(res).to.be.an('array') - expect(res).to.have.lengthOf(4) + expect(res.filter(isPong)).to.have.lengthOf(2) res.forEach(packet => { - expect(packet).to.have.keys('Success', 'Time', 'Text') - expect(packet.Time).to.be.a('number') + expect(packet).to.have.keys('success', 'time', 'text') + expect(packet.time).to.be.a('number') }) - const resultMsg = res.find(packet => packet.Text.includes('Average latency')) + const resultMsg = res.find(packet => packet.text.includes('Average latency')) expect(resultMsg).to.exist() done() }) @@ -105,12 +111,12 @@ describe('.ping', function () { ipfs.ping(otherId, { n: 2 }, (err, res) => { expect(err).to.not.exist() expect(res).to.be.an('array') - expect(res).to.have.lengthOf(4) + expect(res.filter(isPong)).to.have.lengthOf(2) res.forEach(packet => { - expect(packet).to.have.keys('Success', 'Time', 'Text') - expect(packet.Time).to.be.a('number') + expect(packet).to.have.keys('success', 'time', 'text') + expect(packet.time).to.be.a('number') }) - const resultMsg = res.find(packet => packet.Text.includes('Average latency')) + const resultMsg = res.find(packet => packet.text.includes('Average latency')) expect(resultMsg).to.exist() done() }) @@ -129,12 +135,12 @@ describe('.ping', function () { return ipfs.ping(otherId) .then((res) => { expect(res).to.be.an('array') - expect(res).to.have.lengthOf(3) + expect(res.filter(isPong)).to.have.lengthOf(1) res.forEach(packet => { - expect(packet).to.have.keys('Success', 'Time', 'Text') - expect(packet.Time).to.be.a('number') + expect(packet).to.have.keys('success', 'time', 'text') + expect(packet.time).to.be.a('number') }) - const resultMsg = res.find(packet => packet.Text.includes('Average latency')) + const resultMsg = res.find(packet => packet.text.includes('Average latency')) expect(resultMsg).to.exist() }) }) @@ -145,12 +151,12 @@ describe('.ping', function () { collect((err, data) => { expect(err).to.not.exist() expect(data).to.be.an('array') - expect(data).to.have.lengthOf(3) + expect(data.filter(isPong)).to.have.lengthOf(1) data.forEach(packet => { - expect(packet).to.have.keys('Success', 'Time', 'Text') - expect(packet.Time).to.be.a('number') + expect(packet).to.have.keys('success', 'time', 'text') + expect(packet.time).to.be.a('number') }) - const resultMsg = data.find(packet => packet.Text.includes('Average latency')) + const resultMsg = data.find(packet => packet.text.includes('Average latency')) expect(resultMsg).to.exist() done() }) @@ -162,15 +168,22 @@ describe('.ping', function () { ipfs.pingReadableStream(otherId) .on('data', data => { expect(data).to.be.an('object') - expect(data).to.have.keys('Success', 'Time', 'Text') - packetNum++ + expect(data).to.have.keys('success', 'time', 'text') + if (isPong(data)) packetNum++ }) .on('error', err => { expect(err).not.to.exist() }) .on('end', () => { - expect(packetNum).to.be.above(2) + expect(packetNum).to.equal(1) done() }) }) + + it('message conversion fails if invalid message is received', () => { + const messageConverter = new PingMessageStream() + expect(() => { + messageConverter.write({some: 'InvalidMessage'}) + }).to.throw('Invalid ping message received') + }) })