Skip to content
This repository has been archived by the owner on Jul 21, 2023. It is now read-only.

fix: apply message size limit before decoding message #231

Merged
merged 2 commits into from
Nov 24, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 18 additions & 7 deletions src/decode.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ import { Uint8ArrayList } from 'uint8arraylist'
import type { Source } from 'it-stream-types'
import type { Message } from './message-types.js'

export const MAX_MSG_SIZE = 1 << 20 // 1MB

interface MessageHeader {
id: number
type: keyof typeof MessageTypeNames
Expand All @@ -13,10 +15,12 @@ interface MessageHeader {
class Decoder {
private readonly _buffer: Uint8ArrayList
private _headerInfo: MessageHeader | null
private readonly _maxMessageSize: number

constructor () {
constructor (maxMessageSize: number = MAX_MSG_SIZE) {
this._buffer = new Uint8ArrayList()
this._headerInfo = null
this._maxMessageSize = maxMessageSize
}

write (chunk: Uint8Array) {
Expand All @@ -25,6 +29,11 @@ class Decoder {
}

this._buffer.append(chunk)

if (this._buffer.byteLength > this._maxMessageSize) {
throw Object.assign(new Error('message size too large!'), { code: 'ERR_MSG_TOO_BIG' })
}

const msgs: Message[] = []

while (this._buffer.length !== 0) {
Expand Down Expand Up @@ -119,14 +128,16 @@ function readVarInt (buf: Uint8ArrayList, offset: number = 0) {
/**
* Decode a chunk and yield an _array_ of decoded messages
*/
export async function * decode (source: Source<Uint8Array>) {
const decoder = new Decoder()
export function decode (maxMessageSize: number = MAX_MSG_SIZE) {
return async function * decodeMessages (source: Source<Uint8Array>): Source<Message> {
const decoder = new Decoder(maxMessageSize)

for await (const chunk of source) {
const msgs = decoder.write(chunk)
for await (const chunk of source) {
const msgs = decoder.write(chunk)

if (msgs.length > 0) {
yield msgs
if (msgs.length > 0) {
yield * msgs
}
}
}
}
4 changes: 1 addition & 3 deletions src/mplex.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ import { pushableV } from 'it-pushable'
import { abortableSource } from 'abortable-iterator'
import { encode } from './encode.js'
import { decode } from './decode.js'
import { restrictSize } from './restrict-size.js'
import { MessageTypes, MessageTypeNames, Message } from './message-types.js'
import { createStream } from './stream.js'
import { toString as uint8ArrayToString } from 'uint8arrays'
Expand Down Expand Up @@ -204,8 +203,7 @@ export class MplexStreamMuxer implements StreamMuxer {
try {
await pipe(
source,
decode,
restrictSize(this._init.maxMsgSize),
decode(this._init.maxMsgSize),
async source => {
for await (const msg of source) {
await this._handleIncoming(msg)
Expand Down
36 changes: 0 additions & 36 deletions src/restrict-size.ts

This file was deleted.

2 changes: 1 addition & 1 deletion src/stream.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { abortableSource } from 'abortable-iterator'
import { pushable } from 'it-pushable'
import errCode from 'err-code'
import { MAX_MSG_SIZE } from './restrict-size.js'
import { MAX_MSG_SIZE } from './decode.js'
import { anySignal } from 'any-signal'
import { InitiatorMessageTypes, ReceiverMessageTypes } from './message-types.js'
import { fromString as uint8ArrayFromString } from 'uint8arrays/from-string'
Expand Down
15 changes: 6 additions & 9 deletions test/coder.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,8 @@ describe('coder', () => {

it('should decode header', async () => {
const source = [uint8ArrayFromString('8801023137', 'base16')]
for await (const msgs of decode(source)) {
expect(msgs.length).to.equal(1)

expect(messageWithBytes(msgs[0])).to.be.deep.equal({ id: 17, type: 0, data: uint8ArrayFromString('17') })
for await (const msg of decode()(source)) {
expect(messageWithBytes(msg)).to.be.deep.equal({ id: 17, type: 0, data: uint8ArrayFromString('17') })
}
})

Expand Down Expand Up @@ -67,8 +65,8 @@ describe('coder', () => {
const source = [uint8ArrayFromString('88010231379801023139a801023231', 'base16')]

const res = []
for await (const msgs of decode(source)) {
res.push(...msgs)
for await (const msg of decode()(source)) {
res.push(msg)
}

expect(res.map(messageWithBytes)).to.deep.equal([
Expand All @@ -89,9 +87,8 @@ describe('coder', () => {
it('should decode zero length body msg', async () => {
const source = [uint8ArrayFromString('880100', 'base16')]

for await (const msgs of decode(source)) {
expect(msgs.length).to.equal(1)
expect(messageWithBytes(msgs[0])).to.be.eql({ id: 17, type: 0, data: new Uint8Array(0) })
for await (const msg of decode()(source)) {
expect(messageWithBytes(msg)).to.be.eql({ id: 17, type: 0, data: new Uint8Array(0) })
}
})
})
16 changes: 8 additions & 8 deletions test/mplex.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -76,10 +76,10 @@ describe('mplex', () => {

await muxer.sink(stream)

const messages = await all(decode(bufs))
const messages = await all(decode()(bufs))

expect(messages).to.have.nested.property('[0][0].id', 11, 'Did not specify the correct stream id')
expect(messages).to.have.nested.property('[0][0].type', MessageTypes.RESET_RECEIVER, 'Did not reset the stream that tipped us over the inbound stream limit')
expect(messages).to.have.nested.property('[0].id', 11, 'Did not specify the correct stream id')
expect(messages).to.have.nested.property('[0].type', MessageTypes.RESET_RECEIVER, 'Did not reset the stream that tipped us over the inbound stream limit')
})

it('should reset a stream that fills the message buffer', async () => {
Expand All @@ -103,7 +103,7 @@ describe('mplex', () => {
const dataMessage: MessageInitiatorMessage = {
id,
type: MessageTypes.MESSAGE_INITIATOR,
data: new Uint8ArrayList(new Uint8Array(1024 * 1024))
data: new Uint8ArrayList(new Uint8Array(1024 * 1000))
}
yield dataMessage

Expand Down Expand Up @@ -144,9 +144,9 @@ describe('mplex', () => {

// collect outgoing mplex messages
const muxerFinished = pDefer()
let messages: Message[][] = []
let messages: Message[] = []
void Promise.resolve().then(async () => {
messages = await all(decode(muxer.source))
messages = await all(decode()(muxer.source))
muxerFinished.resolve()
})

Expand All @@ -159,7 +159,7 @@ describe('mplex', () => {

// should have sent reset message to peer for this stream
await muxerFinished.promise
expect(messages).to.have.nested.property('[0][0].id', id)
expect(messages).to.have.nested.property('[0][0].type', MessageTypes.RESET_RECEIVER)
expect(messages).to.have.nested.property('[0].id', id)
expect(messages).to.have.nested.property('[0].type', MessageTypes.RESET_RECEIVER)
})
})
15 changes: 9 additions & 6 deletions test/restrict-size.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,26 +7,28 @@ import all from 'it-all'
import drain from 'it-drain'
import each from 'it-foreach'
import { Message, MessageTypes } from '../src/message-types.js'
import { restrictSize } from '../src/restrict-size.js'
import { encode } from '../src/encode.js'
import { decode } from '../src/decode.js'
import { Uint8ArrayList } from 'uint8arraylist'

describe('restrict-size', () => {
describe('restrict size', () => {
it('should throw when size is too big', async () => {
const maxSize = 32

const input: Message[] = [
{ id: 0, type: 1, data: new Uint8ArrayList(randomBytes(8)) },
{ id: 0, type: 1, data: new Uint8ArrayList(randomBytes(16)) },
{ id: 0, type: 1, data: new Uint8ArrayList(randomBytes(maxSize)) },
{ id: 0, type: 1, data: new Uint8ArrayList(randomBytes(64)) },
{ id: 0, type: 1, data: new Uint8ArrayList(randomBytes(16)) }
{ id: 0, type: 1, data: new Uint8ArrayList(randomBytes(64)) }
]

const output: Message[] = []

try {
await pipe(
input,
restrictSize(maxSize),
encode,
decode(maxSize),
(source) => each(source, chunk => {
output.push(chunk)
}),
Expand All @@ -51,7 +53,8 @@ describe('restrict-size', () => {

const output = await pipe(
input,
restrictSize(32),
encode,
decode(32),
async (source) => await all(source)
)
expect(output).to.deep.equal(input)
Expand Down