Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: channel can use custom serializer #3711

Merged
merged 15 commits into from
May 31, 2024
13 changes: 3 additions & 10 deletions packages/connection/__test__/browser/index.test.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
import { furySerializer } from '@opensumi/ide-connection';
import { ReconnectingWebSocketConnection } from '@opensumi/ide-connection/lib/common/connection/drivers/reconnecting-websocket';
import { sleep } from '@opensumi/ide-core-common';
import { Server, WebSocket } from '@opensumi/mock-socket';

import { WSChannelHandler } from '../../src/browser/ws-channel-handler';
import { parse, stringify } from '../../src/common/ws-channel';
(global as any).WebSocket = WebSocket;

const randomPortFn = () => Math.floor(Math.random() * 10000) + 10000;
Expand All @@ -17,16 +17,15 @@ describe('connection browser', () => {
const fakeWSURL = `ws://127.0.0.1:${randomPort}`;
const mockServer = new Server(fakeWSURL);

let receivedHeartbeat = false;
let data1Received = false;
let data2Received = false;

mockServer.on('connection', (socket) => {
socket.on('message', (msg) => {
const msgObj = parse(msg as Uint8Array);
const msgObj = furySerializer.deserialize(msg as Uint8Array);
if (msgObj.kind === 'open') {
socket.send(
stringify({
furySerializer.serialize({
id: msgObj.id,
kind: 'server-ready',
token: '',
Expand All @@ -40,22 +39,16 @@ describe('connection browser', () => {
if (data === 'data2') {
data2Received = true;
}
} else if (msgObj.kind === 'ping') {
receivedHeartbeat = true;
}
});
});

const wsChannelHandler = new WSChannelHandler(
ReconnectingWebSocketConnection.forURL(fakeWSURL),
console,
'test-client-id',
);

await wsChannelHandler.initHandler();
await sleep(11000);
expect(receivedHeartbeat).toBe(true);
receivedHeartbeat = false;

const channel = await wsChannelHandler.openChannel('test');

Expand Down
5 changes: 2 additions & 3 deletions packages/connection/__test__/common/fury-extends/any.test.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import { ExtObjectTransfer } from '@opensumi/ide-connection/lib/common/fury-extends/any';
import { AnySerializer, ExtObjectTransfer } from '@opensumi/ide-connection/src/common/fury-extends/any';

import { AnySerializer } from '../../../lib/common/fury-extends/any';
import { furyFactory } from '../../../lib/common/fury-extends/shared';
import { furyFactory } from '../../../src/common/fury-extends/shared';

describe('any serializer', () => {
it('can serialize and deserialize any type', () => {
Expand Down
14 changes: 4 additions & 10 deletions packages/connection/__test__/common/fury-extends/one-of.test.ts
Original file line number Diff line number Diff line change
@@ -1,13 +1,9 @@
/* eslint-disable no-console */

import {
OpenMessage,
PingMessage,
PongMessage,
ServerReadyMessage,
parse,
stringify,
} from '../../../src/common/ws-channel';
import { OpenMessage, PingMessage, PongMessage, ServerReadyMessage, furySerializer } from '@opensumi/ide-connection';

const parse = furySerializer.deserialize;
const stringify = furySerializer.serialize;

describe('oneOf', () => {
function testIt(obj: any) {
Expand All @@ -23,15 +19,13 @@ describe('oneOf', () => {
it('should serialize and deserialize', () => {
const obj = {
kind: 'ping',
clientId: '123',
id: '456',
} as PingMessage;

testIt(obj);

const obj2 = {
kind: 'pong',
clientId: '123',
id: '456',
} as PongMessage;

Expand Down
30 changes: 11 additions & 19 deletions packages/connection/__test__/common/message-io.test.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { MessageIO } from '@opensumi/ide-connection/lib/common/rpc';
import { MessageIO, RPCResponseMessage } from '@opensumi/ide-connection/lib/common/rpc';

import { protocols } from './rpc/utils';

Expand All @@ -25,33 +25,25 @@ describe('message io', () => {
expect(args).toEqual([1, 2]);
});
it('should be able to create a response', () => {
const repo = new MessageIO();
repo.loadProtocolMethod(protocols.add.protocol);
const buf = repo.Response(0, protocols.add.protocol.method, {}, 3);
const io = new MessageIO();
io.loadProtocolMethod(protocols.add.protocol);
const buf = io.Response(0, protocols.add.protocol.method, {}, 3);
expect(buf.byteLength).toBeGreaterThan(20);

repo.reader.reset(buf);
// version + op type + id
repo.reader.skip(1 + 1 + 4);
// method
const method = repo.reader.stringOfVarUInt32();
const response = io.parse(buf) as RPCResponseMessage;

const { method, headers, result } = response;

expect(method).toBe(protocols.add.protocol.method);
// status
const status = repo.reader.uint16();
expect(status).toBe(0);
// headers
const headers = repo.responseHeadersSerializer.read();
expect(headers).toEqual({
chunked: null,
});
// response
const response = repo.getProcessor(protocols.add.protocol.method).readResponse();
expect(response).toEqual(3);
expect(result).toEqual(3);

const buf2 = repo.Response(0, 'any1', {}, null);
const buf2 = io.Response(0, 'any1', {}, null);
expect(buf2.byteLength).toBeGreaterThan(20);

const buf3 = repo.Response(0, 'any2', {}, new Uint8Array(10));
const buf3 = io.Response(0, 'any2', {}, new Uint8Array(10));
expect(buf3.byteLength).toBeGreaterThan(20);
});
});
2 changes: 1 addition & 1 deletion packages/connection/__test__/common/rpc/registry.test.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { ServiceRegistry, getServiceMethods } from '@opensumi/ide-connection/lib/common/rpc-service/registry';
import { ServiceRegistry, getServiceMethods } from '@opensumi/ide-connection/src/common/rpc-service/registry';
import { Deferred } from '@opensumi/ide-core-common';

describe('registry should work', () => {
Expand Down
7 changes: 4 additions & 3 deletions packages/connection/__test__/common/rpc/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import { MessageChannel, MessagePort } from 'worker_threads';

import { Type, TypeDescription } from '@furyjs/fury';

import { ProxyJson, WSChannel } from '@opensumi/ide-connection';
import { ProxyJson } from '@opensumi/ide-connection';
import { NetSocketConnection } from '@opensumi/ide-connection/lib/common/connection';
import { createWebSocketConnection } from '@opensumi/ide-connection/lib/common/message';
import { Deferred, isUint8Array } from '@opensumi/ide-core-common';
Expand All @@ -17,6 +17,7 @@ import { SumiConnection } from '../../../src/common/rpc/connection';
import { MessageIO } from '../../../src/common/rpc/message-io';
import { ProxySumi } from '../../../src/common/rpc-service/proxy/sumi';
import { ServiceRegistry } from '../../../src/common/rpc-service/registry';
import { createWSChannelForClient } from '../ws-channel';

function createRandomBuffer(size: number): Buffer {
const randomContent = randomBytes(size);
Expand Down Expand Up @@ -132,10 +133,10 @@ export function createMessagePortWSChannel() {
const channel = new MessageChannel();
const { port1, port2 } = channel;

const channel1 = WSChannel.forClient(new NodeMessagePortConnection(port1), {
const channel1 = createWSChannelForClient(new NodeMessagePortConnection(port1), {
id: '1',
});
const channel2 = WSChannel.forClient(new NodeMessagePortConnection(port2), {
const channel2 = createWSChannelForClient(new NodeMessagePortConnection(port2), {
id: '2',
});

Expand Down
21 changes: 21 additions & 0 deletions packages/connection/__test__/common/ws-channel.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
import { DisposableCollection } from '@opensumi/ide-core-common';

import { IWSChannelCreateOptions, WSChannel, furySerializer, wrapSerializer } from '../../src/common';
import { IConnectionShape } from '../../src/common/connection/types';

export function createWSChannelForClient(connection: IConnectionShape<Uint8Array>, options: IWSChannelCreateOptions) {
const disposable = new DisposableCollection();

const wrappedConnection = wrapSerializer(connection, furySerializer);
const channel = new WSChannel(wrappedConnection, options);
disposable.push(
wrappedConnection.onMessage((data) => {
channel.dispatch(data);
}),
);
connection.onceClose(() => {
disposable.dispose();
});

return channel;
}
2 changes: 1 addition & 1 deletion packages/connection/__test__/node/channel-handler.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ describe('channel handler', () => {
const socket = new net.Socket();
socket.connect(ipcPath);
const connection = new NetSocketConnection(socket);
const browserChannel = new WSChannelHandler(connection, console, clientId);
const browserChannel = new WSChannelHandler(connection, clientId);

await browserChannel.initHandler();

Expand Down
7 changes: 4 additions & 3 deletions packages/connection/__test__/node/index.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,14 @@ import http from 'http';

import WebSocket from 'ws';

import { furySerializer, wrapSerializer } from '@opensumi/ide-connection/lib/common/serializer';
import { WSWebSocketConnection } from '@opensumi/ide-connection/src/common/connection';
import { SumiConnection } from '@opensumi/ide-connection/src/common/rpc/connection';
import { Deferred } from '@opensumi/ide-core-common';

import { RPCService } from '../../src';
import { RPCServiceCenter, initRPCService } from '../../src/common';
import { WSChannel, parse } from '../../src/common/ws-channel';
import { WSChannel } from '../../src/common/ws-channel';
import { CommonChannelHandler, WebSocketServerRoute, commonChannelPathHandler } from '../../src/node';

const wssPort = 7788;
Expand Down Expand Up @@ -58,11 +59,11 @@ describe('connection', () => {
});
const clientId = 'TEST_CLIENT';
const wsConnection = new WSWebSocketConnection(connection);
const channel = new WSChannel(wsConnection, {
const channel = new WSChannel(wrapSerializer(wsConnection, furySerializer), {
id: 'TEST_CHANNEL_ID',
});
connection.on('message', (msg: Uint8Array) => {
const msgObj = parse(msg);
const msgObj = furySerializer.deserialize(msg);
if (msgObj.kind === 'server-ready') {
if (msgObj.id === 'TEST_CHANNEL_ID') {
channel.dispatch(msgObj);
Expand Down
5 changes: 3 additions & 2 deletions packages/connection/__test__/node/ws-channel.test.ts
Original file line number Diff line number Diff line change
@@ -1,16 +1,17 @@
import net from 'net';

import { IWSChannelCreateOptions, WSChannel } from '@opensumi/ide-connection';
import { IWSChannelCreateOptions } from '@opensumi/ide-connection';
import { normalizedIpcHandlerPathAsync } from '@opensumi/ide-core-common/lib/utils/ipc';

import { copy } from '../../src/common/buffers/buffers';
import { NetSocketConnection } from '../../src/common/connection';
import { createWSChannelForClient } from '../common/ws-channel';

const total = 1000;

const createWSChannel = (socket: net.Socket, options: IWSChannelCreateOptions) => {
const wsConnection = new NetSocketConnection(socket);
return WSChannel.forClient(wsConnection, options);
return createWSChannelForClient(wsConnection, options);
};

describe('ws channel node', () => {
Expand Down
10 changes: 2 additions & 8 deletions packages/connection/benchmarks/gateway.bench.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,22 +4,18 @@ import crypto from 'crypto';
// @ts-ignore
import { Bench } from 'tinybench';

import { ChannelMessage, ErrorMessage, ErrorMessageCode, PingMessage, PongMessage } from '../src/common/channel';
import { oneOf } from '../src/common/fury-extends/one-of';
import {
BinaryProtocol,
ChannelMessage,
CloseProtocol,
DataProtocol,
ErrorMessage,
ErrorMessageCode,
ErrorProtocol,
OpenProtocol,
PingMessage,
PingProtocol,
PongMessage,
PongProtocol,
ServerReadyProtocol,
} from '../src/common/ws-channel';
} from '../src/common/serializer/fury';

const bench = new Bench({
time: 2000,
Expand Down Expand Up @@ -52,13 +48,11 @@ function testItJson(obj: any) {

const obj = {
kind: 'ping',
clientId: '123',
id: '456',
} as PingMessage;

const obj2 = {
kind: 'pong',
clientId: '123',
id: '456',
} as PongMessage;
const obj3 = {
Expand Down
Loading
Loading