From 760594e57224e38139a560c37747e52f9dd3e593 Mon Sep 17 00:00:00 2001 From: Alex Potsides Date: Thu, 23 Jun 2022 19:29:26 +0100 Subject: [PATCH] fix: do not unsubscribe after publish (#78) Now that we route all messages via a `'message'` event, we can't check the number of listeners for a given topic using the listener count. --- src/index.ts | 11 ++++------- test/pubsub.spec.ts | 25 ++++++++++++++++++++++++- 2 files changed, 28 insertions(+), 8 deletions(-) diff --git a/src/index.ts b/src/index.ts index 5f481d175d..449581d16c 100644 --- a/src/index.ts +++ b/src/index.ts @@ -621,10 +621,6 @@ export abstract class PubSubBaseProtocol extends EventEmi super.dispatchEvent(new CustomEvent('message', { detail: rpcMessage })) - - if (this.listenerCount(topic) === 0) { - this.unsubscribe(topic) - } } } @@ -655,6 +651,8 @@ export abstract class PubSubBaseProtocol extends EventEmi throw new Error('Pubsub has not started') } + log('subscribe to topic: %s', topic) + if (!this.subscriptions.has(topic)) { this.subscriptions.add(topic) @@ -676,11 +674,10 @@ export abstract class PubSubBaseProtocol extends EventEmi super.removeEventListener(topic) const wasSubscribed = this.subscriptions.has(topic) - const listeners = this.listenerCount(topic) - log('unsubscribe from %s - am subscribed %s, listeners %d', topic, wasSubscribed, listeners) + log('unsubscribe from %s - am subscribed %s', topic, wasSubscribed) - if (wasSubscribed && listeners === 0) { + if (wasSubscribed) { this.subscriptions.delete(topic) for (const peerId of this.peers.keys()) { diff --git a/test/pubsub.spec.ts b/test/pubsub.spec.ts index f78ff51884..d5d8d7fe0b 100644 --- a/test/pubsub.spec.ts +++ b/test/pubsub.spec.ts @@ -31,7 +31,8 @@ describe('pubsub base implementation', () => { beforeEach(async () => { const peerId = await createPeerId() pubsub = new PubsubImplementation({ - multicodecs: [protocol] + multicodecs: [protocol], + emitSelf: true }) pubsub.init(new Components({ peerId: peerId, @@ -75,6 +76,28 @@ describe('pubsub base implementation', () => { await expect(pubsub.validate(signedMessage)).to.eventually.be.undefined() }) + + it('calls publishes messages twice', async () => { + let count = 0 + + await pubsub.start() + pubsub.subscribe(topic) + + pubsub.addEventListener('message', evt => { + if (evt.detail.topic === topic) { + count++ + } + }) + await pubsub.publish(topic, message) + await pubsub.publish(topic, message) + + // event dispatch is async + await pWaitFor(() => { + return count === 2 + }) + + expect(count).to.eql(2) + }) }) describe('subscribe', () => {