diff --git a/src/mplex.ts b/src/mplex.ts index 37a9a71..34abe72 100644 --- a/src/mplex.ts +++ b/src/mplex.ts @@ -1,5 +1,5 @@ import { pipe } from 'it-pipe' -import { Pushable, pushableV } from 'it-pushable' +import { pushableV } from 'it-pushable' import { abortableSource } from 'abortable-iterator' import { encode } from './encode.js' import { decode } from './decode.js' @@ -44,7 +44,8 @@ function printMessage (msg: Message) { } export interface MplexStream extends Stream { - source: Pushable + sourceReadableLength: () => number + sourcePush: (data: Uint8ArrayList) => void } interface MplexStreamMuxerInit extends MplexInit, StreamMuxerInit {} @@ -303,7 +304,7 @@ export class MplexStreamMuxer implements StreamMuxer { switch (type) { case MessageTypes.MESSAGE_INITIATOR: case MessageTypes.MESSAGE_RECEIVER: - if (stream.source.readableLength > maxBufferSize) { + if (stream.sourceReadableLength() > maxBufferSize) { // Stream buffer has got too large, reset the stream this._source.push({ id: message.id, @@ -318,7 +319,7 @@ export class MplexStreamMuxer implements StreamMuxer { } // We got data from the remote, push it into our local stream - stream.source.push(message.data) + stream.sourcePush(message.data) break case MessageTypes.CLOSE_INITIATOR: case MessageTypes.CLOSE_RECEIVER: diff --git a/src/stream.ts b/src/stream.ts index 4004f9e..5d09ae0 100644 --- a/src/stream.ts +++ b/src/stream.ts @@ -89,6 +89,10 @@ export function createStream (options: Options): MplexStream { } } + const streamSource = pushable({ + onEnd: onSourceEnd + }) + const stream: MplexStream = { // Close for both Reading and Writing close: () => { @@ -106,7 +110,7 @@ export function createStream (options: Options): MplexStream { return } - stream.source.end() + streamSource.end() }, // Close for writing @@ -132,7 +136,7 @@ export function createStream (options: Options): MplexStream { abort: (err: Error) => { log.trace('%s stream %s abort', type, streamName, err) // End the source with the passed error - stream.source.end(err) + streamSource.end(err) abortController.abort() onSinkEnd(err) }, @@ -141,7 +145,7 @@ export function createStream (options: Options): MplexStream { reset: () => { const err = errCode(new Error('stream reset'), ERR_STREAM_RESET) resetController.abort() - stream.source.end(err) + streamSource.end(err) onSinkEnd(err) }, @@ -213,7 +217,7 @@ export function createStream (options: Options): MplexStream { } } - stream.source.end(err) + streamSource.end(err) onSinkEnd(err) return } @@ -227,9 +231,15 @@ export function createStream (options: Options): MplexStream { onSinkEnd() }, - source: pushable({ - onEnd: onSourceEnd - }), + source: streamSource, + + sourcePush: (data: Uint8ArrayList) => { + streamSource.push(data) + }, + + sourceReadableLength () { + return streamSource.readableLength + }, stat: { direction: type === 'initiator' ? 'outbound' : 'inbound', diff --git a/test/stream.spec.ts b/test/stream.spec.ts index 1dfb5d2..4121a09 100644 --- a/test/stream.spec.ts +++ b/test/stream.spec.ts @@ -77,7 +77,7 @@ async function streamPair (n: number, onInitiatorMessage?: onMessage, onReceiver onInitiatorMessage(msg, initiator, receiver) } - receiver.source.push(msgToBuffer(msg)) + receiver.sourcePush(msgToBuffer(msg)) } const mockReceiverSend = (msg: Message) => { receiverMessages.push(msg) @@ -86,7 +86,7 @@ async function streamPair (n: number, onInitiatorMessage?: onMessage, onReceiver onReceiverMessage(msg, initiator, receiver) } - initiator.source.push(msgToBuffer(msg)) + initiator.sourcePush(msgToBuffer(msg)) } const initiator = createStream({ id, send: mockInitiatorSend, type: 'initiator' }) const receiver = createStream({ id, send: mockReceiverSend, type: 'receiver' }) @@ -493,9 +493,9 @@ describe('stream', () => { const send = (msg: Message) => { if (msg.type === MessageTypes.CLOSE_INITIATOR) { // simulate remote closing connection - stream.source.end() + stream.closeRead() } else if (msg.type === MessageTypes.MESSAGE_INITIATOR) { - stream.source.push(msgToBuffer(msg)) + stream.sourcePush(msgToBuffer(msg)) } } const id = randomInt(1000) @@ -538,7 +538,7 @@ describe('stream', () => { const send = (msg: Message) => { if (msg.type === MessageTypes.CLOSE_INITIATOR) { - stream.source.end() + stream.closeRead() } else if (msg.type === MessageTypes.MESSAGE_INITIATOR) { messages.push(msg) }