diff --git a/package.json b/package.json index 9923dd31d..4abc58bde 100644 --- a/package.json +++ b/package.json @@ -56,6 +56,7 @@ "rusty-motors-shard": "link:packages/shard", "rusty-motors-shared": "link:packages/shared", "rusty-motors-shared-packets": "link:packages/shared-packets", + "rusty-motors-socket": "link:src/socket", "rusty-motors-transactions": "link:packages/transactions", "sequelize": "^6.37.3", "sqlite": "^5.1.1", diff --git a/packages/gateway/src/index.ts b/packages/gateway/src/index.ts index f51db0fc9..a999b0809 100644 --- a/packages/gateway/src/index.ts +++ b/packages/gateway/src/index.ts @@ -14,67 +14,22 @@ // You should have received a copy of the GNU Affero General Public License // along with this program. If not, see . -import { randomUUID } from "node:crypto"; import { type OnDataHandler, type ServerLogger, type ServiceResponse, - addSocket, fetchStateFromDatabase, getOnDataHandler, - removeSocket, - wrapSocket, } from "rusty-motors-shared"; import { getServerLogger } from "rusty-motors-shared"; +import { newSocket } from "rusty-motors-socket"; import { Socket } from "node:net"; import { getGatewayServer } from "./GatewayServer.js"; import { getPortMessageType, UserStatusManager } from "rusty-motors-nps"; import { BasePacket } from "rusty-motors-shared-packets"; import * as Sentry from "@sentry/node"; - -/** - * @typedef {object} OnDataHandlerArgs - * @property {object} args - * @property {string} args.connectionId The connection id of the socket that - * received the data. - * @property {module:packages/shared/RawMessage} args.message The data that was received. - * @property {module:shared/log.ServerLogger} [args.log=getServerLogger({ name: "gateway" })] The logger to use. - * response - * to the - * data. - */ - -/** - * @typedef {function} OnDataHandler - * @param {OnDataHandlerArgs} args The arguments for the handler. - * @returns {ServiceResponse} The - * response - * to the - * data. - */ - -/** - * Handle socket errors - */ -export function socketErrorHandler({ - connectionId, - error, - log = getServerLogger({ - name: "socketErrorHandler", - }), -}: { - connectionId: string; - error: NodeJS.ErrnoException; - log?: ServerLogger; -}) { - // Handle socket errors - if (error.code == "ECONNRESET") { - log.debug(`Connection ${connectionId} reset`); - return; - } - throw Error(`Socket error: ${error.message} on connection ${connectionId}`); -} +import { socketErrorHandler } from "./socketErrorHandler.js"; /** * Handle the end of a socket connection @@ -83,20 +38,20 @@ export function socketErrorHandler({ * @param {string} options.connectionId The connection ID * @param {import("pino").Logger} [options.log=getServerLogger({ name: "socketEndHandler" })] The logger to use */ -export function socketEndHandler({ - connectionId, - log = getServerLogger({ - name: "socketEndHandler", - }), -}: { - connectionId: string; - log?: ServerLogger; -}) { - log.debug(`Connection ${connectionId} ended`); - - // Remove the socket from the global state - removeSocket(fetchStateFromDatabase(), connectionId).save(); -} +// export function socketEndHandler({ +// connectionId, +// log = getServerLogger({ +// name: "socketEndHandler", +// }), +// }: { +// connectionId: string; +// log?: ServerLogger; +// }) { +// log.debug(`Connection ${connectionId} ended`); + +// // Remove the socket from the global state +// removeSocket(fetchStateFromDatabase(), connectionId).save(); +// } /** * Handle incoming TCP connections @@ -122,14 +77,7 @@ export function onSocketConnection({ throw Error("localPort or remoteAddress is undefined"); } - // This is a new connection so generate a new connection ID - const newConnectionId = randomUUID(); - - // Wrap the socket and add it to the global state - const wrappedSocket = wrapSocket(incomingSocket, newConnectionId); - - // Add the socket to the global state - addSocket(fetchStateFromDatabase(), wrappedSocket).save(); + const socket = newSocket(incomingSocket); // ======================= // Handle incoming socket in shadow mode @@ -138,17 +86,17 @@ export function onSocketConnection({ try { // Get expected message type const messageType = getPortMessageType(localPort); - log.debug(`[${newConnectionId}] Expected message type: ${messageType}`); + log.debug(`[${socket.id}] Expected message type: ${messageType}`); switch (messageType) { case "Game": { // Handle game messages // Create a new user status const userStatus = UserStatusManager.newUserStatus(); - log.debug(`[${newConnectionId}] Created new user status`); + log.debug(`[${socket.id}] Created new user status`); UserStatusManager.addUserStatus(userStatus); - log.debug(`[${newConnectionId}] Added user status to manager`); + log.debug(`[${socket.id}] Added user status to manager`); break; } @@ -159,12 +107,12 @@ export function onSocketConnection({ } default: { - log.warn(`[${newConnectionId}] No message type found`); + log.warn(`[${socket.id}] No message type found`); break; } } } catch (error) { - log.error(`[${newConnectionId}] Error handling socket: ${error}`); + log.error(`[${socket.id}] Error handling socket: ${error}`); } // This is a new TCP socket, so it's probably not using HTTP @@ -182,14 +130,13 @@ export function onSocketConnection({ } incomingSocket.on("error", (error) => - socketErrorHandler({ connectionId: newConnectionId, error }), + socketErrorHandler({ connectionId: socket.id, error }), ); // Add the data handler to the socket incomingSocket.on( "data", function socketDataHandler(incomingDataAsBuffer: Buffer) { - log.trace( `Incoming data on port ${localPort}: ${incomingDataAsBuffer.toString( "hex", @@ -212,7 +159,7 @@ export function onSocketConnection({ }, async () => { portOnDataHandler({ - connectionId: newConnectionId, + connectionId: socket.id, message: rawMessage, }) .then((response: ServiceResponse) => { diff --git a/packages/gateway/src/socketErrorHandler.test.ts b/packages/gateway/src/socketErrorHandler.test.ts new file mode 100644 index 000000000..98c905e50 --- /dev/null +++ b/packages/gateway/src/socketErrorHandler.test.ts @@ -0,0 +1,38 @@ +import { describe, it, expect, vi } from "vitest"; +import { socketErrorHandler } from "./socketErrorHandler.js"; +import { type ServerLogger } from "rusty-motors-shared"; + +describe("socketErrorHandler", () => { + + it("should log a debug message when error code is ECONNRESET", () => { + const connectionId = "12345"; + const error = { code: "ECONNRESET" } as NodeJS.ErrnoException; + const mockLogger = { + debug: vi.fn(), + } as unknown as ServerLogger; + + + socketErrorHandler({ connectionId, error, log: mockLogger }); + + expect(mockLogger.debug).toHaveBeenCalledWith( + `Connection ${connectionId} reset`, + ); + }); + + it("should throw an error when error code is not handled", () => { + const connectionId = "12345"; + const error = { + code: "EUNKNOWN", + message: "Unknown error", + } as NodeJS.ErrnoException; + const mockLogger = { + debug: vi.fn(), + } as unknown as ServerLogger; + + + expect(() => + socketErrorHandler({ connectionId, error, log: mockLogger }), + ).toThrow(`Socket error: ${error.message} on connection ${connectionId}`); + }); + +}); diff --git a/packages/gateway/src/socketErrorHandler.ts b/packages/gateway/src/socketErrorHandler.ts new file mode 100644 index 000000000..12df58624 --- /dev/null +++ b/packages/gateway/src/socketErrorHandler.ts @@ -0,0 +1,30 @@ +import { getServerLogger, type ServerLogger } from "rusty-motors-shared"; + +/** + * Handles socket errors by logging specific error codes or throwing an error. + * + * @param {Object} params - The parameters for the socket error handler. + * @param {string} params.connectionId - The ID of the connection where the error occurred. + * @param {NodeJS.ErrnoException} params.error - The error object containing details of the socket error. + * @param {ServerLogger} [params.log] - Optional logger instance for logging error details. Defaults to a server logger named "socketErrorHandler". + * + * @throws {Error} Throws an error if the socket error code is not handled. + */ +export function socketErrorHandler({ + connectionId, + error, + log = getServerLogger({ + name: "socketErrorHandler", + }), +}: { + connectionId: string; + error: NodeJS.ErrnoException; + log?: ServerLogger; +}) { + // Handle socket errors + if (error.code == "ECONNRESET") { + log.debug(`Connection ${connectionId} reset`); + return; + } + throw Error(`Socket error: ${error.message} on connection ${connectionId}`); +} diff --git a/packages/shared/index.ts b/packages/shared/index.ts index d9ee99771..bcfc5c3f7 100644 --- a/packages/shared/index.ts +++ b/packages/shared/index.ts @@ -27,10 +27,7 @@ export { createInitialState, fetchStateFromDatabase, addOnDataHandler, - addSocket, getOnDataHandler, - removeSocket, - wrapSocket, addEncryption, getEncryption, McosSession, diff --git a/packages/shared/src/State.ts b/packages/shared/src/State.ts index 4ff7f03c7..83dd533d4 100644 --- a/packages/shared/src/State.ts +++ b/packages/shared/src/State.ts @@ -7,15 +7,9 @@ // eslint-disable-next-line no-unused-vars import { Cipher, Decipher } from "crypto"; -import { Socket } from "node:net"; -import type { Logger } from "pino"; import { SerializedBufferOld } from "./SerializedBufferOld.js"; import { Serializable } from "rusty-motors-shared-packets"; - -/** - * @external RawMessage - * @see {@link "packages/shared/messageFactory.js.RawMessage"} - */ +import type { ServerLogger } from "./log.js"; /** * State management for the gateway server. @@ -146,47 +140,11 @@ export class McosSession { } } -/** - * @external net - * @see {@link https://nodejs.org/api/net.html} - */ - -/** - * A wrapped socket. - * - * This is a socket that has been wrapped with a connection id. - * @interface - */ -interface WrappedSocket { - socket: Socket; - connectionId: string; -} - -/** - * Wrap a socket with a connection id. - * - * @param {module:NetConnectOpts.Socket} socket The socket to wrap. - * @param {string} connectionId The connection id to wrap the socket with. - * @returns {WrappedSocket} The wrapped socket. - */ -export function wrapSocket( - socket: Socket, - connectionId: string, -): WrappedSocket { - return { - socket, - connectionId, - }; -} - type OnDataHandlerArgs = { connectionId: string; message: Serializable; - log?: Logger; + log?: ServerLogger; }; -/** - * @requires module:packages/shared/RawMessage - */ export interface ServiceResponse { connectionId: string; @@ -219,10 +177,10 @@ export type OnDataHandler = ( */ export interface State { filePaths: Record; - sockets: Record; + // sockets: Record; encryptions: Record; sessions: Record; - queuedConnections: Record; + // queuedConnections: Record; onDataHandlers: Record; save: (state?: State) => void; } @@ -250,10 +208,10 @@ export function createInitialState({ }): State { return { filePaths: {}, - sockets: {}, + // sockets: {}, encryptions: {}, sessions: {}, - queuedConnections: {}, + // queuedConnections: {}, onDataHandlers: {}, save: function (state?: State) { if (typeof state === "undefined") { @@ -320,136 +278,6 @@ export function getOnDataHandler( return state.onDataHandlers[port.toString()]; } -/** - * Add a socket to the state. - * - * This function adds a socket to the state. - * The returned state is a new state object, and the original state is not - * modified. You should then call the save function on the new state to update - * the database. - * - * @param {State} state The state to add the socket to. - * @param {WrappedSocket} socket The socket to add to the state. - * @returns {State} The state with the socket added. - */ -export function addSocket(state: State, socket: WrappedSocket): State { - const sockets = state.sockets; - sockets[socket.connectionId] = socket; - return { - ...state, - sockets, - }; -} - -/** - * Get a socket from the state. - * - * This function gets a socket from the state. - * - * @param {State} state The state to get the socket from. - * @param {string} connectionId The connection id of the socket to get. - * @returns {WrappedSocket | undefined} The socket with the given connection id, or undefined if no socket - */ -export function getSocket( - state: State, - connectionId: string, -): WrappedSocket | undefined { - return state.sockets[connectionId]; -} - -/** - * Get all the sockets from the state. - * - * This function gets all the sockets from the state. - * - * @param {State} state The state to get the sockets from. - * @returns {Record} An array of all the sockets in the state. - */ -export function getSockets(state: State): Record { - return state.sockets; -} - -/** - * Remove a socket from the state. - * - * This function removes a socket from the state. - * The returned state is a new state object, and the original state is not - * modified. You should then call the save function on the new state to update - * the database. - * - * @param {State} state The state to remove the socket from. - * @param {string} connectionId The connection id of the socket to remove. - * @returns {State} The state with the socket removed. - */ -export function removeSocket(state: State, connectionId: string): State { - const sockets = state.sockets; - delete sockets[connectionId]; - return { - ...state, - sockets, - }; -} - -/** - * Add a queued connection to the state. - * - * This function adds a queued connection to the state. - * The returned state is a new state object, and the original state is not - * modified. You should then call the save function on the new state to update - * the database. - * - * @param {State} state The state to add the queued connection to. - * @param {WrappedSocket} socket The queued connection to add to the state. - * @returns {State} The state with the queued connection added. - */ -export function addQueuedConnection( - state: State, - socket: WrappedSocket, -): State { - const queuedConnections = state.queuedConnections; - queuedConnections[socket.connectionId] = socket; - return { - ...state, - queuedConnections, - }; -} - -/** - * Get queued connections from the state. - * - * This function gets all the queued connections from the state. - * - * @param {State} state The state to get the queued connections from. - * @returns {string[]} An array of all the queued connections in the state. - */ -export function getQueuedConnections(state: State): string[] { - return Object.keys(state.queuedConnections); -} - -/** - * Remove a queued connection from the state. - * - * This function removes a queued connection from the state. - * The returned state is a new state object, and the original state is not - * modified. You should then call the save function on the new state to update - * the database. - * - * @param {State} state The state to remove the queued connection from. - * @param {string} connectionId The connection id of the queued connection to remove. - * @returns {State} The state with the queued connection removed. - */ -export function removeQueuedConnection( - state: State, - connectionId: string, -): State { - const queuedConnections = state.queuedConnections; - delete queuedConnections[connectionId]; - return { - ...state, - queuedConnections, - }; -} - /** * Add an encryption to the state. * diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 508e6ec2f..c6df72d35 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -10,7 +10,7 @@ importers: dependencies: '@adminjs/sequelize': specifier: ^4.1.1 - version: 4.1.1(adminjs@7.8.13(@types/react@18.3.4))(sequelize@6.37.3) + version: 4.1.1(adminjs@7.8.13(@types/react@18.3.4))(sequelize@6.37.3(pg@8.13.0)) '@databases/pg': specifier: ^5.5.0 version: 5.5.0(typescript@5.6.2) @@ -89,12 +89,15 @@ importers: rusty-motors-shared-packets: specifier: link:packages/shared-packets version: link:packages/shared-packets + rusty-motors-socket: + specifier: link:src/socket + version: link:src/socket rusty-motors-transactions: specifier: link:packages/transactions version: link:packages/transactions sequelize: specifier: ^6.37.3 - version: 6.37.3 + version: 6.37.3(pg@8.13.0) sqlite: specifier: ^5.1.1 version: 5.1.1 @@ -6223,11 +6226,11 @@ snapshots: - react-is - supports-color - '@adminjs/sequelize@4.1.1(adminjs@7.8.13(@types/react@18.3.4))(sequelize@6.37.3)': + '@adminjs/sequelize@4.1.1(adminjs@7.8.13(@types/react@18.3.4))(sequelize@6.37.3(pg@8.13.0))': dependencies: adminjs: 7.8.13(@types/react@18.3.4) escape-regexp: 0.0.1 - sequelize: 6.37.3 + sequelize: 6.37.3(pg@8.13.0) '@ampproject/remapping@2.3.0': dependencies: @@ -11814,7 +11817,7 @@ snapshots: sequelize-pool@7.1.0: {} - sequelize@6.37.3: + sequelize@6.37.3(pg@8.13.0): dependencies: '@types/debug': 4.1.12 '@types/validator': 13.12.0 @@ -11832,6 +11835,8 @@ snapshots: uuid: 8.3.2 validator: 13.12.0 wkx: 0.5.0 + optionalDependencies: + pg: 8.13.0 transitivePeerDependencies: - supports-color diff --git a/src/socket/index.ts b/src/socket/index.ts index 833fcddd3..4b402afd6 100644 --- a/src/socket/index.ts +++ b/src/socket/index.ts @@ -22,7 +22,7 @@ export function newSocket(socket: Socket): ConnectedSocket { const connectedSocket = new ConnectedSocket_() as ConnectedSocket; sockets.set(connectedSocket.id, connectedSocket); socket.on("data", (data) => { - connectedSocket.data = Buffer.concat([connectedSocket.data, data]); + connectedSocket.data = data; }); socket.on("close", () => { sockets.delete(connectedSocket.id);