Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat!: close streams gracefully #57

Merged
merged 3 commits into from
Aug 3, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 8 additions & 11 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -171,24 +171,21 @@
"docs": "aegir docs"
},
"dependencies": {
"@libp2p/interface-connection": "^5.1.0",
"@libp2p/interface-stream-muxer": "^4.1.2",
"@libp2p/interfaces": "^3.3.2",
"@libp2p/logger": "^2.0.7",
"@libp2p/interface": "^0.1.0",
"@libp2p/logger": "^3.0.0",
"abortable-iterator": "^5.0.1",
"any-signal": "^4.1.1",
"it-foreach": "^2.0.3",
"it-pipe": "^3.0.1",
"it-pushable": "^3.1.3",
"it-pushable": "^3.2.0",
"uint8arraylist": "^2.4.3"
},
"devDependencies": {
"@dapplion/benchmark": "^0.2.4",
"@libp2p/interface-stream-muxer-compliance-tests": "^7.0.3",
"@libp2p/mplex": "^8.0.3",
"aegir": "^39.0.7",
"@libp2p/interface-compliance-tests": "^4.0.0",
"@libp2p/mplex": "^9.0.0",
"aegir": "^40.0.1",
"it-drain": "^3.0.2",
"it-pair": "^2.0.6",
"it-stream-types": "^2.0.1"
},
"browser": {}
}
}
2 changes: 1 addition & 1 deletion src/config.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { CodeError } from '@libp2p/interfaces/errors'
import { CodeError } from '@libp2p/interface/errors'
import { logger, type Logger } from '@libp2p/logger'
import { ERR_INVALID_CONFIG, INITIAL_STREAM_WINDOW, MAX_STREAM_WINDOW } from './constants.js'

Expand Down
2 changes: 1 addition & 1 deletion src/decode.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { CodeError } from '@libp2p/interfaces/errors'
import { CodeError } from '@libp2p/interface/errors'
import { Uint8ArrayList } from 'uint8arraylist'
import { ERR_DECODE_INVALID_VERSION, ERR_DECODE_IN_PROGRESS } from './constants.js'
import { type FrameHeader, FrameType, HEADER_LENGTH, YAMUX_VERSION } from './frame.js'
Expand Down
2 changes: 1 addition & 1 deletion src/index.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { Yamux } from './muxer.js'
import type { YamuxMuxerInit } from './muxer.js'
import type { StreamMuxerFactory } from '@libp2p/interface-stream-muxer'
import type { StreamMuxerFactory } from '@libp2p/interface/stream-muxer'
export { GoAwayCode } from './frame.js'

export function yamux (init: YamuxMuxerInit = {}): () => StreamMuxerFactory {
Expand Down
100 changes: 57 additions & 43 deletions src/muxer.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { CodeError } from '@libp2p/interfaces/errors'
import { CodeError } from '@libp2p/interface/errors'
import { logger, type Logger } from '@libp2p/logger'
import { abortableSource } from 'abortable-iterator'
import { anySignal, type ClearableSignal } from 'any-signal'
import { pipe } from 'it-pipe'
import { pushable, type Pushable } from 'it-pushable'
import { type Config, defaultConfig, verifyConfig } from './config.js'
Expand All @@ -9,13 +9,14 @@ import { Decoder } from './decode.js'
import { encodeHeader } from './encode.js'
import { Flag, type FrameHeader, FrameType, GoAwayCode, stringifyHeader } from './frame.js'
import { StreamState, YamuxStream } from './stream.js'
import type { Stream } from '@libp2p/interface-connection'
import type { StreamMuxer, StreamMuxerFactory, StreamMuxerInit } from '@libp2p/interface-stream-muxer'
import type { Logger } from '@libp2p/logger'
import type { AbortOptions } from '@libp2p/interface'
import type { Stream } from '@libp2p/interface/connection'
import type { StreamMuxer, StreamMuxerFactory, StreamMuxerInit } from '@libp2p/interface/stream-muxer'
import type { Sink, Source } from 'it-stream-types'
import type { Uint8ArrayList } from 'uint8arraylist'

const YAMUX_PROTOCOL_ID = '/yamux/1.0.0'
const CLOSE_TIMEOUT = 500

export interface YamuxMuxerInit extends StreamMuxerInit, Partial<Config> {
}
Expand All @@ -36,12 +37,15 @@ export class Yamux implements StreamMuxerFactory {
}
}

export interface CloseOptions extends AbortOptions {
reason?: GoAwayCode
}

export class YamuxMuxer implements StreamMuxer {
protocol = YAMUX_PROTOCOL_ID
source: Pushable<Uint8Array>
sink: Sink<Source<Uint8ArrayList | Uint8Array>, Promise<void>>

private readonly _init: YamuxMuxerInit
private readonly config: Config
private readonly log?: Logger

Expand Down Expand Up @@ -75,7 +79,6 @@ export class YamuxMuxer implements StreamMuxer {
private readonly onStreamEnd?: (stream: Stream) => void

constructor (init: YamuxMuxerInit) {
this._init = init
this.client = init.direction === 'outbound'
this.config = { ...defaultConfig, ...init }
this.log = this.config.log
Expand All @@ -89,22 +92,19 @@ export class YamuxMuxer implements StreamMuxer {
this._streams = new Map()

this.source = pushable({
onEnd: (err?: Error): void => {
onEnd: (): void => {
this.log?.trace('muxer source ended')
this.close(err)

this._streams.forEach(stream => {
stream.destroy()
})
}
})

this.sink = async (source: Source<Uint8ArrayList | Uint8Array>): Promise<void> => {
let signal: ClearableSignal | undefined

if (this._init.signal != null) {
signal = anySignal([this.closeController.signal, this._init.signal])
}

source = abortableSource(
source,
signal ?? this.closeController.signal,
this.closeController.signal,
{ returnOnAbort: true }
)

Expand Down Expand Up @@ -133,15 +133,15 @@ export class YamuxMuxer implements StreamMuxer {
}

error = err as Error
} finally {
if (signal != null) {
signal.clear()
}
}

this.log?.trace('muxer sink ended')

this.close(error, reason)
if (error != null) {
this.abort(error, reason)
} else {
await this.close({ reason })
}
}

this.numInboundStreams = 0
Expand Down Expand Up @@ -261,34 +261,48 @@ export class YamuxMuxer implements StreamMuxer {

/**
* Close the muxer
*
* @param err
* @param reason - The GoAway reason to be sent
*/
close (err?: Error, reason?: GoAwayCode): void {
async close (options: CloseOptions = {}): Promise<void> {
if (this.closeController.signal.aborted) {
// already closed
return
}

// If reason was provided, use that, otherwise use the presence of `err` to determine the reason
reason = reason ?? (err === undefined ? GoAwayCode.InternalError : GoAwayCode.NormalTermination)
const reason = options?.reason ?? GoAwayCode.NormalTermination

if (err != null) {
this.log?.error('muxer close reason=%s error=%s', GoAwayCode[reason], err)
} else {
this.log?.trace('muxer close reason=%s', GoAwayCode[reason])
this.log?.trace('muxer close reason=%s', reason)

options.signal = options.signal ?? AbortSignal.timeout(CLOSE_TIMEOUT)

try {
// If err is provided, abort all underlying streams, else close all underlying streams
wemeetagain marked this conversation as resolved.
Show resolved Hide resolved
await Promise.all(
[...this._streams.values()].map(async s => s.close(options))
)

// send reason to the other side, allow the other side to close gracefully
this.sendGoAway(reason)

this._closeMuxer()
} catch (err: any) {
this.abort(err)
}
}

// If err is provided, abort all underlying streams, else close all underlying streams
if (err === undefined) {
for (const stream of this._streams.values()) {
stream.close()
}
} else {
for (const stream of this._streams.values()) {
stream.abort(err)
}
abort (err: Error, reason?: GoAwayCode): void {
if (this.closeController.signal.aborted) {
// already closed
return
}

reason = reason ?? GoAwayCode.InternalError

// If reason was provided, use that, otherwise use the presence of `err` to determine the reason
this.log?.error('muxer abort reason=%s error=%s', reason, err)

// Abort all underlying streams
for (const stream of this._streams.values()) {
stream.abort(err)
}

// send reason to the other side, allow the other side to close gracefully
Expand Down Expand Up @@ -319,16 +333,16 @@ export class YamuxMuxer implements StreamMuxer {
}

const stream = new YamuxStream({
id,
id: id.toString(),
name,
state,
direction,
sendFrame: this.sendFrame.bind(this),
onStreamEnd: () => {
onEnd: () => {
this.closeStream(id)
this.onStreamEnd?.(stream)
},
log: this.log,
log: logger(`libp2p:yamux:${direction}:${id}`),
config: this.config,
getRTT: this.getRTT.bind(this)
})
Expand Down
Loading