Skip to content

Commit

Permalink
fix: fix double publish bug
Browse files Browse the repository at this point in the history
The tests of this module are currently failing but they don't fail
the build because the error that should fail it is thrown [out of
the flow control](https://github.com/ChainSafe/gossipsub-js/blob/master/test/floodsub.spec.js#L21)
 of the tests.  An [UnhandledPromiseRejectionWarning](https://travis-ci.com/ChainSafe/gossipsub-js/jobs/278427804#L270)
is thrown.

This PR:

1. Passes an `error` object to `done` if the error condition is hit
2. Fixes a bug where a message was being sent back to the sender
    and then emitted by the sender even though `emitSelf` was false
  • Loading branch information
achingbrain committed Jan 24, 2020
1 parent 71cb905 commit 519f868
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 15 deletions.
4 changes: 4 additions & 0 deletions src/pubsub.js
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,10 @@ class BasicPubSub extends Pubsub {
* @param {rpc.RPC.Message} msg
*/
_processRpcMessage (msg) {
if (this.peerInfo.id.toB58String() === msg.from && !this._options.emitSelf) {
return
}

// Emit to self
this._emitMessage(msg.topicIDs, msg)
}
Expand Down
38 changes: 23 additions & 15 deletions test/floodsub.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@ const {
first
} = require('./utils')

const shouldNotHappen = () => expect.fail()

describe('gossipsub fallbacks to floodsub', () => {
let registrarRecords = Array.from({ length: 2 })

Expand Down Expand Up @@ -189,18 +187,23 @@ describe('gossipsub fallbacks to floodsub', () => {
])
})

it('Publish to a topic - nodeGs', async () => {
it('Publish to a topic - nodeGs', (done) => {
const shouldNotHappen = () => {
done(new Error('Should not be here'))
}

const promise = new Promise((resolve) => nodeFs.once(topic, resolve))
nodeGs.once(topic, (m) => shouldNotHappen)

nodeGs.publish(topic, Buffer.from('hey'))

const msg = await promise

expect(msg.data.toString()).to.equal('hey')
expect(msg.from).to.be.eql(nodeGs.peerInfo.id.toB58String())
promise.then((msg) => {
expect(msg.data.toString()).to.equal('hey')
expect(msg.from).to.be.eql(nodeGs.peerInfo.id.toB58String())

nodeGs.removeListener(topic, shouldNotHappen)
nodeGs.removeListener(topic, shouldNotHappen)
done()
}, done)
})

it('Publish to a topic - nodeFs', async () => {
Expand All @@ -212,19 +215,20 @@ describe('gossipsub fallbacks to floodsub', () => {

expect(msg.data.toString()).to.equal('banana')
expect(msg.from).to.be.eql(nodeFs.peerInfo.id.toB58String())

nodeFs.removeListener(topic, shouldNotHappen)
})

it('Publish 10 msg to a topic', (done) => {
let counter = 0

nodeGs.once(topic, shouldNotHappen)
const shouldNotHappen = (msg) => {
done(new Error('Should not be here'))
}

nodeGs.once(topic, shouldNotHappen)
nodeFs.on(topic, receivedMsg)

function receivedMsg (msg) {
expect(msg.data.toString()).to.equal('banana')
expect(msg.data.toString()).to.equal('banana ' + counter)
expect(msg.from).to.be.eql(nodeGs.peerInfo.id.toB58String())
expect(Buffer.isBuffer(msg.seqno)).to.be.true()
expect(msg.topicIDs).to.be.eql([topic])
Expand All @@ -236,18 +240,22 @@ describe('gossipsub fallbacks to floodsub', () => {
}
}

times(10, () => nodeGs.publish(topic, Buffer.from('banana')))
times(10, (index) => nodeGs.publish(topic, Buffer.from('banana ' + index)))
})

it('Publish 10 msg to a topic as array', (done) => {
let counter = 0

const shouldNotHappen = () => {
done(new Error('Should not be here'))
}

nodeGs.once(topic, shouldNotHappen)

nodeFs.on(topic, receivedMsg)

function receivedMsg (msg) {
expect(msg.data.toString()).to.equal('banana')
expect(msg.data.toString()).to.equal('banana ' + counter)
expect(msg.from).to.be.eql(nodeGs.peerInfo.id.toB58String())
expect(Buffer.isBuffer(msg.seqno)).to.be.true()
expect(msg.topicIDs).to.be.eql([topic])
Expand All @@ -260,7 +268,7 @@ describe('gossipsub fallbacks to floodsub', () => {
}

const msgs = []
times(10, () => msgs.push(Buffer.from('banana')))
times(10, (index) => msgs.push(Buffer.from('banana ' + index)))
nodeGs.publish(topic, msgs)
})
})
Expand Down

0 comments on commit 519f868

Please sign in to comment.