Skip to content

Commit

Permalink
refactor: remove abortable iterator (#2237)
Browse files Browse the repository at this point in the history
We use an `abortable-iterator` to race an abort signal against an
incoming for streams and connections so we do not wait on slow connections
forever.

Unfortunately this introduces so much latency it becomes a performance
bottleneck, and in all cases we can just add an `abort` listener to the
incoming abort signal and close the underlying stream if it fires.

Removing the latency added by the abortable iterator reveals that tests using
it-pair, it-pushable and friends have many timing issues so it's necessary
to ensure we are supplying data to streams as it is required.

This opens up the possibility of having backpressure applied to protocol
streams by the underlying transports which isn't currently possible.
  • Loading branch information
achingbrain authored Nov 17, 2023
1 parent 9c67c5b commit 6625a27
Show file tree
Hide file tree
Showing 75 changed files with 1,705 additions and 1,620 deletions.
15 changes: 9 additions & 6 deletions packages/integration-tests/test/circuit-relay.node.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import all from 'it-all'
import { pipe } from 'it-pipe'
import { createLibp2p, type Libp2pOptions } from 'libp2p'
import defer from 'p-defer'
import pRetry from 'p-retry'
import pWaitFor from 'p-wait-for'
import sinon from 'sinon'
import { Uint8ArrayList } from 'uint8arraylist'
Expand Down Expand Up @@ -502,13 +503,15 @@ describe('circuit-relay', () => {
expect(conns).to.have.lengthOf(1)
expect(conns).to.have.nested.property('[0].status', 'open')

// we should not have any streams with the hop codec
const streams = local.getConnections(relay1.peerId)
.map(conn => conn.streams)
.flat()
.filter(stream => stream.protocol === RELAY_V2_HOP_CODEC)
await pRetry(() => {
// we should not have any streams with the hop codec
const streams = local.getConnections(relay1.peerId)
.map(conn => conn.streams)
.flat()
.filter(stream => stream.protocol === RELAY_V2_HOP_CODEC)

expect(streams).to.be.empty()
expect(streams).to.be.empty()
})
})

it('destination peer should stay connected to an already connected relay on hop failure', async () => {
Expand Down
3 changes: 3 additions & 0 deletions packages/integration-tests/test/fetch.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@ async function createNode (): Promise<Libp2p<{ fetch: Fetch }>> {
return createLibp2p(createBaseOptions({
services: {
fetch: fetch()
},
connectionManager: {
minConnections: 0
}
}))
}
Expand Down
2 changes: 1 addition & 1 deletion packages/interface-compliance-tests/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -110,13 +110,13 @@
"@libp2p/peer-collections": "^4.0.8",
"@libp2p/peer-id": "^3.0.6",
"@libp2p/peer-id-factory": "^3.0.8",
"@libp2p/utils": "^4.0.7",
"@multiformats/multiaddr": "^12.1.10",
"abortable-iterator": "^5.0.1",
"aegir": "^41.0.2",
"delay": "^6.0.0",
"it-all": "^3.0.2",
"it-drain": "^3.0.2",
"it-handshake": "^4.1.3",
"it-map": "^3.0.4",
"it-ndjson": "^1.0.3",
"it-pair": "^2.0.6",
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import { logger } from '@libp2p/logger'
import { multiaddr } from '@multiformats/multiaddr'
import { duplexPair } from 'it-pair/duplex'
import type { MultiaddrConnection } from '@libp2p/interface/connection'
Expand All @@ -14,7 +15,8 @@ export function createMaConnPair (): [MultiaddrConnection, MultiaddrConnection]
remoteAddr: multiaddr('/ip4/127.0.0.1/tcp/4001'),
timeline: {
open: Date.now()
}
},
log: logger('duplex-maconn')
}

return output
Expand Down
113 changes: 0 additions & 113 deletions packages/interface-compliance-tests/src/mocks/connection-encrypter.ts

This file was deleted.

60 changes: 51 additions & 9 deletions packages/interface-compliance-tests/src/mocks/connection.ts
Original file line number Diff line number Diff line change
@@ -1,14 +1,15 @@
import { CodeError } from '@libp2p/interface/errors'
import { defaultLogger } from '@libp2p/logger'
import { defaultLogger, logger } from '@libp2p/logger'
import * as mss from '@libp2p/multistream-select'
import { peerIdFromString } from '@libp2p/peer-id'
import { closeSource } from '@libp2p/utils/close-source'
import { duplexPair } from 'it-pair/duplex'
import { pipe } from 'it-pipe'
import { Uint8ArrayList } from 'uint8arraylist'
import { mockMultiaddrConnection } from './multiaddr-connection.js'
import { mockMuxer } from './muxer.js'
import { mockRegistrar } from './registrar.js'
import type { AbortOptions, ComponentLogger } from '@libp2p/interface'
import type { AbortOptions, ComponentLogger, Logger } from '@libp2p/interface'
import type { MultiaddrConnection, Connection, Stream, Direction, ConnectionTimeline, ConnectionStatus } from '@libp2p/interface/connection'
import type { PeerId } from '@libp2p/interface/peer-id'
import type { StreamMuxer, StreamMuxerFactory } from '@libp2p/interface/stream-muxer'
Expand Down Expand Up @@ -44,6 +45,7 @@ class MockConnection implements Connection {
public streams: Stream[]
public tags: string[]
public transient: boolean
public log: Logger

private readonly muxer: StreamMuxer
private readonly maConn: MultiaddrConnection
Expand All @@ -67,6 +69,7 @@ class MockConnection implements Connection {
this.maConn = maConn
this.transient = false
this.logger = logger
this.log = logger.forComponent(this.id)
}

async newStream (protocols: string | string[], options?: AbortOptions): Promise<Stream> {
Expand Down Expand Up @@ -139,7 +142,9 @@ export function mockConnection (maConn: MultiaddrConnection, opts: MockConnectio
direction,
onIncomingStream: (muxedStream) => {
try {
mss.handle(muxedStream, registrar.getProtocols())
mss.handle(muxedStream, registrar.getProtocols(), {
log
})
.then(({ stream, protocol }) => {
log('%s: incoming stream opened on %s', direction, protocol)
muxedStream.protocol = protocol
Expand Down Expand Up @@ -185,10 +190,12 @@ export interface StreamInit {
}

export function mockStream (stream: Duplex<AsyncGenerator<Uint8ArrayList>, Source<Uint8ArrayList | Uint8Array>, Promise<void>>, init: StreamInit = {}): Stream {
const originalSource = stream.source
const id = `stream-${Date.now()}`
const log = logger(`libp2p:mock-stream:${id}`)

// ensure stream output is `Uint8ArrayList` as it would be from an actual
// Stream where everything is length-varint encoded
const originalSource = stream.source
stream.source = (async function * (): AsyncGenerator<Uint8ArrayList, any, unknown> {
for await (const buf of originalSource) {
if (buf instanceof Uint8Array) {
Expand All @@ -199,12 +206,44 @@ export function mockStream (stream: Duplex<AsyncGenerator<Uint8ArrayList>, Sourc
}
})()

return {
const abortSinkController = new AbortController()
const originalSink = stream.sink.bind(stream)
stream.sink = async (source) => {
abortSinkController.signal.addEventListener('abort', () => {
closeSource(source, log)
})

await originalSink(source)
}

const mockStream: Stream = {
...stream,
close: async () => {},
closeRead: async () => {},
closeWrite: async () => {},
abort: () => {},
close: async (options) => {
await mockStream.closeRead(options)
await mockStream.closeWrite(options)
},
closeRead: async () => {
closeSource(originalSource, log)
mockStream.timeline.closeRead = Date.now()

if (mockStream.timeline.closeWrite != null) {
mockStream.timeline.close = Date.now()
}
},
closeWrite: async () => {
abortSinkController.abort()
mockStream.timeline.closeWrite = Date.now()

if (mockStream.timeline.closeRead != null) {
mockStream.timeline.close = Date.now()
}
},
abort: () => {
closeSource(originalSource, log)
mockStream.timeline.closeWrite = Date.now()
mockStream.timeline.closeRead = Date.now()
mockStream.timeline.close = Date.now()
},
direction: 'outbound',
protocol: '/foo/1.0.0',
timeline: {
Expand All @@ -215,8 +254,11 @@ export function mockStream (stream: Duplex<AsyncGenerator<Uint8ArrayList>, Sourc
status: 'open',
readStatus: 'ready',
writeStatus: 'ready',
log: logger('mock-stream'),
...init
}

return mockStream
}

export interface StreamPairInit {
Expand Down
1 change: 0 additions & 1 deletion packages/interface-compliance-tests/src/mocks/index.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
export { mockConnectionEncrypter } from './connection-encrypter.js'
export { mockConnectionGater } from './connection-gater.js'
export { mockConnectionManager, mockNetwork } from './connection-manager.js'
export { mockConnection, mockStream, streamPair, connectionPair } from './connection.js'
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import { logger } from '@libp2p/logger'
import { multiaddr } from '@multiformats/multiaddr'
import { abortableSource } from 'abortable-iterator'
import { duplexPair } from 'it-pair/duplex'
Expand All @@ -16,6 +17,7 @@ export function mockMultiaddrConnection (source: Duplex<AsyncGenerator<Uint8Arra
open: Date.now()
},
remoteAddr: multiaddr(`/ip4/127.0.0.1/tcp/4001/p2p/${peerId.toString()}`),
log: logger('mock-maconn'),
...source
}

Expand Down Expand Up @@ -49,7 +51,8 @@ export function mockMultiaddrConnPair (opts: MockMultiaddrConnPairOptions): { in
abort: (err: Error) => {
outbound.timeline.close = Date.now()
controller.abort(err)
}
},
log: logger('mock-maconn-outbound')
}

const inbound: MultiaddrConnection = {
Expand All @@ -65,7 +68,8 @@ export function mockMultiaddrConnPair (opts: MockMultiaddrConnPairOptions): { in
abort: (err: Error) => {
outbound.timeline.close = Date.now()
controller.abort(err)
}
},
log: logger('mock-maconn-inbound')
}

// Make the sources abortable so we can close them easily
Expand Down
27 changes: 20 additions & 7 deletions packages/interface-compliance-tests/src/mocks/muxer.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { AbstractStream, type AbstractStreamInit } from '@libp2p/interface/stream-muxer/stream'
import { type Logger, logger } from '@libp2p/logger'
import { AbstractStream, type AbstractStreamInit } from '@libp2p/utils/abstract-stream'
import { abortableSource } from 'abortable-iterator'
import map from 'it-map'
import * as ndjson from 'it-ndjson'
Expand Down Expand Up @@ -249,14 +249,27 @@ class MockMuxer implements StreamMuxer {
return
}

this.log('closing muxed streams')
const signal = options?.signal ?? AbortSignal.timeout(10)

await Promise.all(
this.streams.map(async s => s.close())
)
try {
// try to gracefully close all streams
await Promise.all(
this.streams.map(async s => s.close({
signal
}))
)

this.input.end()

this.closeController.abort()
this.input.end()
// try to gracefully close the muxer
await this.input.onEmpty({
signal
})

this.closeController.abort()
} catch (err: any) {
this.abort(err)
}
}

abort (err: Error): void {
Expand Down
Loading

0 comments on commit 6625a27

Please sign in to comment.