From c9db5f0c18ae4449ed31cfa63edaebb61f62988d Mon Sep 17 00:00:00 2001 From: achingbrain Date: Sun, 17 Dec 2023 11:41:02 +0000 Subject: [PATCH] chore: add test --- packages/multistream-select/src/select.ts | 6 +++ .../test/integration.spec.ts | 41 ++++++++++++++++--- 2 files changed, 42 insertions(+), 5 deletions(-) diff --git a/packages/multistream-select/src/select.ts b/packages/multistream-select/src/select.ts index 752dd49ac1..e48ffdb785 100644 --- a/packages/multistream-select/src/select.ts +++ b/packages/multistream-select/src/select.ts @@ -182,6 +182,12 @@ function optimisticSelect (stream: Stream, protoco sentProtocol = true sendingProtocol = false doneSendingProtocol.resolve() + + // finish negotiation but don't block more sending + negotiate() + .catch(err => { + options.log.error('could not finish optimistic protocol negotiation of %s', protocol, err) + }) } else { yield buf } diff --git a/packages/multistream-select/test/integration.spec.ts b/packages/multistream-select/test/integration.spec.ts index 479f798ad3..729439083d 100644 --- a/packages/multistream-select/test/integration.spec.ts +++ b/packages/multistream-select/test/integration.spec.ts @@ -88,7 +88,7 @@ describe('Dialer and Listener integration', () => { expect(new Uint8ArrayList(...output[0]).slice()).to.eql(new Uint8ArrayList(...input).slice()) }) - it('should handle and lazySelect', async () => { + it('should handle and optimistically select', async () => { const protocol = '/echo/1.0.0' const pair = duplexPair() @@ -113,7 +113,7 @@ describe('Dialer and Listener integration', () => { expect(new Uint8ArrayList(...dialerOut).slice()).to.eql(new Uint8ArrayList(...input).slice()) }) - it('should handle and lazySelect that fails', async () => { + it('should handle and optimistically select that fails', async () => { const protocol = '/echo/1.0.0' const otherProtocol = '/echo/2.0.0' const pair = duplexPair() @@ -134,7 +134,7 @@ describe('Dialer and Listener integration', () => { .to.eventually.be.rejected.with.property('code', 'ERR_UNSUPPORTED_PROTOCOL') }) - it('should handle and lazySelect only by reading', async () => { + it('should handle and optimistically select only by reading', async () => { const protocol = '/echo/1.0.0' const pair = duplexPair() @@ -162,7 +162,38 @@ describe('Dialer and Listener integration', () => { expect(new Uint8ArrayList(...dialerOut).slice()).to.eql(new Uint8ArrayList(...input).slice()) }) - it('should handle and lazySelect only by reading that fails', async () => { + it('should handle and optimistically select only by writing', async () => { + const protocol = '/echo/1.0.0' + const pair = duplexPair() + + const dialerSelection = await mss.select(pair[0], [protocol], { + log: logger('mss:dialer') + }) + expect(dialerSelection.protocol).to.equal(protocol) + + // ensure stream is usable after selection + const input = [randomBytes(10), randomBytes(64), randomBytes(3)] + + const [listenerOut] = await Promise.all([ + // the listener handles the incoming stream + mss.handle(pair[1], protocol, { + log: logger('mss:listener') + }).then(async result => { + // the listener reads from the incoming stream + return pipe(result.stream, async source => all(source)) + }), + Promise.resolve().then(async () => { + // the dialer just writes to the stream + await pair[0].sink(async function * () { + yield * input + }()) + }) + ]) + + expect(new Uint8ArrayList(...listenerOut).slice()).to.eql(new Uint8ArrayList(...input).slice()) + }) + + it('should handle and optimistically select only by reading that fails', async () => { const protocol = '/echo/1.0.0' const otherProtocol = '/echo/2.0.0' const pair = duplexPair() @@ -183,7 +214,7 @@ describe('Dialer and Listener integration', () => { .to.eventually.be.rejected.with.property('code', 'ERR_UNSUPPORTED_PROTOCOL') }) - it('should abort an unhandled lazySelect', async () => { + it('should abort an unhandled optimistically select', async () => { const protocol = '/echo/1.0.0' const pair = duplexPair()