Skip to content
This repository has been archived by the owner on Jul 21, 2023. It is now read-only.

Commit

Permalink
fix: apply message size limit before decoding message
Browse files Browse the repository at this point in the history
If we apply the message size limit after decoding the message it's
too late as we've already processed the bad message.

Instead, if the buffer full of unprocessed messages grows to be
large than the max message size (e.g. we have not recieved a complete
message under the size limit), throw an error which will cause the
stream to be reset.
  • Loading branch information
achingbrain committed Nov 23, 2022
1 parent c813bae commit 91d2e8c
Show file tree
Hide file tree
Showing 7 changed files with 43 additions and 74 deletions.
25 changes: 16 additions & 9 deletions src/decode.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ import { Uint8ArrayList } from 'uint8arraylist'
import type { Source } from 'it-stream-types'
import type { Message } from './message-types.js'

export const MAX_MSG_SIZE = Math.pow(2, 20)

interface MessageHeader {
id: number
type: keyof typeof MessageTypeNames
Expand All @@ -13,10 +15,12 @@ interface MessageHeader {
class Decoder {
private readonly _buffer: Uint8ArrayList
private _headerInfo: MessageHeader | null
private readonly _maxMessageSize: number

constructor () {
constructor (maxMessageSize: number) {
this._buffer = new Uint8ArrayList()
this._headerInfo = null
this._maxMessageSize = maxMessageSize
}

write (chunk: Uint8Array) {
Expand All @@ -25,6 +29,11 @@ class Decoder {
}

this._buffer.append(chunk)

if (this._buffer.byteLength > this._maxMessageSize) {
throw Object.assign(new Error('message size too large!'), { code: 'ERR_MSG_TOO_BIG' })
}

const msgs: Message[] = []

while (this._buffer.length !== 0) {
Expand Down Expand Up @@ -117,16 +126,14 @@ function readVarInt (buf: Uint8ArrayList, offset: number = 0) {
}

/**
* Decode a chunk and yield an _array_ of decoded messages
* Decode a chunk and yield decoded messages
*/
export async function * decode (source: Source<Uint8Array>) {
const decoder = new Decoder()

for await (const chunk of source) {
const msgs = decoder.write(chunk)
export function decode (maxMessageSize: number = MAX_MSG_SIZE) {
return async function * decodeMessages (source: Source<Uint8Array>) {
const decoder = new Decoder(maxMessageSize)

if (msgs.length > 0) {
yield msgs
for await (const chunk of source) {
yield * decoder.write(chunk)
}
}
}
4 changes: 1 addition & 3 deletions src/mplex.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ import { pushableV } from 'it-pushable'
import { abortableSource } from 'abortable-iterator'
import { encode } from './encode.js'
import { decode } from './decode.js'
import { restrictSize } from './restrict-size.js'
import { MessageTypes, MessageTypeNames, Message } from './message-types.js'
import { createStream } from './stream.js'
import { toString as uint8ArrayToString } from 'uint8arrays'
Expand Down Expand Up @@ -204,8 +203,7 @@ export class MplexStreamMuxer implements StreamMuxer {
try {
await pipe(
source,
decode,
restrictSize(this._init.maxMsgSize),
decode(this._init.maxMsgSize),
async source => {
for await (const msg of source) {
await this._handleIncoming(msg)
Expand Down
36 changes: 0 additions & 36 deletions src/restrict-size.ts

This file was deleted.

2 changes: 1 addition & 1 deletion src/stream.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { abortableSource } from 'abortable-iterator'
import { pushable } from 'it-pushable'
import errCode from 'err-code'
import { MAX_MSG_SIZE } from './restrict-size.js'
import { MAX_MSG_SIZE } from './decode.js'
import { anySignal } from 'any-signal'
import { InitiatorMessageTypes, ReceiverMessageTypes } from './message-types.js'
import { fromString as uint8ArrayFromString } from 'uint8arrays/from-string'
Expand Down
15 changes: 6 additions & 9 deletions test/coder.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,8 @@ describe('coder', () => {

it('should decode header', async () => {
const source = [uint8ArrayFromString('8801023137', 'base16')]
for await (const msgs of decode(source)) {
expect(msgs.length).to.equal(1)

expect(messageWithBytes(msgs[0])).to.be.deep.equal({ id: 17, type: 0, data: uint8ArrayFromString('17') })
for await (const msg of decode()(source)) {
expect(messageWithBytes(msg)).to.be.deep.equal({ id: 17, type: 0, data: uint8ArrayFromString('17') })
}
})

Expand Down Expand Up @@ -67,8 +65,8 @@ describe('coder', () => {
const source = [uint8ArrayFromString('88010231379801023139a801023231', 'base16')]

const res = []
for await (const msgs of decode(source)) {
res.push(...msgs)
for await (const msg of decode()(source)) {
res.push(msg)
}

expect(res.map(messageWithBytes)).to.deep.equal([
Expand All @@ -89,9 +87,8 @@ describe('coder', () => {
it('should decode zero length body msg', async () => {
const source = [uint8ArrayFromString('880100', 'base16')]

for await (const msgs of decode(source)) {
expect(msgs.length).to.equal(1)
expect(messageWithBytes(msgs[0])).to.be.eql({ id: 17, type: 0, data: new Uint8Array(0) })
for await (const msg of decode()(source)) {
expect(messageWithBytes(msg)).to.be.eql({ id: 17, type: 0, data: new Uint8Array(0) })
}
})
})
16 changes: 8 additions & 8 deletions test/mplex.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -76,10 +76,10 @@ describe('mplex', () => {

await muxer.sink(stream)

const messages = await all(decode(bufs))
const messages = await all(decode()(bufs))

expect(messages).to.have.nested.property('[0][0].id', 11, 'Did not specify the correct stream id')
expect(messages).to.have.nested.property('[0][0].type', MessageTypes.RESET_RECEIVER, 'Did not reset the stream that tipped us over the inbound stream limit')
expect(messages).to.have.nested.property('[0].id', 11, 'Did not specify the correct stream id')
expect(messages).to.have.nested.property('[0].type', MessageTypes.RESET_RECEIVER, 'Did not reset the stream that tipped us over the inbound stream limit')
})

it('should reset a stream that fills the message buffer', async () => {
Expand All @@ -103,7 +103,7 @@ describe('mplex', () => {
const dataMessage: MessageInitiatorMessage = {
id,
type: MessageTypes.MESSAGE_INITIATOR,
data: new Uint8ArrayList(new Uint8Array(1024 * 1024))
data: new Uint8ArrayList(new Uint8Array(1024 * 1000))
}
yield dataMessage

Expand Down Expand Up @@ -144,9 +144,9 @@ describe('mplex', () => {

// collect outgoing mplex messages
const muxerFinished = pDefer()
let messages: Message[][] = []
let messages: Message[] = []
void Promise.resolve().then(async () => {
messages = await all(decode(muxer.source))
messages = await all(decode()(muxer.source))
muxerFinished.resolve()
})

Expand All @@ -159,7 +159,7 @@ describe('mplex', () => {

// should have sent reset message to peer for this stream
await muxerFinished.promise
expect(messages).to.have.nested.property('[0][0].id', id)
expect(messages).to.have.nested.property('[0][0].type', MessageTypes.RESET_RECEIVER)
expect(messages).to.have.nested.property('[0].id', id)
expect(messages).to.have.nested.property('[0].type', MessageTypes.RESET_RECEIVER)
})
})
19 changes: 11 additions & 8 deletions test/restrict-size.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,28 +7,30 @@ import all from 'it-all'
import drain from 'it-drain'
import each from 'it-foreach'
import { Message, MessageTypes } from '../src/message-types.js'
import { restrictSize } from '../src/restrict-size.js'
import { Uint8ArrayList } from 'uint8arraylist'
import { decode } from '../src/decode.js'
import { encode } from '../src/encode.js'

describe('restrict-size', () => {
describe('restrict size', () => {
it('should throw when size is too big', async () => {
const maxSize = 32

const input: Message[] = [
{ id: 0, type: 1, data: new Uint8ArrayList(randomBytes(8)) },
{ id: 0, type: 1, data: new Uint8ArrayList(randomBytes(16)) },
{ id: 0, type: 1, data: new Uint8ArrayList(randomBytes(maxSize)) },
{ id: 0, type: 1, data: new Uint8ArrayList(randomBytes(64)) },
{ id: 0, type: 1, data: new Uint8ArrayList(randomBytes(16)) }
{ id: 0, type: 1, data: new Uint8ArrayList(randomBytes(10)) }
]

const output: Message[] = []

try {
await pipe(
input,
restrictSize(maxSize),
(source) => each(source, chunk => {
output.push(chunk)
encode,
decode(maxSize),
(source) => each(source, msg => {
output.push(msg)
}),
async (source) => await drain(source)
)
Expand All @@ -51,7 +53,8 @@ describe('restrict-size', () => {

const output = await pipe(
input,
restrictSize(32),
encode,
decode(32),
async (source) => await all(source)
)
expect(output).to.deep.equal(input)
Expand Down

0 comments on commit 91d2e8c

Please sign in to comment.