From 93025998e99a2faf26eedd05a1e65da32b0ac7f2 Mon Sep 17 00:00:00 2001 From: Khafra Date: Sat, 11 May 2024 16:36:34 -0400 Subject: [PATCH] refactor websocket control frame handling (#3241) Co-authored-by: tai-kun --- lib/web/websocket/receiver.js | 253 ++++++++++++++++++---------------- lib/web/websocket/util.js | 15 +- test/websocket/issue-2859.js | 38 +++++ 3 files changed, 189 insertions(+), 117 deletions(-) create mode 100644 test/websocket/issue-2859.js diff --git a/lib/web/websocket/receiver.js b/lib/web/websocket/receiver.js index d1af7b46349..a63e7426918 100644 --- a/lib/web/websocket/receiver.js +++ b/lib/web/websocket/receiver.js @@ -1,10 +1,11 @@ 'use strict' const { Writable } = require('node:stream') +const assert = require('node:assert') const { parserStates, opcodes, states, emptyBuffer, sentCloseFrameState } = require('./constants') const { kReadyState, kSentClose, kResponse, kReceivedClose } = require('./symbols') const { channels } = require('../../core/diagnostics') -const { isValidStatusCode, failWebsocketConnection, websocketMessageReceived, utf8Decode } = require('./util') +const { isValidStatusCode, failWebsocketConnection, websocketMessageReceived, utf8Decode, isControlFrame } = require('./util') const { WebsocketFrameSend } = require('./frame') const { CloseEvent } = require('./events') @@ -53,23 +54,18 @@ class ByteParser extends Writable { } const buffer = this.consume(2) + const fin = (buffer[0] & 0x80) !== 0 + const opcode = buffer[0] & 0x0F + const masked = (buffer[1] & 0x80) === 0x80 - this.#info.fin = (buffer[0] & 0x80) !== 0 - this.#info.opcode = buffer[0] & 0x0F - this.#info.masked = (buffer[1] & 0x80) === 0x80 - - if (this.#info.masked) { + if (masked) { failWebsocketConnection(this.ws, 'Frame cannot be masked') return callback() } - // If we receive a fragmented message, we use the type of the first - // frame to parse the full message as binary/text, when it's terminated - this.#info.originalOpcode ??= this.#info.opcode - - this.#info.fragmented = !this.#info.fin && this.#info.opcode !== opcodes.CONTINUATION + const fragmented = !fin && opcode !== opcodes.CONTINUATION - if (this.#info.fragmented && this.#info.opcode !== opcodes.BINARY && this.#info.opcode !== opcodes.TEXT) { + if (fragmented && opcode !== opcodes.BINARY && opcode !== opcodes.TEXT) { // Only text and binary frames can be fragmented failWebsocketConnection(this.ws, 'Invalid frame type was fragmented.') return @@ -77,6 +73,20 @@ class ByteParser extends Writable { const payloadLength = buffer[1] & 0x7F + if (isControlFrame(opcode)) { + const loop = this.parseControlFrame(callback, { + opcode, + fragmented, + payloadLength + }) + + if (loop) { + continue + } else { + return + } + } + if (payloadLength <= 125) { this.#info.payloadLength = payloadLength this.#state = parserStates.READ_DATA @@ -86,114 +96,18 @@ class ByteParser extends Writable { this.#state = parserStates.PAYLOADLENGTH_64 } + // TODO(@KhafraDev): handle continuation frames separately as their + // semantics are different from TEXT/BINARY frames. + this.#info.originalOpcode ??= opcode + this.#info.opcode = opcode + this.#info.masked = masked + this.#info.fin = fin + this.#info.fragmented = fragmented + if (this.#info.fragmented && payloadLength > 125) { // A fragmented frame can't be fragmented itself failWebsocketConnection(this.ws, 'Fragmented frame exceeded 125 bytes.') return - } else if ( - (this.#info.opcode === opcodes.PING || - this.#info.opcode === opcodes.PONG || - this.#info.opcode === opcodes.CLOSE) && - payloadLength > 125 - ) { - // Control frames can have a payload length of 125 bytes MAX - failWebsocketConnection(this.ws, 'Payload length for control frame exceeded 125 bytes.') - return - } else if (this.#info.opcode === opcodes.CLOSE) { - if (payloadLength === 1) { - failWebsocketConnection(this.ws, 'Received close frame with a 1-byte body.') - return - } - - const body = this.consume(payloadLength) - - this.#info.closeInfo = this.parseCloseBody(body) - - if (this.#info.closeInfo.error) { - const { code, reason } = this.#info.closeInfo - - callback(new CloseEvent('close', { wasClean: false, reason, code })) - return - } - - if (this.ws[kSentClose] !== sentCloseFrameState.SENT) { - // If an endpoint receives a Close frame and did not previously send a - // Close frame, the endpoint MUST send a Close frame in response. (When - // sending a Close frame in response, the endpoint typically echos the - // status code it received.) - let body = emptyBuffer - if (this.#info.closeInfo.code) { - body = Buffer.allocUnsafe(2) - body.writeUInt16BE(this.#info.closeInfo.code, 0) - } - const closeFrame = new WebsocketFrameSend(body) - - this.ws[kResponse].socket.write( - closeFrame.createFrame(opcodes.CLOSE), - (err) => { - if (!err) { - this.ws[kSentClose] = sentCloseFrameState.SENT - } - } - ) - } - - // Upon either sending or receiving a Close control frame, it is said - // that _The WebSocket Closing Handshake is Started_ and that the - // WebSocket connection is in the CLOSING state. - this.ws[kReadyState] = states.CLOSING - this.ws[kReceivedClose] = true - - this.end() - - return - } else if (this.#info.opcode === opcodes.PING) { - // Upon receipt of a Ping frame, an endpoint MUST send a Pong frame in - // response, unless it already received a Close frame. - // A Pong frame sent in response to a Ping frame must have identical - // "Application data" - - const body = this.consume(payloadLength) - - if (!this.ws[kReceivedClose]) { - const frame = new WebsocketFrameSend(body) - - this.ws[kResponse].socket.write(frame.createFrame(opcodes.PONG)) - - if (channels.ping.hasSubscribers) { - channels.ping.publish({ - payload: body - }) - } - } - - this.#state = parserStates.INFO - - if (this.#byteOffset > 0) { - continue - } else { - callback() - return - } - } else if (this.#info.opcode === opcodes.PONG) { - // A Pong frame MAY be sent unsolicited. This serves as a - // unidirectional heartbeat. A response to an unsolicited Pong frame is - // not expected. - - const body = this.consume(payloadLength) - - if (channels.pong.hasSubscribers) { - channels.pong.publish({ - payload: body - }) - } - - if (this.#byteOffset > 0) { - continue - } else { - callback() - return - } } } else if (this.#state === parserStates.PAYLOADLENGTH_16) { if (this.#byteOffset < 2) { @@ -303,6 +217,8 @@ class ByteParser extends Writable { } parseCloseBody (data) { + assert(data.length !== 1) + // https://datatracker.ietf.org/doc/html/rfc6455#section-7.1.5 /** @type {number|undefined} */ let code @@ -336,6 +252,111 @@ class ByteParser extends Writable { return { code, reason, error: false } } + /** + * Parses control frames. + * @param {Buffer} data + * @param {(err?: Error) => void} callback + * @param {{ opcode: number, fragmented: boolean, payloadLength: number }} info + */ + parseControlFrame (callback, info) { + assert(!info.fragmented) + + if (info.payloadLength > 125) { + // Control frames can have a payload length of 125 bytes MAX + callback(new Error('Payload length for control frame exceeded 125 bytes.')) + return false + } + + const body = this.consume(info.payloadLength) + + if (info.opcode === opcodes.CLOSE) { + if (info.payloadLength === 1) { + failWebsocketConnection(this.ws, 'Received close frame with a 1-byte body.') + return + } + + this.#info.closeInfo = this.parseCloseBody(body) + + if (this.#info.closeInfo.error) { + const { code, reason } = this.#info.closeInfo + + callback(new CloseEvent('close', { wasClean: false, reason, code })) + return + } + + if (this.ws[kSentClose] !== sentCloseFrameState.SENT) { + // If an endpoint receives a Close frame and did not previously send a + // Close frame, the endpoint MUST send a Close frame in response. (When + // sending a Close frame in response, the endpoint typically echos the + // status code it received.) + let body = emptyBuffer + if (this.#info.closeInfo.code) { + body = Buffer.allocUnsafe(2) + body.writeUInt16BE(this.#info.closeInfo.code, 0) + } + const closeFrame = new WebsocketFrameSend(body) + + this.ws[kResponse].socket.write( + closeFrame.createFrame(opcodes.CLOSE), + (err) => { + if (!err) { + this.ws[kSentClose] = sentCloseFrameState.SENT + } + } + ) + } + + // Upon either sending or receiving a Close control frame, it is said + // that _The WebSocket Closing Handshake is Started_ and that the + // WebSocket connection is in the CLOSING state. + this.ws[kReadyState] = states.CLOSING + this.ws[kReceivedClose] = true + + this.end() + + return + } else if (info.opcode === opcodes.PING) { + // Upon receipt of a Ping frame, an endpoint MUST send a Pong frame in + // response, unless it already received a Close frame. + // A Pong frame sent in response to a Ping frame must have identical + // "Application data" + + if (!this.ws[kReceivedClose]) { + const frame = new WebsocketFrameSend(body) + + this.ws[kResponse].socket.write(frame.createFrame(opcodes.PONG)) + + if (channels.ping.hasSubscribers) { + channels.ping.publish({ + payload: body + }) + } + } + + if (this.#byteOffset <= 0) { + callback() + return false + } + } else if (info.opcode === opcodes.PONG) { + // A Pong frame MAY be sent unsolicited. This serves as a + // unidirectional heartbeat. A response to an unsolicited Pong frame is + // not expected. + + if (channels.pong.hasSubscribers) { + channels.pong.publish({ + payload: body + }) + } + + if (this.#byteOffset <= 0) { + callback() + return false + } + } + + return true + } + get closingInfo () { return this.#info.closeInfo } diff --git a/lib/web/websocket/util.js b/lib/web/websocket/util.js index b023e1f1f60..9a984128858 100644 --- a/lib/web/websocket/util.js +++ b/lib/web/websocket/util.js @@ -210,6 +210,18 @@ function failWebsocketConnection (ws, reason) { } } +/** + * @see https://datatracker.ietf.org/doc/html/rfc6455#section-5.5 + * @param {number} opcode + */ +function isControlFrame (opcode) { + return ( + opcode === opcodes.CLOSE || + opcode === opcodes.PING || + opcode === opcodes.PONG + ) +} + // https://nodejs.org/api/intl.html#detecting-internationalization-support const hasIntl = typeof process.versions.icu === 'string' const fatalDecoder = hasIntl ? new TextDecoder('utf-8', { fatal: true }) : undefined @@ -237,5 +249,6 @@ module.exports = { isValidStatusCode, failWebsocketConnection, websocketMessageReceived, - utf8Decode + utf8Decode, + isControlFrame } diff --git a/test/websocket/issue-2859.js b/test/websocket/issue-2859.js new file mode 100644 index 00000000000..bd20012fc6b --- /dev/null +++ b/test/websocket/issue-2859.js @@ -0,0 +1,38 @@ +'use strict' + +const { test } = require('node:test') +const { WebSocketServer } = require('ws') +const { WebSocket } = require('../..') +const diagnosticsChannel = require('node:diagnostics_channel') +const { tspl } = require('@matteo.collina/tspl') + +test('Fragmented frame with a ping frame in the first of it', async (t) => { + const { completed, deepStrictEqual, strictEqual } = tspl(t, { plan: 2 }) + + const server = new WebSocketServer({ port: 0 }) + + server.on('connection', (ws) => { + const socket = ws._socket + + socket.write(Buffer.from([0x89, 0x05, 0x48, 0x65, 0x6c, 0x6c, 0x6f])) // ping "Hello" + socket.write(Buffer.from([0x01, 0x03, 0x48, 0x65, 0x6c])) // Text frame "Hel" + socket.write(Buffer.from([0x80, 0x02, 0x6c, 0x6f])) // Text frame "lo" + }) + + t.after(() => { + server.close() + ws.close() + }) + + const ws = new WebSocket(`ws://127.0.0.1:${server.address().port}`) + + diagnosticsChannel.channel('undici:websocket:ping').subscribe( + ({ payload }) => deepStrictEqual(payload, Buffer.from('Hello')) + ) + + ws.addEventListener('message', ({ data }) => { + strictEqual(data, 'Hello') + }) + + await completed +})