Skip to content

Commit

Permalink
feat: add negotiateFully option when opening streams (#2331)
Browse files Browse the repository at this point in the history
Adds an option to `libp2p.dialProtocol` and `connection.newStream` to enable optimistic protocol selection but defaults to waiting for negotiation to finish before sending data.

This increases the time-to-first-byte of a stream negotiated with a single protocol but restores the previous behaviour.

Fixes #2321

---------

Co-authored-by: Chad Nehemiah <chad.nehemiah94@gmail.com>
  • Loading branch information
achingbrain and maschad authored Dec 28, 2023
1 parent 4e0135c commit 5d1f68e
Show file tree
Hide file tree
Showing 7 changed files with 183 additions and 15 deletions.
4 changes: 2 additions & 2 deletions packages/interface-compliance-tests/src/mocks/connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import { Uint8ArrayList } from 'uint8arraylist'
import { mockMultiaddrConnection } from './multiaddr-connection.js'
import { mockMuxer } from './muxer.js'
import { mockRegistrar } from './registrar.js'
import type { AbortOptions, ComponentLogger, Logger, MultiaddrConnection, Connection, Stream, Direction, ConnectionTimeline, ConnectionStatus, PeerId, StreamMuxer, StreamMuxerFactory } from '@libp2p/interface'
import type { AbortOptions, ComponentLogger, Logger, MultiaddrConnection, Connection, Stream, Direction, ConnectionTimeline, ConnectionStatus, PeerId, StreamMuxer, StreamMuxerFactory, NewStreamOptions } from '@libp2p/interface'
import type { Registrar } from '@libp2p/interface-internal'
import type { Multiaddr } from '@multiformats/multiaddr'
import type { Duplex, Source } from 'it-stream-types'
Expand Down Expand Up @@ -69,7 +69,7 @@ class MockConnection implements Connection {
this.log = logger.forComponent(this.id)
}

async newStream (protocols: string | string[], options?: AbortOptions): Promise<Stream> {
async newStream (protocols: string | string[], options?: NewStreamOptions): Promise<Stream> {
if (!Array.isArray(protocols)) {
protocols = [protocols]
}
Expand Down
29 changes: 29 additions & 0 deletions packages/interface/src/connection/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -187,8 +187,37 @@ export interface NewStreamOptions extends AbortOptions {
/**
* Opt-in to running over a transient connection - one that has time/data limits
* placed on it.
*
* @default false
*/
runOnTransientConnection?: boolean

/**
* By default when negotiating a protocol the dialer writes then protocol name
* then reads the response.
*
* When a only a single protocol is being negotiated on an outbound stream,
* and the stream is written to before being read from, we can optimistically
* write the protocol name and the first chunk of data together in the first
* message.
*
* Reading and handling the protocol response is done asynchronously, which
* means we can skip a round trip on writing to newly opened streams which
* significantly reduces the time-to-first-byte on a stream.
*
* The side-effect of this is that the underlying stream won't negotiate the
* protocol until either data is written to or read from the stream so it will
* not be opened on the remote until this is done.
*
* Pass `false` here to optimistically write the protocol name and first chunk
* of data in the first message.
*
* If multiple protocols are being negotiated, negotiation is always completed
* in full before the stream is returned so this option has no effect.
*
* @default true
*/
negotiateFully?: boolean
}

export type ConnectionStatus = 'open' | 'closing' | 'closed'
Expand Down
104 changes: 103 additions & 1 deletion packages/libp2p/test/connection-manager/direct.node.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import fs from 'node:fs'
import os from 'node:os'
import path from 'node:path'
import { yamux } from '@chainsafe/libp2p-yamux'
import { type Connection, type ConnectionProtector, isConnection, type PeerId } from '@libp2p/interface'
import { type Connection, type ConnectionProtector, isConnection, type PeerId, type Stream } from '@libp2p/interface'
import { AbortError, ERR_TIMEOUT, TypedEventEmitter, start, stop } from '@libp2p/interface'
import { mockConnection, mockConnectionGater, mockDuplex, mockMultiaddrConnection, mockUpgrader } from '@libp2p/interface-compliance-tests/mocks'
import { defaultLogger } from '@libp2p/logger'
Expand All @@ -20,6 +20,7 @@ import { MemoryDatastore } from 'datastore-core/memory'
import delay from 'delay'
import { pipe } from 'it-pipe'
import { pushable } from 'it-pushable'
import pDefer from 'p-defer'
import pWaitFor from 'p-wait-for'
import Sinon from 'sinon'
import { stubInterface } from 'sinon-ts'
Expand Down Expand Up @@ -640,4 +641,105 @@ describe('libp2p.dialer (direct, TCP)', () => {

expect(connection.remotePeer.toString()).to.equal(remotePeerId.toString())
})

it('should negotiate protocol fully when dialing a protocol', async () => {
remoteLibp2p = await createLibp2pNode({
addresses: {
listen: [
'/ip4/0.0.0.0/tcp/0'
]
},
transports: [
tcp()
],
streamMuxers: [
yamux()
],
connectionEncryption: [
plaintext()
]
})

libp2p = await createLibp2pNode({
peerId,
transports: [
tcp()
],
streamMuxers: [
yamux()
],
connectionEncryption: [
plaintext()
]
})

await Promise.all([
remoteLibp2p.start(),
libp2p.start()
])

const protocol = '/test/1.0.0'
const streamOpen = pDefer<Stream>()

await remoteLibp2p.handle(protocol, ({ stream }) => {
streamOpen.resolve(stream)
})

const outboundStream = await libp2p.dialProtocol(remoteLibp2p.getMultiaddrs(), protocol)

expect(outboundStream).to.have.property('protocol', protocol)

await expect(streamOpen.promise).to.eventually.have.property('protocol', protocol)
})

it('should negotiate protocol fully when opening on a connection', async () => {
remoteLibp2p = await createLibp2pNode({
addresses: {
listen: [
'/ip4/0.0.0.0/tcp/0'
]
},
transports: [
tcp()
],
streamMuxers: [
yamux()
],
connectionEncryption: [
plaintext()
]
})

libp2p = await createLibp2pNode({
peerId,
transports: [
tcp()
],
streamMuxers: [
yamux()
],
connectionEncryption: [
plaintext()
]
})

await Promise.all([
remoteLibp2p.start(),
libp2p.start()
])

const protocol = '/test/1.0.0'
const streamOpen = pDefer<Stream>()

await remoteLibp2p.handle(protocol, ({ stream }) => {
streamOpen.resolve(stream)
})

const connection = await libp2p.dial(remoteLibp2p.getMultiaddrs())
const outboundStream = await connection.newStream(protocol)

expect(outboundStream).to.have.property('protocol', protocol)

await expect(streamOpen.promise).to.eventually.have.property('protocol', protocol)
})
})
9 changes: 8 additions & 1 deletion packages/multistream-select/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,14 @@ export interface ProtocolStream<Stream> {
}

export interface MultistreamSelectInit extends AbortOptions, LoggerOptions, Partial<LengthPrefixedStreamOpts> {

/**
* When false, and only a single protocol is being negotiated, use optimistic
* select to send both the protocol name and the first data buffer in the
* initial message, saving a round trip for connection establishment.
*
* @default true
*/
negotiateFully?: boolean
}

export { select } from './select.js'
Expand Down
2 changes: 1 addition & 1 deletion packages/multistream-select/src/select.ts
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ export interface SelectStream extends Duplex<any, any, any> {
export async function select <Stream extends SelectStream> (stream: Stream, protocols: string | string[], options: MultistreamSelectInit): Promise<ProtocolStream<Stream>> {
protocols = Array.isArray(protocols) ? [...protocols] : [protocols]

if (protocols.length === 1) {
if (protocols.length === 1 && options.negotiateFully === false) {
return optimisticSelect(stream, protocols[0], options)
}

Expand Down
32 changes: 28 additions & 4 deletions packages/multistream-select/test/dialer.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,8 @@ describe('Dialer', () => {
const input = [randomBytes(10), randomBytes(64), randomBytes(3)]

void mss.select(outgoingStream, protocol, {
log: logger('mss:test-outgoing')
log: logger('mss:test-outgoing'),
negotiateFully: false
})

// have to interact with the stream to start protocol negotiation
Expand Down Expand Up @@ -126,13 +127,14 @@ describe('Dialer', () => {
})
})

describe('dialer.lazySelect', () => {
it('should lazily select a single protocol', async () => {
describe('dialer optimistic select', () => {
it('should optimistically select a single protocol when negotiateFully is false', async () => {
const protocol = '/echo/1.0.0'
const [outgoingStream, incomingStream] = duplexPair<Uint8Array>()

const selection = await mss.select(outgoingStream, [protocol], {
log: logger('mss:test-lazy')
log: logger('mss:test-optimistic'),
negotiateFully: false
})
expect(selection.protocol).to.equal(protocol)

Expand All @@ -151,5 +153,27 @@ describe('Dialer', () => {
uint8ArrayFromString(`${protocol}\n`),
...input).subarray())
})

it('should not optimistically select a single protocol when negotiateFully is true', async () => {
const protocols = ['/echo/1.0.0']
const selectedProtocol = protocols[protocols.length - 1]
const [outgoingStream, incomingStream] = duplexPair<Uint8Array>()

void mss.handle(incomingStream, [selectedProtocol], {
log: logger('mss:test-incoming')
})

const selection = await mss.select(outgoingStream, protocols, {
log: logger('mss:test-un-optimistic'),
negotiateFully: true
})
expect(selection.protocol).to.equal(selectedProtocol)

// Ensure stream is usable after selection
const input = [randomBytes(10), randomBytes(64), randomBytes(3)]
void pipe(input, selection.stream)
const output = await all(incomingStream.source)
expect(new Uint8ArrayList(...output).slice()).to.eql(new Uint8ArrayList(...input).slice())
})
})
})
18 changes: 12 additions & 6 deletions packages/multistream-select/test/integration.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,8 @@ describe('Dialer and Listener integration', () => {
const pair = duplexPair<Uint8ArrayList | Uint8Array>()

const dialerSelection = await mss.select(pair[0], [protocol], {
log: logger('mss:test')
log: logger('mss:test'),
negotiateFully: false
})
expect(dialerSelection.protocol).to.equal(protocol)

Expand All @@ -119,7 +120,8 @@ describe('Dialer and Listener integration', () => {
const pair = duplexPair<Uint8ArrayList | Uint8Array>()

const dialerSelection = await mss.select(pair[0], [protocol], {
log: logger('mss:test')
log: logger('mss:test'),
negotiateFully: false
})
expect(dialerSelection.protocol).to.equal(protocol)

Expand All @@ -139,7 +141,8 @@ describe('Dialer and Listener integration', () => {
const pair = duplexPair<Uint8ArrayList | Uint8Array>()

const dialerSelection = await mss.select(pair[0], [protocol], {
log: logger('mss:dialer')
log: logger('mss:dialer'),
negotiateFully: false
})
expect(dialerSelection.protocol).to.equal(protocol)

Expand Down Expand Up @@ -167,7 +170,8 @@ describe('Dialer and Listener integration', () => {
const pair = duplexPair<Uint8ArrayList | Uint8Array>()

const dialerSelection = await mss.select(pair[0], [protocol], {
log: logger('mss:dialer')
log: logger('mss:dialer'),
negotiateFully: false
})
expect(dialerSelection.protocol).to.equal(protocol)

Expand Down Expand Up @@ -200,7 +204,8 @@ describe('Dialer and Listener integration', () => {

// lazy succeeds
const dialerSelection = await mss.select(pair[0], [protocol], {
log: logger('mss:dialer')
log: logger('mss:dialer'),
negotiateFully: false
})
expect(dialerSelection.protocol).to.equal(protocol)

Expand All @@ -219,7 +224,8 @@ describe('Dialer and Listener integration', () => {
const pair = duplexPair<Uint8ArrayList | Uint8Array>()

const dialerSelection = await mss.select(pair[0], [protocol], {
log: logger('mss:test')
log: logger('mss:test'),
negotiateFully: false
})
expect(dialerSelection.protocol).to.equal(protocol)

Expand Down

0 comments on commit 5d1f68e

Please sign in to comment.