Skip to content

Commit

Permalink
chore: select encrypter and muxer optimistically
Browse files Browse the repository at this point in the history
  • Loading branch information
achingbrain committed Nov 22, 2023
1 parent 4fcfd6a commit 95fac18
Show file tree
Hide file tree
Showing 10 changed files with 93 additions and 45 deletions.
28 changes: 14 additions & 14 deletions perf/impl/js-libp2p/v1.0-optimistic/libp2p/dist/index.min.js

Large diffs are not rendered by default.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,6 @@ export declare class DefaultUpgrader implements Upgrader {
private readonly muxers;
private readonly inboundUpgradeTimeout;
private readonly events;
private readonly logger;
private readonly log;
constructor(components: DefaultUpgraderComponents, init: UpgraderInit);
shouldBlockConnection(remotePeer: PeerId, maConn: MultiaddrConnection, connectionType: ConnectionDeniedType): Promise<void>;
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

48 changes: 36 additions & 12 deletions perf/impl/js-libp2p/v1.0-optimistic/libp2p/dist/src/upgrader.js

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Large diffs are not rendered by default.

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -461,7 +461,7 @@ export class DialQueue {
// internal peer dial queue - only one dial per peer at a time
const peerDialQueue = new PQueue({ concurrency: 1 })
peerDialQueue.on('error', (err) => {
this.log.error('error dialing [%s] %o', pendingDial.multiaddrs, err)
this.log.error('error dialing %s %o', pendingDial.multiaddrs, err)
})

const conn = await Promise.any(pendingDial.multiaddrs.map(async (addr, i) => {
Expand Down
49 changes: 37 additions & 12 deletions perf/impl/js-libp2p/v1.0-optimistic/libp2p/src/upgrader.ts
Original file line number Diff line number Diff line change
Expand Up @@ -117,14 +117,12 @@ export class DefaultUpgrader implements Upgrader {
private readonly muxers: Map<string, StreamMuxerFactory>
private readonly inboundUpgradeTimeout: number
private readonly events: TypedEventTarget<Libp2pEvents>
private readonly logger: ComponentLogger
private readonly log: Logger

constructor (components: DefaultUpgraderComponents, init: UpgraderInit) {
this.components = components
this.connectionEncryption = new Map()
this.log = components.logger.forComponent('libp2p:upgrader')
this.logger = components.logger

init.connectionEncryption.forEach(encrypter => {
this.connectionEncryption.set(encrypter.protocol, encrypter)
Expand Down Expand Up @@ -440,13 +438,13 @@ export class DefaultUpgrader implements Upgrader {
throw new CodeError('Stream is not multiplexed', codes.ERR_MUXER_UNAVAILABLE)
}

connection.log('starting new stream for protocols [%s]', protocols)
connection.log('starting new stream for protocols %s', protocols)
const muxedStream = await muxer.newStream()
connection.log.trace('started new stream %s for protocols [%s]', muxedStream.id, protocols)
connection.log.trace('started new stream %s for protocols %s', muxedStream.id, protocols)

try {
if (options.signal == null) {
this.log('No abort signal was passed while trying to negotiate protocols [%s] falling back to default timeout', protocols)
this.log('No abort signal was passed while trying to negotiate protocols %s falling back to default timeout', protocols)

const signal = AbortSignal.timeout(DEFAULT_PROTOCOL_SELECT_TIMEOUT)
setMaxListeners(Infinity, signal)
Expand All @@ -468,7 +466,7 @@ export class DefaultUpgrader implements Upgrader {
yieldBytes: false
}))
} else {
connection.log.trace('starting new stream for protocols [%s], using regular select', protocols);
connection.log.trace('starting new stream for protocols %s, using regular select', protocols);
({ stream, protocol } = await mss.select(muxedStream, protocols, {
...options,
log: muxedStream.log,
Expand Down Expand Up @@ -650,9 +648,23 @@ export class DefaultUpgrader implements Upgrader {
this.log('selecting outbound crypto protocol', protocols)

try {
const { stream, protocol } = await mss.select(connection, protocols, {
log: this.logger.forComponent('libp2p:mss:select')
})
let stream: MultiaddrConnection
let protocol: string

if (protocols.length === 1) {
connection.log.trace('selecting encryption protocol "%s", using lazy select', protocols[0]);
({ stream, protocol } = mss.lazySelect(connection, protocols[0], {
log: connection.log,
yieldBytes: true
}))
} else {
connection.log.trace('selecting encryption from %s, using regular select', protocols);
({ stream, protocol } = await mss.select(connection, protocols, {
log: connection.log,
yieldBytes: true
}))
}

const encrypter = this.connectionEncryption.get(protocol)

if (encrypter == null) {
Expand All @@ -678,9 +690,22 @@ export class DefaultUpgrader implements Upgrader {
const protocols = Array.from(muxers.keys())
this.log('outbound selecting muxer %s', protocols)
try {
const { stream, protocol } = await mss.select(connection, protocols, {
log: this.logger.forComponent('libp2p:mss:select')
})
let stream: MultiaddrConnection
let protocol: string

if (protocols.length === 1) {
connection.log.trace('selecting stream muxer "%s", using lazy select', protocols[0]);
({ stream, protocol } = mss.lazySelect(connection, protocols[0], {
log: connection.log,
yieldBytes: true
}))
} else {
connection.log.trace('selecting stream muxer from %s, using regular select', protocols);
({ stream, protocol } = await mss.select(connection, protocols, {
log: connection.log,
yieldBytes: true
}))
}
this.log('%s selected as muxer protocol', protocol)
const muxerFactory = muxers.get(protocol)

Expand Down

0 comments on commit 95fac18

Please sign in to comment.