Skip to content
This repository has been archived by the owner on Jul 21, 2023. It is now read-only.

fix: do not treat stream source as pushable #211

Merged
merged 2 commits into from
Sep 8, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 5 additions & 4 deletions src/mplex.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { pipe } from 'it-pipe'
import { Pushable, pushableV } from 'it-pushable'
import { pushableV } from 'it-pushable'
import { abortableSource } from 'abortable-iterator'
import { encode } from './encode.js'
import { decode } from './decode.js'
Expand Down Expand Up @@ -44,7 +44,8 @@ function printMessage (msg: Message) {
}

export interface MplexStream extends Stream {
source: Pushable<Uint8ArrayList>
sourceReadableLength: () => number
sourcePush: (data: Uint8ArrayList) => void
}

interface MplexStreamMuxerInit extends MplexInit, StreamMuxerInit {}
Expand Down Expand Up @@ -303,7 +304,7 @@ export class MplexStreamMuxer implements StreamMuxer {
switch (type) {
case MessageTypes.MESSAGE_INITIATOR:
case MessageTypes.MESSAGE_RECEIVER:
if (stream.source.readableLength > maxBufferSize) {
if (stream.sourceReadableLength() > maxBufferSize) {
// Stream buffer has got too large, reset the stream
this._source.push({
id: message.id,
Expand All @@ -318,7 +319,7 @@ export class MplexStreamMuxer implements StreamMuxer {
}

// We got data from the remote, push it into our local stream
stream.source.push(message.data)
stream.sourcePush(message.data)
break
case MessageTypes.CLOSE_INITIATOR:
case MessageTypes.CLOSE_RECEIVER:
Expand Down
24 changes: 17 additions & 7 deletions src/stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,10 @@ export function createStream (options: Options): MplexStream {
}
}

const streamSource = pushable<Uint8ArrayList>({
onEnd: onSourceEnd
})

const stream: MplexStream = {
// Close for both Reading and Writing
close: () => {
Expand All @@ -106,7 +110,7 @@ export function createStream (options: Options): MplexStream {
return
}

stream.source.end()
streamSource.end()
},

// Close for writing
Expand All @@ -132,7 +136,7 @@ export function createStream (options: Options): MplexStream {
abort: (err: Error) => {
log.trace('%s stream %s abort', type, streamName, err)
// End the source with the passed error
stream.source.end(err)
streamSource.end(err)
abortController.abort()
onSinkEnd(err)
},
Expand All @@ -141,7 +145,7 @@ export function createStream (options: Options): MplexStream {
reset: () => {
const err = errCode(new Error('stream reset'), ERR_STREAM_RESET)
resetController.abort()
stream.source.end(err)
streamSource.end(err)
onSinkEnd(err)
},

Expand Down Expand Up @@ -213,7 +217,7 @@ export function createStream (options: Options): MplexStream {
}
}

stream.source.end(err)
streamSource.end(err)
onSinkEnd(err)
return
}
Expand All @@ -227,9 +231,15 @@ export function createStream (options: Options): MplexStream {
onSinkEnd()
},

source: pushable({
onEnd: onSourceEnd
}),
source: streamSource,

sourcePush: (data: Uint8ArrayList) => {
streamSource.push(data)
},

sourceReadableLength () {
return streamSource.readableLength
},

stat: {
direction: type === 'initiator' ? 'outbound' : 'inbound',
Expand Down
10 changes: 5 additions & 5 deletions test/stream.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ async function streamPair (n: number, onInitiatorMessage?: onMessage, onReceiver
onInitiatorMessage(msg, initiator, receiver)
}

receiver.source.push(msgToBuffer(msg))
receiver.sourcePush(msgToBuffer(msg))
}
const mockReceiverSend = (msg: Message) => {
receiverMessages.push(msg)
Expand All @@ -86,7 +86,7 @@ async function streamPair (n: number, onInitiatorMessage?: onMessage, onReceiver
onReceiverMessage(msg, initiator, receiver)
}

initiator.source.push(msgToBuffer(msg))
initiator.sourcePush(msgToBuffer(msg))
}
const initiator = createStream({ id, send: mockInitiatorSend, type: 'initiator' })
const receiver = createStream({ id, send: mockReceiverSend, type: 'receiver' })
Expand Down Expand Up @@ -493,9 +493,9 @@ describe('stream', () => {
const send = (msg: Message) => {
if (msg.type === MessageTypes.CLOSE_INITIATOR) {
// simulate remote closing connection
stream.source.end()
stream.closeRead()
} else if (msg.type === MessageTypes.MESSAGE_INITIATOR) {
stream.source.push(msgToBuffer(msg))
stream.sourcePush(msgToBuffer(msg))
}
}
const id = randomInt(1000)
Expand Down Expand Up @@ -538,7 +538,7 @@ describe('stream', () => {

const send = (msg: Message) => {
if (msg.type === MessageTypes.CLOSE_INITIATOR) {
stream.source.end()
stream.closeRead()
} else if (msg.type === MessageTypes.MESSAGE_INITIATOR) {
messages.push(msg)
}
Expand Down