diff --git a/packages/transport-webrtc/src/stream.ts b/packages/transport-webrtc/src/stream.ts index 756cf7895d..1e9bb0c3c7 100644 --- a/packages/transport-webrtc/src/stream.ts +++ b/packages/transport-webrtc/src/stream.ts @@ -29,16 +29,19 @@ export interface WebRTCStreamInit extends AbstractStreamInit { } // Max message size that can be sent to the DataChannel -const MAX_MESSAGE_SIZE = 16 * 1024 +export const MAX_MESSAGE_SIZE = 16 * 1024 // How much can be buffered to the DataChannel at once -const MAX_BUFFERED_AMOUNT = 16 * 1024 * 1024 +export const MAX_BUFFERED_AMOUNT = 16 * 1024 * 1024 // How long time we wait for the 'bufferedamountlow' event to be emitted -const BUFFERED_AMOUNT_LOW_TIMEOUT = 30 * 1000 +export const BUFFERED_AMOUNT_LOW_TIMEOUT = 30 * 1000 // protobuf field definition overhead -const PROTOBUF_OVERHEAD = 3 +export const PROTOBUF_OVERHEAD = 5 + +// Length of varint, in bytes. +export const VARINT_LENGTH = 2 export class WebRTCStream extends AbstractStream { /** @@ -58,6 +61,10 @@ export class WebRTCStream extends AbstractStream { private readonly incomingData: Pushable private messageQueue?: Uint8ArrayList + + /** + * The maximum size of a message in bytes + */ private readonly maxDataSize: number constructor (init: WebRTCStreamInit) { @@ -70,7 +77,7 @@ export class WebRTCStream extends AbstractStream { this.dataChannelOptions = { bufferedAmountLowEventTimeout: init.dataChannelOptions?.bufferedAmountLowEventTimeout ?? BUFFERED_AMOUNT_LOW_TIMEOUT, maxBufferedAmount: init.dataChannelOptions?.maxBufferedAmount ?? MAX_BUFFERED_AMOUNT, - maxMessageSize: init.dataChannelOptions?.maxMessageSize ?? MAX_MESSAGE_SIZE + maxMessageSize: init.dataChannelOptions?.maxMessageSize ?? init.maxDataSize } this.maxDataSize = init.maxDataSize @@ -275,7 +282,7 @@ export function createStream (options: WebRTCStreamOptions): WebRTCStream { return new WebRTCStream({ id: direction === 'inbound' ? (`i${channel.id}`) : `r${channel.id}`, direction, - maxDataSize: (dataChannelOptions?.maxMessageSize ?? MAX_MESSAGE_SIZE) - PROTOBUF_OVERHEAD, + maxDataSize: (dataChannelOptions?.maxMessageSize ?? MAX_MESSAGE_SIZE) - PROTOBUF_OVERHEAD - VARINT_LENGTH, dataChannelOptions, onEnd, channel, diff --git a/packages/transport-webrtc/test/stream.spec.ts b/packages/transport-webrtc/test/stream.spec.ts index 1812ca2c7a..500cbb02de 100644 --- a/packages/transport-webrtc/test/stream.spec.ts +++ b/packages/transport-webrtc/test/stream.spec.ts @@ -6,24 +6,22 @@ import * as lengthPrefixed from 'it-length-prefixed' import { pushable } from 'it-pushable' import { Uint8ArrayList } from 'uint8arraylist' import { Message } from '../src/pb/message.js' -import { createStream } from '../src/stream.js' +import { MAX_BUFFERED_AMOUNT, MAX_MESSAGE_SIZE, PROTOBUF_OVERHEAD, createStream } from '../src/stream.js' const mockDataChannel = (opts: { send: (bytes: Uint8Array) => void, bufferedAmount?: number }): RTCDataChannel => { return { readyState: 'open', - close: () => {}, - addEventListener: (_type: string, _listener: () => void) => {}, - removeEventListener: (_type: string, _listener: () => void) => {}, + close: () => { }, + addEventListener: (_type: string, _listener: () => void) => { }, + removeEventListener: (_type: string, _listener: () => void) => { }, ...opts } as RTCDataChannel } -const MAX_MESSAGE_SIZE = 16 * 1024 - describe('Max message size', () => { it(`sends messages smaller or equal to ${MAX_MESSAGE_SIZE} bytes in one`, async () => { const sent: Uint8ArrayList = new Uint8ArrayList() - const data = new Uint8Array(MAX_MESSAGE_SIZE - 5) + const data = new Uint8Array(MAX_MESSAGE_SIZE - PROTOBUF_OVERHEAD) const p = pushable() // Make sure that the data that ought to be sent will result in a message with exactly MAX_MESSAGE_SIZE @@ -42,8 +40,7 @@ describe('Max message size', () => { p.end() await webrtcStream.sink(p) - // length(message) + message + length(FIN) + FIN - expect(length(sent)).to.equal(4) + expect(length(sent)).to.equal(6) for (const buf of sent) { expect(buf.byteLength).to.be.lessThanOrEqual(MAX_MESSAGE_SIZE) @@ -80,7 +77,6 @@ describe('Max message size', () => { }) it('closes the stream if bufferamountlow timeout', async () => { - const MAX_BUFFERED_AMOUNT = 16 * 1024 * 1024 + 1 const timeout = 100 let closed = false const webrtcStream = createStream({ @@ -91,7 +87,7 @@ describe('Max message size', () => { send: () => { throw new Error('Expected to not send') }, - bufferedAmount: MAX_BUFFERED_AMOUNT + bufferedAmount: MAX_BUFFERED_AMOUNT + 1 }), direction: 'outbound', onEnd: () => {