Skip to content
This repository has been archived by the owner on Jul 21, 2023. It is now read-only.

Commit

Permalink
feat: close read and write streams
Browse files Browse the repository at this point in the history
This also now throws an error when a write is attempted on a non existent stream. Previously we would just send the message, but this is against the mplex protocol.

Refs: #120
Supersedes: #115
  • Loading branch information
achingbrain committed May 19, 2022
1 parent a50edeb commit 72d0154
Show file tree
Hide file tree
Showing 3 changed files with 36 additions and 4 deletions.
10 changes: 9 additions & 1 deletion src/mplex.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import { createStream } from './stream.js'
import { toString as uint8ArrayToString } from 'uint8arrays'
import { trackedMap } from '@libp2p/tracked-map'
import { logger } from '@libp2p/logger'
import errCode from 'err-code'
import type { Components } from '@libp2p/interfaces/components'
import type { Sink } from 'it-stream-types'
import type { StreamMuxer, StreamMuxerInit } from '@libp2p/interfaces/stream-muxer'
Expand Down Expand Up @@ -130,6 +131,10 @@ export class MplexStreamMuxer implements StreamMuxer {
}

const send = (msg: Message) => {
if (!registry.has(id)) {
throw errCode(new Error('the stream is not in the muxer registry, it may have already been closed'), 'ERR_STREAM_DOESNT_EXIST')
}

if (log.enabled) {
log.trace('%s stream %s send', type, id, printMessage(msg))
}
Expand Down Expand Up @@ -241,14 +246,17 @@ export class MplexStreamMuxer implements StreamMuxer {
switch (type) {
case MessageTypes.MESSAGE_INITIATOR:
case MessageTypes.MESSAGE_RECEIVER:
// We got data from the remote, push it into our local stream
stream.source.push(message.data.slice())
break
case MessageTypes.CLOSE_INITIATOR:
case MessageTypes.CLOSE_RECEIVER:
stream.close()
// We should expect no more data from the remote, stop reading
stream.closeRead()
break
case MessageTypes.RESET_INITIATOR:
case MessageTypes.RESET_RECEIVER:
// Stop reading and writing to the stream immediately
stream.reset()
break
default:
Expand Down
28 changes: 26 additions & 2 deletions src/stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ const log = logger('libp2p:mplex:stream')

const ERR_MPLEX_STREAM_RESET = 'ERR_MPLEX_STREAM_RESET'
const ERR_MPLEX_STREAM_ABORT = 'ERR_MPLEX_STREAM_ABORT'
const ERR_MPLEX_SINK_ENDED = 'ERR_MPLEX_SINK_ENDED'

export interface Options {
id: number
Expand Down Expand Up @@ -86,9 +87,28 @@ export function createStream (options: Options): MplexStream {
}

const stream = {
// Close for both Reading and Writing
close: async () => {
await Promise.all([
stream.closeRead(),
stream.closeWrite()
])
},
// Close for reading
close: () => {
stream.source.end()
closeRead: async () => {
if (sourceEnded) {
return
}

await stream.source.end()
},
// Close for writing
closeWrite: async () => {
if (sinkEnded) {
return
}

await stream.sink([])
},
// Close for reading and writing (local error)
abort: (err?: Error) => {
Expand All @@ -106,6 +126,10 @@ export function createStream (options: Options): MplexStream {
onSinkEnd(err)
},
sink: async (source: Source<Uint8Array>) => {
if (sinkEnded) {
throw errCode(new Error('stream closed for writing'), ERR_MPLEX_SINK_ENDED)
}

source = abortableSource(source, anySignal([
abortController.signal,
resetController.signal
Expand Down
2 changes: 1 addition & 1 deletion test/stream.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ async function streamPair (n: number, onInitiatorMessage?: onMessage, onReceiver

// when the initiator sends a CLOSE message, we call close
if (msg.type === MessageTypes.CLOSE_INITIATOR) {
receiver.close()
receiver.closeRead()
}

// when the initiator sends a RESET message, we call close
Expand Down

0 comments on commit 72d0154

Please sign in to comment.