Skip to content

Commit

Permalink
fix(@libp2p/webrtc): set max message size in alignment with spec (#2050)
Browse files Browse the repository at this point in the history
  • Loading branch information
maschad authored Sep 15, 2023
1 parent d9159dd commit 122f1e6
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 17 deletions.
19 changes: 13 additions & 6 deletions packages/transport-webrtc/src/stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,16 +29,19 @@ export interface WebRTCStreamInit extends AbstractStreamInit {
}

// Max message size that can be sent to the DataChannel
const MAX_MESSAGE_SIZE = 16 * 1024
export const MAX_MESSAGE_SIZE = 16 * 1024

// How much can be buffered to the DataChannel at once
const MAX_BUFFERED_AMOUNT = 16 * 1024 * 1024
export const MAX_BUFFERED_AMOUNT = 16 * 1024 * 1024

// How long time we wait for the 'bufferedamountlow' event to be emitted
const BUFFERED_AMOUNT_LOW_TIMEOUT = 30 * 1000
export const BUFFERED_AMOUNT_LOW_TIMEOUT = 30 * 1000

// protobuf field definition overhead
const PROTOBUF_OVERHEAD = 3
export const PROTOBUF_OVERHEAD = 5

// Length of varint, in bytes.
export const VARINT_LENGTH = 2

export class WebRTCStream extends AbstractStream {
/**
Expand All @@ -58,6 +61,10 @@ export class WebRTCStream extends AbstractStream {
private readonly incomingData: Pushable<Uint8Array>

private messageQueue?: Uint8ArrayList

/**
* The maximum size of a message in bytes
*/
private readonly maxDataSize: number

constructor (init: WebRTCStreamInit) {
Expand All @@ -70,7 +77,7 @@ export class WebRTCStream extends AbstractStream {
this.dataChannelOptions = {
bufferedAmountLowEventTimeout: init.dataChannelOptions?.bufferedAmountLowEventTimeout ?? BUFFERED_AMOUNT_LOW_TIMEOUT,
maxBufferedAmount: init.dataChannelOptions?.maxBufferedAmount ?? MAX_BUFFERED_AMOUNT,
maxMessageSize: init.dataChannelOptions?.maxMessageSize ?? MAX_MESSAGE_SIZE
maxMessageSize: init.dataChannelOptions?.maxMessageSize ?? init.maxDataSize
}
this.maxDataSize = init.maxDataSize

Expand Down Expand Up @@ -275,7 +282,7 @@ export function createStream (options: WebRTCStreamOptions): WebRTCStream {
return new WebRTCStream({
id: direction === 'inbound' ? (`i${channel.id}`) : `r${channel.id}`,
direction,
maxDataSize: (dataChannelOptions?.maxMessageSize ?? MAX_MESSAGE_SIZE) - PROTOBUF_OVERHEAD,
maxDataSize: (dataChannelOptions?.maxMessageSize ?? MAX_MESSAGE_SIZE) - PROTOBUF_OVERHEAD - VARINT_LENGTH,
dataChannelOptions,
onEnd,
channel,
Expand Down
18 changes: 7 additions & 11 deletions packages/transport-webrtc/test/stream.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,24 +6,22 @@ import * as lengthPrefixed from 'it-length-prefixed'
import { pushable } from 'it-pushable'
import { Uint8ArrayList } from 'uint8arraylist'
import { Message } from '../src/pb/message.js'
import { createStream } from '../src/stream.js'
import { MAX_BUFFERED_AMOUNT, MAX_MESSAGE_SIZE, PROTOBUF_OVERHEAD, createStream } from '../src/stream.js'

const mockDataChannel = (opts: { send: (bytes: Uint8Array) => void, bufferedAmount?: number }): RTCDataChannel => {
return {
readyState: 'open',
close: () => {},
addEventListener: (_type: string, _listener: () => void) => {},
removeEventListener: (_type: string, _listener: () => void) => {},
close: () => { },
addEventListener: (_type: string, _listener: () => void) => { },
removeEventListener: (_type: string, _listener: () => void) => { },
...opts
} as RTCDataChannel
}

const MAX_MESSAGE_SIZE = 16 * 1024

describe('Max message size', () => {
it(`sends messages smaller or equal to ${MAX_MESSAGE_SIZE} bytes in one`, async () => {
const sent: Uint8ArrayList = new Uint8ArrayList()
const data = new Uint8Array(MAX_MESSAGE_SIZE - 5)
const data = new Uint8Array(MAX_MESSAGE_SIZE - PROTOBUF_OVERHEAD)
const p = pushable()

// Make sure that the data that ought to be sent will result in a message with exactly MAX_MESSAGE_SIZE
Expand All @@ -42,8 +40,7 @@ describe('Max message size', () => {
p.end()
await webrtcStream.sink(p)

// length(message) + message + length(FIN) + FIN
expect(length(sent)).to.equal(4)
expect(length(sent)).to.equal(6)

for (const buf of sent) {
expect(buf.byteLength).to.be.lessThanOrEqual(MAX_MESSAGE_SIZE)
Expand Down Expand Up @@ -80,7 +77,6 @@ describe('Max message size', () => {
})

it('closes the stream if bufferamountlow timeout', async () => {
const MAX_BUFFERED_AMOUNT = 16 * 1024 * 1024 + 1
const timeout = 100
let closed = false
const webrtcStream = createStream({
Expand All @@ -91,7 +87,7 @@ describe('Max message size', () => {
send: () => {
throw new Error('Expected to not send')
},
bufferedAmount: MAX_BUFFERED_AMOUNT
bufferedAmount: MAX_BUFFERED_AMOUNT + 1
}),
direction: 'outbound',
onEnd: () => {
Expand Down

0 comments on commit 122f1e6

Please sign in to comment.