Skip to content
This repository has been archived by the owner on Jul 21, 2023. It is now read-only.

Commit

Permalink
fix: do not treat stream source as pushable (#211)
Browse files Browse the repository at this point in the history
Since the `source` and `sink` properties may be changed by downstream code (eg: in libp2p after multiselect completes), the muxer shouldn't rely on the initial `source` implementation staying the same.

Instead, any functionality needed by the muxer is exposed as an additional function attached to the stream.
And any functionality needed by the stream internally is used by referencing a private variable whose reference cannot change.

Co-authored-by: achingbrain <alex@achingbrain.net>
  • Loading branch information
wemeetagain and achingbrain authored Sep 8, 2022
1 parent 8facf2b commit 359c103
Show file tree
Hide file tree
Showing 3 changed files with 27 additions and 16 deletions.
9 changes: 5 additions & 4 deletions src/mplex.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { pipe } from 'it-pipe'
import { Pushable, pushableV } from 'it-pushable'
import { pushableV } from 'it-pushable'
import { abortableSource } from 'abortable-iterator'
import { encode } from './encode.js'
import { decode } from './decode.js'
Expand Down Expand Up @@ -44,7 +44,8 @@ function printMessage (msg: Message) {
}

export interface MplexStream extends Stream {
source: Pushable<Uint8ArrayList>
sourceReadableLength: () => number
sourcePush: (data: Uint8ArrayList) => void
}

interface MplexStreamMuxerInit extends MplexInit, StreamMuxerInit {}
Expand Down Expand Up @@ -303,7 +304,7 @@ export class MplexStreamMuxer implements StreamMuxer {
switch (type) {
case MessageTypes.MESSAGE_INITIATOR:
case MessageTypes.MESSAGE_RECEIVER:
if (stream.source.readableLength > maxBufferSize) {
if (stream.sourceReadableLength() > maxBufferSize) {
// Stream buffer has got too large, reset the stream
this._source.push({
id: message.id,
Expand All @@ -318,7 +319,7 @@ export class MplexStreamMuxer implements StreamMuxer {
}

// We got data from the remote, push it into our local stream
stream.source.push(message.data)
stream.sourcePush(message.data)
break
case MessageTypes.CLOSE_INITIATOR:
case MessageTypes.CLOSE_RECEIVER:
Expand Down
24 changes: 17 additions & 7 deletions src/stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,10 @@ export function createStream (options: Options): MplexStream {
}
}

const streamSource = pushable<Uint8ArrayList>({
onEnd: onSourceEnd
})

const stream: MplexStream = {
// Close for both Reading and Writing
close: () => {
Expand All @@ -106,7 +110,7 @@ export function createStream (options: Options): MplexStream {
return
}

stream.source.end()
streamSource.end()
},

// Close for writing
Expand All @@ -132,7 +136,7 @@ export function createStream (options: Options): MplexStream {
abort: (err: Error) => {
log.trace('%s stream %s abort', type, streamName, err)
// End the source with the passed error
stream.source.end(err)
streamSource.end(err)
abortController.abort()
onSinkEnd(err)
},
Expand All @@ -141,7 +145,7 @@ export function createStream (options: Options): MplexStream {
reset: () => {
const err = errCode(new Error('stream reset'), ERR_STREAM_RESET)
resetController.abort()
stream.source.end(err)
streamSource.end(err)
onSinkEnd(err)
},

Expand Down Expand Up @@ -213,7 +217,7 @@ export function createStream (options: Options): MplexStream {
}
}

stream.source.end(err)
streamSource.end(err)
onSinkEnd(err)
return
}
Expand All @@ -227,9 +231,15 @@ export function createStream (options: Options): MplexStream {
onSinkEnd()
},

source: pushable({
onEnd: onSourceEnd
}),
source: streamSource,

sourcePush: (data: Uint8ArrayList) => {
streamSource.push(data)
},

sourceReadableLength () {
return streamSource.readableLength
},

stat: {
direction: type === 'initiator' ? 'outbound' : 'inbound',
Expand Down
10 changes: 5 additions & 5 deletions test/stream.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ async function streamPair (n: number, onInitiatorMessage?: onMessage, onReceiver
onInitiatorMessage(msg, initiator, receiver)
}

receiver.source.push(msgToBuffer(msg))
receiver.sourcePush(msgToBuffer(msg))
}
const mockReceiverSend = (msg: Message) => {
receiverMessages.push(msg)
Expand All @@ -86,7 +86,7 @@ async function streamPair (n: number, onInitiatorMessage?: onMessage, onReceiver
onReceiverMessage(msg, initiator, receiver)
}

initiator.source.push(msgToBuffer(msg))
initiator.sourcePush(msgToBuffer(msg))
}
const initiator = createStream({ id, send: mockInitiatorSend, type: 'initiator' })
const receiver = createStream({ id, send: mockReceiverSend, type: 'receiver' })
Expand Down Expand Up @@ -493,9 +493,9 @@ describe('stream', () => {
const send = (msg: Message) => {
if (msg.type === MessageTypes.CLOSE_INITIATOR) {
// simulate remote closing connection
stream.source.end()
stream.closeRead()
} else if (msg.type === MessageTypes.MESSAGE_INITIATOR) {
stream.source.push(msgToBuffer(msg))
stream.sourcePush(msgToBuffer(msg))
}
}
const id = randomInt(1000)
Expand Down Expand Up @@ -538,7 +538,7 @@ describe('stream', () => {

const send = (msg: Message) => {
if (msg.type === MessageTypes.CLOSE_INITIATOR) {
stream.source.end()
stream.closeRead()
} else if (msg.type === MessageTypes.MESSAGE_INITIATOR) {
messages.push(msg)
}
Expand Down

0 comments on commit 359c103

Please sign in to comment.