Skip to content

Commit

Permalink
refactor: 🎨 migrate to socket module (#2180)
Browse files Browse the repository at this point in the history
migrate from using the state feature of the shared module to the socket
module

<!-- This is an auto-generated comment: release notes by coderabbit.ai
-->
## Summary by CodeRabbit

- **New Features**
- Introduced a new socket management system for improved connection
handling.
	- Added structured error handling for socket connections.

- **Bug Fixes**
	- Updated logging to accurately reflect the new socket IDs.

- **Refactor**
- Removed outdated socket handling functions and properties to
streamline state management.

- **Chores**
	- Added a new dependency for socket functionality.
<!-- end of auto-generated comment: release notes by coderabbit.ai -->
  • Loading branch information
drazisil authored Sep 28, 2024
1 parent 9dba87f commit f076c91
Show file tree
Hide file tree
Showing 8 changed files with 110 additions and 264 deletions.
1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
"rusty-motors-shard": "link:packages/shard",
"rusty-motors-shared": "link:packages/shared",
"rusty-motors-shared-packets": "link:packages/shared-packets",
"rusty-motors-socket": "link:src/socket",
"rusty-motors-transactions": "link:packages/transactions",
"sequelize": "^6.37.3",
"sqlite": "^5.1.1",
Expand Down
101 changes: 24 additions & 77 deletions packages/gateway/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,67 +14,22 @@
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.

import { randomUUID } from "node:crypto";
import {
type OnDataHandler,
type ServerLogger,
type ServiceResponse,
addSocket,
fetchStateFromDatabase,
getOnDataHandler,
removeSocket,
wrapSocket,
} from "rusty-motors-shared";
import { getServerLogger } from "rusty-motors-shared";
import { newSocket } from "rusty-motors-socket";

import { Socket } from "node:net";
import { getGatewayServer } from "./GatewayServer.js";
import { getPortMessageType, UserStatusManager } from "rusty-motors-nps";
import { BasePacket } from "rusty-motors-shared-packets";
import * as Sentry from "@sentry/node";

/**
* @typedef {object} OnDataHandlerArgs
* @property {object} args
* @property {string} args.connectionId The connection id of the socket that
* received the data.
* @property {module:packages/shared/RawMessage} args.message The data that was received.
* @property {module:shared/log.ServerLogger} [args.log=getServerLogger({ name: "gateway" })] The logger to use.
* response
* to the
* data.
*/

/**
* @typedef {function} OnDataHandler
* @param {OnDataHandlerArgs} args The arguments for the handler.
* @returns {ServiceResponse} The
* response
* to the
* data.
*/

/**
* Handle socket errors
*/
export function socketErrorHandler({
connectionId,
error,
log = getServerLogger({
name: "socketErrorHandler",
}),
}: {
connectionId: string;
error: NodeJS.ErrnoException;
log?: ServerLogger;
}) {
// Handle socket errors
if (error.code == "ECONNRESET") {
log.debug(`Connection ${connectionId} reset`);
return;
}
throw Error(`Socket error: ${error.message} on connection ${connectionId}`);
}
import { socketErrorHandler } from "./socketErrorHandler.js";

/**
* Handle the end of a socket connection
Expand All @@ -83,20 +38,20 @@ export function socketErrorHandler({
* @param {string} options.connectionId The connection ID
* @param {import("pino").Logger} [options.log=getServerLogger({ name: "socketEndHandler" })] The logger to use
*/
export function socketEndHandler({
connectionId,
log = getServerLogger({
name: "socketEndHandler",
}),
}: {
connectionId: string;
log?: ServerLogger;
}) {
log.debug(`Connection ${connectionId} ended`);

// Remove the socket from the global state
removeSocket(fetchStateFromDatabase(), connectionId).save();
}
// export function socketEndHandler({
// connectionId,
// log = getServerLogger({
// name: "socketEndHandler",
// }),
// }: {
// connectionId: string;
// log?: ServerLogger;
// }) {
// log.debug(`Connection ${connectionId} ended`);

// // Remove the socket from the global state
// removeSocket(fetchStateFromDatabase(), connectionId).save();
// }

/**
* Handle incoming TCP connections
Expand All @@ -122,14 +77,7 @@ export function onSocketConnection({
throw Error("localPort or remoteAddress is undefined");
}

// This is a new connection so generate a new connection ID
const newConnectionId = randomUUID();

// Wrap the socket and add it to the global state
const wrappedSocket = wrapSocket(incomingSocket, newConnectionId);

// Add the socket to the global state
addSocket(fetchStateFromDatabase(), wrappedSocket).save();
const socket = newSocket(incomingSocket);

// =======================
// Handle incoming socket in shadow mode
Expand All @@ -138,17 +86,17 @@ export function onSocketConnection({
try {
// Get expected message type
const messageType = getPortMessageType(localPort);
log.debug(`[${newConnectionId}] Expected message type: ${messageType}`);
log.debug(`[${socket.id}] Expected message type: ${messageType}`);

switch (messageType) {
case "Game": {
// Handle game messages
// Create a new user status
const userStatus = UserStatusManager.newUserStatus();
log.debug(`[${newConnectionId}] Created new user status`);
log.debug(`[${socket.id}] Created new user status`);

UserStatusManager.addUserStatus(userStatus);
log.debug(`[${newConnectionId}] Added user status to manager`);
log.debug(`[${socket.id}] Added user status to manager`);

break;
}
Expand All @@ -159,12 +107,12 @@ export function onSocketConnection({
}

default: {
log.warn(`[${newConnectionId}] No message type found`);
log.warn(`[${socket.id}] No message type found`);
break;
}
}
} catch (error) {
log.error(`[${newConnectionId}] Error handling socket: ${error}`);
log.error(`[${socket.id}] Error handling socket: ${error}`);
}

// This is a new TCP socket, so it's probably not using HTTP
Expand All @@ -182,14 +130,13 @@ export function onSocketConnection({
}

incomingSocket.on("error", (error) =>
socketErrorHandler({ connectionId: newConnectionId, error }),
socketErrorHandler({ connectionId: socket.id, error }),
);

// Add the data handler to the socket
incomingSocket.on(
"data",
function socketDataHandler(incomingDataAsBuffer: Buffer) {

log.trace(
`Incoming data on port ${localPort}: ${incomingDataAsBuffer.toString(
"hex",
Expand All @@ -212,7 +159,7 @@ export function onSocketConnection({
},
async () => {
portOnDataHandler({
connectionId: newConnectionId,
connectionId: socket.id,
message: rawMessage,
})
.then((response: ServiceResponse) => {
Expand Down
38 changes: 38 additions & 0 deletions packages/gateway/src/socketErrorHandler.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
import { describe, it, expect, vi } from "vitest";
import { socketErrorHandler } from "./socketErrorHandler.js";
import { type ServerLogger } from "rusty-motors-shared";

describe("socketErrorHandler", () => {

it("should log a debug message when error code is ECONNRESET", () => {
const connectionId = "12345";
const error = { code: "ECONNRESET" } as NodeJS.ErrnoException;
const mockLogger = {
debug: vi.fn(),
} as unknown as ServerLogger;


socketErrorHandler({ connectionId, error, log: mockLogger });

expect(mockLogger.debug).toHaveBeenCalledWith(
`Connection ${connectionId} reset`,
);
});

it("should throw an error when error code is not handled", () => {
const connectionId = "12345";
const error = {
code: "EUNKNOWN",
message: "Unknown error",
} as NodeJS.ErrnoException;
const mockLogger = {
debug: vi.fn(),
} as unknown as ServerLogger;


expect(() =>
socketErrorHandler({ connectionId, error, log: mockLogger }),
).toThrow(`Socket error: ${error.message} on connection ${connectionId}`);
});

});
30 changes: 30 additions & 0 deletions packages/gateway/src/socketErrorHandler.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
import { getServerLogger, type ServerLogger } from "rusty-motors-shared";

/**
* Handles socket errors by logging specific error codes or throwing an error.
*
* @param {Object} params - The parameters for the socket error handler.
* @param {string} params.connectionId - The ID of the connection where the error occurred.
* @param {NodeJS.ErrnoException} params.error - The error object containing details of the socket error.
* @param {ServerLogger} [params.log] - Optional logger instance for logging error details. Defaults to a server logger named "socketErrorHandler".
*
* @throws {Error} Throws an error if the socket error code is not handled.
*/
export function socketErrorHandler({
connectionId,
error,
log = getServerLogger({
name: "socketErrorHandler",
}),
}: {
connectionId: string;
error: NodeJS.ErrnoException;
log?: ServerLogger;
}) {
// Handle socket errors
if (error.code == "ECONNRESET") {
log.debug(`Connection ${connectionId} reset`);
return;
}
throw Error(`Socket error: ${error.message} on connection ${connectionId}`);
}
3 changes: 0 additions & 3 deletions packages/shared/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,7 @@ export {
createInitialState,
fetchStateFromDatabase,
addOnDataHandler,
addSocket,
getOnDataHandler,
removeSocket,
wrapSocket,
addEncryption,
getEncryption,
McosSession,
Expand Down
Loading

0 comments on commit f076c91

Please sign in to comment.