From f4463df94dbea7e0b24daa934f58e169cb877cce Mon Sep 17 00:00:00 2001 From: harkamal Date: Fri, 30 Sep 2022 01:17:47 +0530 Subject: [PATCH 1/5] Add stream option to limit inbound message size --- src/stream.ts | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/src/stream.ts b/src/stream.ts index 247f9497..392948f4 100644 --- a/src/stream.ts +++ b/src/stream.ts @@ -10,6 +10,11 @@ type OutboundStreamOpts = { maxBufferSize?: number } +type InboundStreamOpts = { + /** Max size in bytes for reading messages from the stream */ + maxDataLength?: number +} + export class OutboundStream { private readonly pushable: Pushable private readonly closeController: AbortController @@ -54,11 +59,11 @@ export class InboundStream { private readonly rawStream: Stream private readonly closeController: AbortController - constructor(rawStream: Stream) { + constructor(rawStream: Stream, opts: InboundStreamOpts) { this.rawStream = rawStream this.closeController = new AbortController() - this.source = abortableSource(pipe(this.rawStream, decode()), this.closeController.signal, { returnOnAbort: true }) + this.source = abortableSource(pipe(this.rawStream, decode(opts)), this.closeController.signal, { returnOnAbort: true }) } close(): void { From 9ee6d4f1fab8751cb51568fab7266f30726fa312 Mon Sep 17 00:00:00 2001 From: harkamal Date: Tue, 4 Oct 2022 15:55:11 +0530 Subject: [PATCH 2/5] give empty default to opts --- src/stream.ts | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/src/stream.ts b/src/stream.ts index 392948f4..1b7f71f3 100644 --- a/src/stream.ts +++ b/src/stream.ts @@ -59,11 +59,13 @@ export class InboundStream { private readonly rawStream: Stream private readonly closeController: AbortController - constructor(rawStream: Stream, opts: InboundStreamOpts) { + constructor(rawStream: Stream, opts: InboundStreamOpts = {}) { this.rawStream = rawStream this.closeController = new AbortController() - this.source = abortableSource(pipe(this.rawStream, decode(opts)), this.closeController.signal, { returnOnAbort: true }) + this.source = abortableSource(pipe(this.rawStream, decode(opts)), this.closeController.signal, { + returnOnAbort: true + }) } close(): void { From d37318fa3ba6d8f19c01266836c0ebaf55290397 Mon Sep 17 00:00:00 2001 From: harkamal Date: Tue, 4 Oct 2022 16:47:01 +0530 Subject: [PATCH 3/5] add the option in gossibsub opts to set the inbound max len --- src/index.ts | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/src/index.ts b/src/index.ts index cbe9f157..af29762a 100644 --- a/src/index.ts +++ b/src/index.ts @@ -160,6 +160,13 @@ export interface GossipsubOpts extends GossipsubOptsSpec, PubSubInit { */ maxOutboundBufferSize?: number + /** + * Specify max size to skip decoding messages whose data + * section exceeds this size. + * + */ + maxInboundDataLength?: number + /** * If provided, only allow topics in this list */ @@ -772,7 +779,7 @@ export class GossipSub extends EventEmitter implements PubSub this.log(err)) From 607b3fb1a4ccd8fe5d89bb3ae162717e72916eaf Mon Sep 17 00:00:00 2001 From: harkamal Date: Wed, 5 Oct 2022 13:27:23 +0530 Subject: [PATCH 4/5] add max inbout size validation spec --- src/index.ts | 12 ++++++++++-- test/gossip.spec.ts | 35 ++++++++++++++++++++++++++++++++++- 2 files changed, 44 insertions(+), 3 deletions(-) diff --git a/src/index.ts b/src/index.ts index af29762a..d009e726 100644 --- a/src/index.ts +++ b/src/index.ts @@ -930,11 +930,19 @@ export class GossipSub extends EventEmitter implements PubSub { init: { scoreParams: { IPColocationFactorThreshold: GossipsubDhi + 3 - } + }, + maxInboundDataLength: 4000000 } }) }) @@ -79,6 +80,38 @@ describe('gossip', () => { nodeASpy.pushGossip.restore() }) + it('should reject incoming messages bigger than maxInboundDataLength limit', async function () { + this.timeout(10e4) + const nodeA = nodes[0] + const nodeB = nodes[1] + + const twoNodes = [nodeA, nodeB] + const topic = 'Z' + // add subscriptions to each node + twoNodes.forEach((n) => n.getPubSub().subscribe(topic)) + + // every node connected to every other + await connectAllPubSubNodes(twoNodes) + + // wait for subscriptions to be transmitted + await Promise.all(twoNodes.map(async (n) => await pEvent(n.getPubSub(), 'subscription-change'))) + + // await mesh rebalancing + await Promise.all(twoNodes.map(async (n) => await pEvent(n.getPubSub(), 'gossipsub:heartbeat'))) + + // set spy. NOTE: Forcing private property to be public + const nodeBSpy = nodeB.getPubSub() as Partial as SinonStubbedInstance<{ + handlePeerReadStreamError: GossipSub['handlePeerReadStreamError'] + }> + sinon.spy(nodeBSpy, 'handlePeerReadStreamError') + + // This should lead to handlePeerReadStreamError at nodeB + await nodeA.getPubSub().publish(topic, new Uint8Array(5000000)) + await pEvent(nodeA.getPubSub(), 'gossipsub:heartbeat') + const expectedError = nodeBSpy.handlePeerReadStreamError.getCalls()[0]?.args[0] + expect(expectedError !== undefined && (expectedError as unknown as { code: string }).code, 'ERR_MSG_DATA_TOO_LONG') + }) + it('should send piggyback control into other sent messages', async function () { this.timeout(10e4) const nodeA = nodes[0] From 5d6bcc857983344a80c9468805d3c1d44f189d27 Mon Sep 17 00:00:00 2001 From: harkamal Date: Wed, 26 Oct 2022 17:11:16 +0530 Subject: [PATCH 5/5] fix test --- test/gossip.spec.ts | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) diff --git a/test/gossip.spec.ts b/test/gossip.spec.ts index 299c9696..47aee6a3 100644 --- a/test/gossip.spec.ts +++ b/test/gossip.spec.ts @@ -88,28 +88,31 @@ describe('gossip', () => { const twoNodes = [nodeA, nodeB] const topic = 'Z' // add subscriptions to each node - twoNodes.forEach((n) => n.getPubSub().subscribe(topic)) + twoNodes.forEach((n) => n.pubsub.subscribe(topic)) // every node connected to every other await connectAllPubSubNodes(twoNodes) // wait for subscriptions to be transmitted - await Promise.all(twoNodes.map(async (n) => await pEvent(n.getPubSub(), 'subscription-change'))) + await Promise.all(twoNodes.map(async (n) => await pEvent(n.pubsub, 'subscription-change'))) // await mesh rebalancing - await Promise.all(twoNodes.map(async (n) => await pEvent(n.getPubSub(), 'gossipsub:heartbeat'))) + await Promise.all(twoNodes.map(async (n) => await pEvent(n.pubsub, 'gossipsub:heartbeat'))) // set spy. NOTE: Forcing private property to be public - const nodeBSpy = nodeB.getPubSub() as Partial as SinonStubbedInstance<{ + const nodeBSpy = nodeB.pubsub as Partial as SinonStubbedInstance<{ handlePeerReadStreamError: GossipSub['handlePeerReadStreamError'] }> sinon.spy(nodeBSpy, 'handlePeerReadStreamError') // This should lead to handlePeerReadStreamError at nodeB - await nodeA.getPubSub().publish(topic, new Uint8Array(5000000)) - await pEvent(nodeA.getPubSub(), 'gossipsub:heartbeat') + await nodeA.pubsub.publish(topic, new Uint8Array(5000000)) + await pEvent(nodeA.pubsub, 'gossipsub:heartbeat') const expectedError = nodeBSpy.handlePeerReadStreamError.getCalls()[0]?.args[0] expect(expectedError !== undefined && (expectedError as unknown as { code: string }).code, 'ERR_MSG_DATA_TOO_LONG') + + // unset spy + nodeBSpy.handlePeerReadStreamError.restore() }) it('should send piggyback control into other sent messages', async function () {