From fc07c69253ed626fedc6a25fbed8ce624d7b4bab Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=87=8E=E5=A3=B0?= Date: Wed, 22 May 2024 18:04:54 +0800 Subject: [PATCH 01/14] refactor: channel can use custom serializer --- .../connection/__test__/browser/index.test.ts | 6 +- .../common/fury-extends/one-of.test.ts | 16 +- .../connection/__test__/node/index.test.ts | 7 +- .../connection/benchmarks/gateway.bench.ts | 10 +- .../src/browser/ws-channel-handler.ts | 49 ++-- .../connection/src/common/channel/index.ts | 1 + .../connection/src/common/channel/types.ts | 82 ++++++ .../src/common/connection/drivers/simple.ts | 10 +- packages/connection/src/common/index.ts | 2 + .../connection/src/common/serializer/fury.ts | 61 ++++ .../connection/src/common/serializer/index.ts | 23 ++ .../connection/src/common/serializer/types.ts | 4 + .../connection/src/common/server-handler.ts | 50 ++-- packages/connection/src/common/ws-channel.ts | 277 +++++------------- .../core-browser/src/bootstrap/connection.ts | 7 +- 15 files changed, 317 insertions(+), 288 deletions(-) create mode 100644 packages/connection/src/common/channel/index.ts create mode 100644 packages/connection/src/common/channel/types.ts create mode 100644 packages/connection/src/common/serializer/fury.ts create mode 100644 packages/connection/src/common/serializer/index.ts create mode 100644 packages/connection/src/common/serializer/types.ts diff --git a/packages/connection/__test__/browser/index.test.ts b/packages/connection/__test__/browser/index.test.ts index ca0c1c668c..786f8288d1 100644 --- a/packages/connection/__test__/browser/index.test.ts +++ b/packages/connection/__test__/browser/index.test.ts @@ -1,9 +1,9 @@ +import { furySerializer } from '@opensumi/ide-connection'; import { ReconnectingWebSocketConnection } from '@opensumi/ide-connection/lib/common/connection/drivers/reconnecting-websocket'; import { sleep } from '@opensumi/ide-core-common'; import { Server, WebSocket } from '@opensumi/mock-socket'; import { WSChannelHandler } from '../../src/browser/ws-channel-handler'; -import { parse, stringify } from '../../src/common/ws-channel'; (global as any).WebSocket = WebSocket; const randomPortFn = () => Math.floor(Math.random() * 10000) + 10000; @@ -23,10 +23,10 @@ describe('connection browser', () => { mockServer.on('connection', (socket) => { socket.on('message', (msg) => { - const msgObj = parse(msg as Uint8Array); + const msgObj = furySerializer.deserialize(msg as Uint8Array); if (msgObj.kind === 'open') { socket.send( - stringify({ + furySerializer.serialize({ id: msgObj.id, kind: 'server-ready', token: '', diff --git a/packages/connection/__test__/common/fury-extends/one-of.test.ts b/packages/connection/__test__/common/fury-extends/one-of.test.ts index 45feff0af2..d193a95e27 100644 --- a/packages/connection/__test__/common/fury-extends/one-of.test.ts +++ b/packages/connection/__test__/common/fury-extends/one-of.test.ts @@ -1,13 +1,11 @@ /* eslint-disable no-console */ -import { - OpenMessage, - PingMessage, - PongMessage, - ServerReadyMessage, - parse, - stringify, -} from '../../../src/common/ws-channel'; +import { furySerializer } from '@opensumi/ide-connection'; + +import { OpenMessage, PingMessage, PongMessage, ServerReadyMessage } from '../../../lib'; + +const parse = furySerializer.deserialize; +const stringify = furySerializer.serialize; describe('oneOf', () => { function testIt(obj: any) { @@ -23,7 +21,6 @@ describe('oneOf', () => { it('should serialize and deserialize', () => { const obj = { kind: 'ping', - clientId: '123', id: '456', } as PingMessage; @@ -31,7 +28,6 @@ describe('oneOf', () => { const obj2 = { kind: 'pong', - clientId: '123', id: '456', } as PongMessage; diff --git a/packages/connection/__test__/node/index.test.ts b/packages/connection/__test__/node/index.test.ts index 9d3594b336..269ed7d4b9 100644 --- a/packages/connection/__test__/node/index.test.ts +++ b/packages/connection/__test__/node/index.test.ts @@ -2,13 +2,14 @@ import http from 'http'; import WebSocket from 'ws'; +import { furySerializer, wrapSerializer } from '@opensumi/ide-connection/lib/common/serializer'; import { WSWebSocketConnection } from '@opensumi/ide-connection/src/common/connection'; import { SumiConnection } from '@opensumi/ide-connection/src/common/rpc/connection'; import { Deferred } from '@opensumi/ide-core-common'; import { RPCService } from '../../src'; import { RPCServiceCenter, initRPCService } from '../../src/common'; -import { WSChannel, parse } from '../../src/common/ws-channel'; +import { WSChannel } from '../../src/common/ws-channel'; import { CommonChannelHandler, WebSocketServerRoute, commonChannelPathHandler } from '../../src/node'; const wssPort = 7788; @@ -58,11 +59,11 @@ describe('connection', () => { }); const clientId = 'TEST_CLIENT'; const wsConnection = new WSWebSocketConnection(connection); - const channel = new WSChannel(wsConnection, { + const channel = new WSChannel(wrapSerializer(wsConnection, furySerializer), { id: 'TEST_CHANNEL_ID', }); connection.on('message', (msg: Uint8Array) => { - const msgObj = parse(msg); + const msgObj = furySerializer.deserialize(msg); if (msgObj.kind === 'server-ready') { if (msgObj.id === 'TEST_CHANNEL_ID') { channel.dispatch(msgObj); diff --git a/packages/connection/benchmarks/gateway.bench.ts b/packages/connection/benchmarks/gateway.bench.ts index c285088bd6..ef82962d3c 100644 --- a/packages/connection/benchmarks/gateway.bench.ts +++ b/packages/connection/benchmarks/gateway.bench.ts @@ -4,22 +4,18 @@ import crypto from 'crypto'; // @ts-ignore import { Bench } from 'tinybench'; +import { ChannelMessage, ErrorMessage, ErrorMessageCode, PingMessage, PongMessage } from '../src/common/channel'; import { oneOf } from '../src/common/fury-extends/one-of'; import { BinaryProtocol, - ChannelMessage, CloseProtocol, DataProtocol, - ErrorMessage, - ErrorMessageCode, ErrorProtocol, OpenProtocol, - PingMessage, PingProtocol, - PongMessage, PongProtocol, ServerReadyProtocol, -} from '../src/common/ws-channel'; +} from '../src/common/serializer/fury'; const bench = new Bench({ time: 2000, @@ -52,13 +48,11 @@ function testItJson(obj: any) { const obj = { kind: 'ping', - clientId: '123', id: '456', } as PingMessage; const obj2 = { kind: 'pong', - clientId: '123', id: '456', } as PongMessage; const obj3 = { diff --git a/packages/connection/src/browser/ws-channel-handler.ts b/packages/connection/src/browser/ws-channel-handler.ts index bb977d5209..f3526ae4d9 100644 --- a/packages/connection/src/browser/ws-channel-handler.ts +++ b/packages/connection/src/browser/ws-channel-handler.ts @@ -1,9 +1,16 @@ import { EventEmitter } from '@opensumi/events'; import { Barrier, Deferred, DisposableStore, IReporterService, MultiMap, REPORT_NAME } from '@opensumi/ide-core-common'; +import { ChannelMessage } from '../common/channel/types'; import { IRuntimeSocketConnection } from '../common/connection'; +import { IConnectionShape } from '../common/connection/types'; +import { ISerializer, furySerializer, wrapSerializer } from '../common/serializer'; import { ConnectionInfo, WSCloseInfo } from '../common/types'; -import { ErrorMessageCode, WSChannel, parse, pingMessage } from '../common/ws-channel'; +import { WSChannel } from '../common/ws-channel'; + +export interface WSChannelHandlerOptions { + serializer?: ISerializer; +} /** * Channel Handler in browser @@ -12,6 +19,8 @@ export class WSChannelHandler { private _disposables = new DisposableStore(); private _onChannelCreatedEmitter = this._disposables.add(new EventEmitter>()); + + wrappedConnection: IConnectionShape; public onChannelCreated(path: string, listener: (channel: WSChannel) => void) { return this._onChannelCreatedEmitter.on(path, listener); } @@ -30,10 +39,17 @@ export class WSChannelHandler { LOG_TAG: string; - constructor(public connection: IRuntimeSocketConnection, logger: any, clientId: string) { + constructor( + public connection: IRuntimeSocketConnection, + logger: any, + clientId: string, + options: WSChannelHandlerOptions = {}, + ) { this.logger = logger || this.logger; this.clientId = clientId; this.LOG_TAG = `[WSChannelHandler] [client-id:${this.clientId}]`; + const serializer = options.serializer || furySerializer; + this.wrappedConnection = wrapSerializer(this.connection, serializer); } // 为解决建立连接之后,替换成可落盘的 logger replaceLogger(logger: any) { @@ -49,37 +65,24 @@ export class WSChannelHandler { clearTimeout(this.heartbeatMessageTimer); } this.heartbeatMessageTimer = global.setTimeout(() => { - this.connection.send(pingMessage); + this.channelMap.forEach((channel) => { + channel.ping(); + }); + this.heartbeatMessage(); }, 10 * 1000); } public async initHandler() { - this.connection.onMessage((message) => { + this.wrappedConnection.onMessage((msg) => { // 一个心跳周期内如果有收到消息,则不需要再发送心跳 this.heartbeatMessage(); - const msg = parse(message); - switch (msg.kind) { case 'pong': - // pong 没有 msg.id, 且不需要分发, 不处理 - break; - case 'error': - this.logger.error(this.LOG_TAG, `receive error: id: ${msg.id}, code: ${msg.code}, error: ${msg.message}`); - switch (msg.code) { - case ErrorMessageCode.ChannelNotFound: - if (this.channelMap.has(msg.id)) { - // 如果远程报错 channel 不存在但是本机存在,则重新打开 - const channel = this.channelMap.get(msg.id)!; - if (channel.channelPath) { - channel.pause(); - channel.open(channel.channelPath, this.clientId); - } - } - break; - } + // pong 不需要分发, 不处理 break; + default: { const channel = this.channelMap.get(msg.id); if (channel) { @@ -138,7 +141,7 @@ export class WSChannelHandler { this.logger.log(this.LOG_TAG, `channel ${key} already exists, dispose it`); } - const channel = new WSChannel(this.connection, { + const channel = new WSChannel(this.wrappedConnection, { id: key, logger: this.logger, ensureServerReady: true, diff --git a/packages/connection/src/common/channel/index.ts b/packages/connection/src/common/channel/index.ts new file mode 100644 index 0000000000..fcb073fefc --- /dev/null +++ b/packages/connection/src/common/channel/index.ts @@ -0,0 +1 @@ +export * from './types'; diff --git a/packages/connection/src/common/channel/types.ts b/packages/connection/src/common/channel/types.ts new file mode 100644 index 0000000000..9ce1f43fb0 --- /dev/null +++ b/packages/connection/src/common/channel/types.ts @@ -0,0 +1,82 @@ +export type ChannelMessage = + | PingMessage + | PongMessage + | OpenMessage + | ServerReadyMessage + | DataMessage + | BinaryMessage + | CloseMessage + | ErrorMessage; + +/** + * `ping` and `pong` are used to detect whether the connection is alive. + */ +export interface PingMessage { + kind: 'ping'; + id: string; +} + +/** + * when server receive a `ping` message, it should reply a `pong` message, vice versa. + */ +export interface PongMessage { + kind: 'pong'; + id: string; +} + +/** + * `data` message indicate that the channel has received some data. + * the `content` field is the data, it should be a string. + */ +export interface DataMessage { + kind: 'data'; + id: string; + content: string; +} + +export interface BinaryMessage { + kind: 'binary'; + id: string; + binary: Uint8Array; +} + +export interface CloseMessage { + kind: 'close'; + id: string; + code: number; + reason: string; +} + +/** + * `open` message is used to open a new channel. + * `path` is used to identify which handler should be used to handle the channel. + * `clientId` is used to identify the client. + */ +export interface OpenMessage { + kind: 'open'; + id: string; + path: string; + clientId: string; + connectionToken: string; +} + +export enum ErrorMessageCode { + ChannelNotFound = 1, +} + +export interface ErrorMessage { + kind: 'error'; + id: string; + code: ErrorMessageCode; + message: string; +} + +/** + * when server receive a `open` message, it should reply a `server-ready` message. + * this is indicate that the channel is ready to use. + */ +export interface ServerReadyMessage { + kind: 'server-ready'; + id: string; + token: string; +} diff --git a/packages/connection/src/common/connection/drivers/simple.ts b/packages/connection/src/common/connection/drivers/simple.ts index 62b47bedf6..680833a644 100644 --- a/packages/connection/src/common/connection/drivers/simple.ts +++ b/packages/connection/src/common/connection/drivers/simple.ts @@ -2,19 +2,19 @@ import { IDisposable } from '@opensumi/ide-core-common'; import { BaseConnection } from './base'; -export class SimpleConnection extends BaseConnection { +export class SimpleConnection extends BaseConnection { constructor( public options: { - send?: (data: Uint8Array) => void; - onMessage?: (cb: (data: Uint8Array) => void) => IDisposable; + send?: (data: T) => void; + onMessage?: (cb: (data: T) => void) => IDisposable; } = {}, ) { super(); } - send(data: Uint8Array): void { + send(data: T): void { this.options.send?.(data); } - onMessage(cb: (data: Uint8Array) => void): IDisposable { + onMessage(cb: (data: T) => void): IDisposable { if (this.options.onMessage) { return this.options.onMessage(cb); } diff --git a/packages/connection/src/common/index.ts b/packages/connection/src/common/index.ts index 394d4833a9..ca54cac91a 100644 --- a/packages/connection/src/common/index.ts +++ b/packages/connection/src/common/index.ts @@ -5,3 +5,5 @@ export * from './capturer'; export * from './ws-channel'; export * from './connect'; export * from './types'; +export * from './connection'; +export * from './serializer'; diff --git a/packages/connection/src/common/serializer/fury.ts b/packages/connection/src/common/serializer/fury.ts new file mode 100644 index 0000000000..ec5889e068 --- /dev/null +++ b/packages/connection/src/common/serializer/fury.ts @@ -0,0 +1,61 @@ +import { Type } from '@furyjs/fury'; + +import { ChannelMessage } from '../channel/types'; +import { oneOf } from '../fury-extends/one-of'; + +import { ISerializer } from './types'; + +export const PingProtocol = Type.object('ping', { + id: Type.string(), +}); + +export const PongProtocol = Type.object('pong', { + id: Type.string(), +}); + +export const OpenProtocol = Type.object('open', { + clientId: Type.string(), + id: Type.string(), + path: Type.string(), + connectionToken: Type.string(), +}); + +export const ServerReadyProtocol = Type.object('server-ready', { + id: Type.string(), + token: Type.string(), +}); + +export const ErrorProtocol = Type.object('error', { + id: Type.string(), + code: Type.uint16(), + message: Type.string(), +}); + +export const DataProtocol = Type.object('data', { + id: Type.string(), + content: Type.string(), +}); + +export const BinaryProtocol = Type.object('binary', { + id: Type.string(), + binary: Type.binary(), +}); + +export const CloseProtocol = Type.object('close', { + id: Type.string(), + code: Type.uint32(), + reason: Type.string(), +}); + +const serializer = oneOf([ + PingProtocol, + PongProtocol, + OpenProtocol, + ServerReadyProtocol, + DataProtocol, + BinaryProtocol, + CloseProtocol, + ErrorProtocol, +]); + +export const furySerializer: ISerializer = serializer; diff --git a/packages/connection/src/common/serializer/index.ts b/packages/connection/src/common/serializer/index.ts new file mode 100644 index 0000000000..b17dbbfe0f --- /dev/null +++ b/packages/connection/src/common/serializer/index.ts @@ -0,0 +1,23 @@ +import { IConnectionShape } from '../connection/types'; + +import { ISerializer } from './types'; + +export * from './fury'; +export * from './types'; + +export const wrapSerializer = ( + connection: IConnectionShape, + serializer: ISerializer, +): IConnectionShape => ({ + onceClose(cb) { + return connection.onceClose(cb); + }, + onMessage(cb) { + return connection.onMessage((data) => { + cb(serializer.deserialize(data)); + }); + }, + send(data) { + connection.send(serializer.serialize(data)); + }, +}); diff --git a/packages/connection/src/common/serializer/types.ts b/packages/connection/src/common/serializer/types.ts new file mode 100644 index 0000000000..173e18881c --- /dev/null +++ b/packages/connection/src/common/serializer/types.ts @@ -0,0 +1,4 @@ +export interface ISerializer { + serialize(data: FROM): TO; + deserialize(data: TO): FROM; +} diff --git a/packages/connection/src/common/server-handler.ts b/packages/connection/src/common/server-handler.ts index 78ba31de9b..20b2bfbbf9 100644 --- a/packages/connection/src/common/server-handler.ts +++ b/packages/connection/src/common/server-handler.ts @@ -1,14 +1,9 @@ +import { ChannelMessage, ErrorMessageCode } from './channel/types'; import { IConnectionShape } from './connection/types'; +import { furySerializer, wrapSerializer } from './serializer'; +import { ISerializer } from './serializer/types'; import { ILogger } from './types'; -import { - ChannelMessage, - ErrorMessageCode, - WSChannel, - WSServerChannel, - parse, - pongMessage, - stringify, -} from './ws-channel'; +import { WSChannel, WSServerChannel } from './ws-channel'; export interface IPathHandler { dispose: (channel: WSChannel, connectionId: string) => void; @@ -105,12 +100,21 @@ export class CommonChannelPathHandler { export const commonChannelPathHandler = new CommonChannelPathHandler(); +export interface ChannelHandlerOptions { + serializer?: ISerializer; +} + export abstract class BaseCommonChannelHandler { protected channelMap: Map = new Map(); protected heartbeatTimer: NodeJS.Timeout | null = null; - constructor(public handlerId: string, protected logger: ILogger = console) {} + serializer: ISerializer = furySerializer; + constructor(public handlerId: string, protected logger: ILogger = console, options: ChannelHandlerOptions = {}) { + if (options.serializer) { + this.serializer = options.serializer; + } + } abstract doHeartbeat(connection: any): void; @@ -129,24 +133,20 @@ export abstract class BaseCommonChannelHandler { let clientId: string; this.heartbeat(connection); + const wrappedConnection = wrapSerializer(connection, this.serializer); + const getOrCreateChannel = (id: string, clientId: string) => { let channel = this.channelMap.get(id); if (!channel) { - channel = new WSServerChannel(connection, { id, clientId, logger: this.logger }); + channel = new WSServerChannel(wrappedConnection, { id, clientId, logger: this.logger }); this.channelMap.set(id, channel); } return channel; }; - connection.onMessage((data: Uint8Array) => { - let msg: ChannelMessage; + wrappedConnection.onMessage((msg: ChannelMessage) => { try { - msg = parse(data); - switch (msg.kind) { - case 'ping': - connection.send(pongMessage); - break; case 'open': { const { id, path, connectionToken } = msg; clientId = msg.clientId; @@ -163,14 +163,12 @@ export abstract class BaseCommonChannelHandler { if (channel) { channel.dispatch(msg); } else { - connection.send( - stringify({ - kind: 'error', - id, - code: ErrorMessageCode.ChannelNotFound, - message: `channel ${id} not found`, - }), - ); + wrappedConnection.send({ + kind: 'error', + id, + code: ErrorMessageCode.ChannelNotFound, + message: `channel ${id} not found`, + }); this.logger.warn(`channel ${id} is not found`); } diff --git a/packages/connection/src/common/ws-channel.ts b/packages/connection/src/common/ws-channel.ts index 657374a8a4..cfdb1de022 100644 --- a/packages/connection/src/common/ws-channel.ts +++ b/packages/connection/src/common/ws-channel.ts @@ -1,5 +1,3 @@ -import { Type } from '@furyjs/fury'; - import { EventEmitter } from '@opensumi/events'; import { DisposableCollection, @@ -9,52 +7,11 @@ import { randomString, } from '@opensumi/ide-core-common'; +import { ChannelMessage, ErrorMessageCode } from './channel/types'; import { IConnectionShape } from './connection/types'; -import { oneOf } from './fury-extends/one-of'; import { ISumiConnectionOptions, SumiConnection } from './rpc/connection'; import { ILogger } from './types'; -/** - * `ping` and `pong` are used to detect whether the connection is alive. - */ -export interface PingMessage { - kind: 'ping'; - id: string; - clientId: string; -} - -/** - * when server receive a `ping` message, it should reply a `pong` message, vice versa. - */ -export interface PongMessage { - kind: 'pong'; - id: string; - clientId: string; -} - -/** - * `data` message indicate that the channel has received some data. - * the `content` field is the data, it should be a string. - */ -export interface DataMessage { - kind: 'data'; - id: string; - content: string; -} - -export interface BinaryMessage { - kind: 'binary'; - id: string; - binary: Uint8Array; -} - -export interface CloseMessage { - kind: 'close'; - id: string; - code: number; - reason: string; -} - export interface IWSChannelCreateOptions { /** * every channel's unique id, it only used in client to server architecture. @@ -80,25 +37,23 @@ export class WSChannel { protected onBinaryQueue = this._disposables.add(new EventQueue()); - protected sendQueue: Uint8Array[] = []; + protected sendQueue: ChannelMessage[] = []; protected _isServerReady = false; protected _ensureServerReady: boolean | undefined; public id: string; public channelPath: string; + public clientId: string; + + protected LOG_TAG = '[WSChannel]'; logger: ILogger = console; - static forClient(connection: IConnectionShape, options: IWSChannelCreateOptions) { + static forClient(connection: IConnectionShape, options: IWSChannelCreateOptions) { const disposable = new DisposableCollection(); const channel = new WSChannel(connection, options); - - disposable.push( - connection.onMessage((data) => { - channel.dispatch(parse(data)); - }), - ); + disposable.push(channel.listen()); connection.onceClose(() => { disposable.dispose(); @@ -107,10 +62,10 @@ export class WSChannel { return channel; } - constructor(public connection: IConnectionShape, options: IWSChannelCreateOptions) { + constructor(public connection: IConnectionShape, options: IWSChannelCreateOptions) { const { id, logger, ensureServerReady } = options; this.id = id; - + this.LOG_TAG = `[WSChannel id=${this.id}]`; if (logger) { this.logger = logger; } @@ -120,7 +75,7 @@ export class WSChannel { this._disposables.add(this.emitter.on('binary', (data) => this.onBinaryQueue.push(data))); } - protected inqueue(data: Uint8Array) { + protected inqueue(data: ChannelMessage) { if (this._ensureServerReady && !this._isServerReady) { if (!this.sendQueue) { this.sendQueue = []; @@ -182,6 +137,20 @@ export class WSChannel { case 'binary': this.emitter.emit('binary', msg.binary); break; + case 'error': + this.logger.error(this.LOG_TAG, `receive error: id: ${msg.id}, code: ${msg.code}, error: ${msg.message}`); + switch (msg.code) { + case ErrorMessageCode.ChannelNotFound: + // 有 channelPath 说明该 channel 曾经被打开过 + // 重新打开 channel + if (this.channelPath) { + // 暂停消息发送直到 server-ready + this.pause(); + this.open(this.channelPath, this.clientId); + } + break; + } + break; } } @@ -192,6 +161,9 @@ export class WSChannel { */ open(path: string, clientId: string, connectionToken = randomString(16)) { this.channelPath = path; + this.clientId = clientId; + + this.LOG_TAG = `[WSChannel id=${this.id} path=${path}]`; if (this.stateTracer.has(connectionToken)) { this.logger.warn( @@ -202,15 +174,13 @@ export class WSChannel { this.stateTracer.record(connectionToken); - this.connection.send( - stringify({ - kind: 'open', - id: this.id, - path, - clientId, - connectionToken, - }), - ); + this.connection.send({ + kind: 'open', + id: this.id, + path, + clientId, + connectionToken, + }); if (this._ensureServerReady) { this.ensureOpenSend(path, clientId, connectionToken); @@ -237,23 +207,19 @@ export class WSChannel { } send(content: string) { - this.inqueue( - stringify({ - kind: 'data', - id: this.id, - content, - }), - ); + this.inqueue({ + kind: 'data', + id: this.id, + content, + }); } sendBinary(data: Uint8Array) { - this.inqueue( - stringify({ - kind: 'binary', - id: this.id, - binary: data, - }), - ); + this.inqueue({ + kind: 'binary', + id: this.id, + binary: data, + }); } onError() {} close(code?: number, reason?: string) { @@ -286,6 +252,12 @@ export class WSChannel { return conn; } + listen() { + return this.connection.onMessage((data) => { + this.dispatch(data); + }); + } + dispose() { if (this.timer) { clearTimeout(this.timer); @@ -293,6 +265,13 @@ export class WSChannel { this.sendQueue = []; this._disposables.dispose(); } + + ping() { + this.connection.send({ + kind: 'ping', + id: this.id, + }); + } } interface IWSServerChannelCreateOptions extends IWSChannelCreateOptions { @@ -306,18 +285,16 @@ export class WSServerChannel extends WSChannel { messageQueue: ChannelMessage[] = []; clientId: string; - constructor(public connection: IConnectionShape, options: IWSServerChannelCreateOptions) { + constructor(public connection: IConnectionShape, options: IWSServerChannelCreateOptions) { super(connection, options); this.clientId = options.clientId; } serverReady(token: string) { - this.connection.send( - stringify({ - kind: 'server-ready', - id: this.id, - token, - }), - ); + this.connection.send({ + kind: 'server-ready', + id: this.id, + token, + }); } dispatch(msg: ChannelMessage) { @@ -328,128 +305,12 @@ export class WSServerChannel extends WSChannel { case 'binary': this.emitter.emit('binary', msg.binary); break; + case 'ping': + this.connection.send({ + kind: 'pong', + id: this.id, + }); + break; } } } - -export type ChannelMessage = - | PingMessage - | PongMessage - | OpenMessage - | ServerReadyMessage - | DataMessage - | BinaryMessage - | CloseMessage - | ErrorMessage; - -export const PingProtocol = Type.object('ping', { - clientId: Type.string(), - id: Type.string(), -}); - -export const PongProtocol = Type.object('pong', { - clientId: Type.string(), - id: Type.string(), -}); - -/** - * `open` message is used to open a new channel. - * `path` is used to identify which handler should be used to handle the channel. - * `clientId` is used to identify the client. - */ -export interface OpenMessage { - kind: 'open'; - id: string; - path: string; - clientId: string; - connectionToken: string; -} - -export const OpenProtocol = Type.object('open', { - clientId: Type.string(), - id: Type.string(), - path: Type.string(), - connectionToken: Type.string(), -}); - -/** - * when server receive a `open` message, it should reply a `server-ready` message. - * this is indicate that the channel is ready to use. - */ -export interface ServerReadyMessage { - kind: 'server-ready'; - id: string; - token: string; -} - -export const ServerReadyProtocol = Type.object('server-ready', { - id: Type.string(), - token: Type.string(), -}); - -export enum ErrorMessageCode { - ChannelNotFound = 1, -} - -export interface ErrorMessage { - kind: 'error'; - id: string; - code: ErrorMessageCode; - message: string; -} - -export const ErrorProtocol = Type.object('error', { - id: Type.string(), - code: Type.uint16(), - message: Type.string(), -}); - -export const DataProtocol = Type.object('data', { - id: Type.string(), - content: Type.string(), -}); - -export const BinaryProtocol = Type.object('binary', { - id: Type.string(), - binary: Type.binary(), -}); - -export const CloseProtocol = Type.object('close', { - id: Type.string(), - code: Type.uint32(), - reason: Type.string(), -}); - -const serializer = oneOf([ - PingProtocol, - PongProtocol, - OpenProtocol, - ServerReadyProtocol, - DataProtocol, - BinaryProtocol, - CloseProtocol, - ErrorProtocol, -]); - -export function stringify(obj: ChannelMessage): Uint8Array { - return serializer.serialize(obj); -} - -export function parse(input: Uint8Array): ChannelMessage { - return serializer.deserialize(input) as any; -} - -const _pingMessage: PingMessage = { - kind: 'ping', - id: '', - clientId: '', -}; - -const _pongMessage: PongMessage = { - kind: 'pong', - id: '', - clientId: '', -}; - -export const pingMessage = stringify(_pingMessage); -export const pongMessage = stringify(_pongMessage); diff --git a/packages/core-browser/src/bootstrap/connection.ts b/packages/core-browser/src/bootstrap/connection.ts index 5c8330b581..1e0c8c66ef 100644 --- a/packages/core-browser/src/bootstrap/connection.ts +++ b/packages/core-browser/src/bootstrap/connection.ts @@ -1,5 +1,5 @@ import { Injector, Provider } from '@opensumi/di'; -import { RPCServiceCenter, WSChannel, initRPCService } from '@opensumi/ide-connection'; +import { ISerializer, RPCServiceCenter, WSChannel, initRPCService } from '@opensumi/ide-connection'; import { WSChannelHandler } from '@opensumi/ide-connection/lib/browser'; import { IRuntimeSocketConnection } from '@opensumi/ide-connection/lib/common/connection'; import { RPCServiceChannelPath } from '@opensumi/ide-connection/lib/common/server-handler'; @@ -28,12 +28,15 @@ export async function createConnectionService( onReconnect: () => void, connection: IRuntimeSocketConnection, clientId: string, + serializer?: ISerializer, ) { const reporterService: IReporterService = injector.get(IReporterService); const eventBus = injector.get(IEventBus); const stateService = injector.get(ClientAppStateService); - const channelHandler = new WSChannelHandler(connection, initialLogger, clientId); + const channelHandler = new WSChannelHandler(connection, initialLogger, clientId, { + serializer, + }); channelHandler.setReporter(reporterService); const onOpen = () => { From 80d609e1d167ceb45372c55ffd98c79ae8ee7af8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=87=8E=E5=A3=B0?= Date: Thu, 23 May 2024 13:49:37 +0800 Subject: [PATCH 02/14] feat: add raw serialier --- packages/connection/src/common/serializer/raw.ts | 8 ++++++++ 1 file changed, 8 insertions(+) create mode 100644 packages/connection/src/common/serializer/raw.ts diff --git a/packages/connection/src/common/serializer/raw.ts b/packages/connection/src/common/serializer/raw.ts new file mode 100644 index 0000000000..e982de6440 --- /dev/null +++ b/packages/connection/src/common/serializer/raw.ts @@ -0,0 +1,8 @@ +import { ChannelMessage } from '../channel'; + +import { ISerializer } from './types'; + +export const rawSerializer: ISerializer = { + serialize: (message) => message, + deserialize: (message) => message, +}; From e1dd2002d12da904a0111cddac77af1abce721b3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=87=8E=E5=A3=B0?= Date: Wed, 29 May 2024 11:59:40 +0800 Subject: [PATCH 03/14] feat: can use custom message io --- .../src/common/rpc-service/registry.ts | 12 +- .../connection/src/common/rpc/connection.ts | 50 +++---- .../connection/src/common/rpc/message-io.ts | 133 ++++++++++++++++-- .../connection/src/common/rpc/multiplexer.ts | 4 +- packages/connection/src/common/ws-channel.ts | 7 +- .../core-browser/src/bootstrap/connection.ts | 10 +- .../__tests__/browser/mock.service.ts | 3 +- 7 files changed, 166 insertions(+), 53 deletions(-) diff --git a/packages/connection/src/common/rpc-service/registry.ts b/packages/connection/src/common/rpc-service/registry.ts index c6b269a466..aecb43f667 100644 --- a/packages/connection/src/common/rpc-service/registry.ts +++ b/packages/connection/src/common/rpc-service/registry.ts @@ -1,6 +1,6 @@ import { DisposableStore, Emitter, IDisposable } from '@opensumi/ide-core-common'; -import { MessageIO, TSumiProtocol, TSumiProtocolMethod } from '../rpc'; +import { BaseMessageIO, TSumiProtocol, TSumiProtocolMethod } from '../rpc'; import { RPCServiceMethod } from '../types'; const skipMethods = new Set(['constructor']); @@ -119,13 +119,21 @@ export class ProtocolRegistry { this.emitter.fire(serviceNames); } - applyTo(io: MessageIO) { + applyTo(io: BaseMessageIO) { + if (!io.loadProtocolMethod) { + return; + } + for (const protocol of this.protocolMap.values()) { io.loadProtocolMethod(protocol); } this._disposables.add( this.onProtocolUpdate((methods) => { + if (!io.loadProtocolMethod) { + return; + } + for (const method of methods) { const protocol = this.protocolMap.get(method); if (protocol) { diff --git a/packages/connection/src/common/rpc/connection.ts b/packages/connection/src/common/rpc/connection.ts index 71931ff1df..00a68947d7 100644 --- a/packages/connection/src/common/rpc/connection.ts +++ b/packages/connection/src/common/rpc/connection.ts @@ -5,7 +5,6 @@ import { DisposableStore, IDisposable, canceled, - parseError, } from '@opensumi/ide-utils'; import { SumiReadableStream, isReadableStream, listenReadable } from '@opensumi/ide-utils/lib/stream'; @@ -15,9 +14,8 @@ import { METHOD_NOT_REGISTERED } from '../constants'; import { ILogger } from '../types'; import { MethodTimeoutError } from './errors'; -import { MessageIO, OperationType, Status } from './message-io'; +import { BaseMessageIO, MessageIO, OperationType, RPCErrorMessage, RPCResponseMessage } from './message-io'; import { - IRequestHeaders, IResponseHeaders, TGenericNotificationHandler, TGenericRequestHandler, @@ -38,6 +36,8 @@ export interface ISumiConnectionOptions { * The name of the connection, used for debugging(and can see in opensumi-devtools). */ name?: string; + + io?: BaseMessageIO; } const chunkedResponseHeaders: IResponseHeaders = { @@ -61,7 +61,7 @@ export class SumiConnection implements IDisposable { protected activeRequestPool = new Map>(); - public io = new MessageIO(); + public io: BaseMessageIO; protected logger: ILogger; protected capturer: Capturer; @@ -73,6 +73,8 @@ export class SumiConnection implements IDisposable { this.logger = getDebugLogger(); } + this.io = options.io || new MessageIO(); + this.capturer = new Capturer(options.name || 'sumi'); this.disposable.add(this.capturer); } @@ -193,35 +195,28 @@ export class SumiConnection implements IDisposable { } listen() { - const { reader } = this.io; - this.disposable.add( this.socket.onMessage((data) => { - reader.reset(data); - // skip version, currently only have version 1 - reader.skip(1); - - const opType = reader.uint8() as OperationType; - const requestId = reader.uint32(); + const message = this.io.readMessage(data); + const opType = message.kind; + const requestId = message.requestId; if (this._timeoutHandles.has(requestId)) { - // Ignore some jest test scenarios where clearTimeout is not defined. - if (typeof clearTimeout === 'function') { - // @ts-ignore - clearTimeout(this._timeoutHandles.get(requestId)); - } + clearTimeout(this._timeoutHandles.get(requestId)); this._timeoutHandles.delete(requestId); } switch (opType) { + case OperationType.Error: case OperationType.Response: { - const method = reader.stringOfVarUInt32(); - const status = reader.uint16(); + const { headers, method } = message; + const err = (message as RPCErrorMessage).error; + const result = (message as RPCResponseMessage).result; const runCallback = (headers: IResponseHeaders, error?: any, result?: any) => { const callback = this._callbacks.get(requestId); if (!callback) { - this.logger.error(`Cannot find callback for request ${requestId}: ${method}, status: ${status}`); + this.logger.error(`Cannot find callback for request ${requestId}: ${method}`); return; } @@ -230,17 +225,6 @@ export class SumiConnection implements IDisposable { callback(headers, error, result); }; - const headers = this.io.responseHeadersSerializer.read(); - let err: any; - let result: any; - if (status === Status.Err) { - // todo: move to processor - const content = reader.stringOfVarUInt32(); - err = parseError(content); - } else { - result = this.io.getProcessor(method).readResponse(); - } - if (headers && headers.chunked) { let activeReq: SumiReadableStream; if (this.activeRequestPool.has(requestId)) { @@ -275,9 +259,7 @@ export class SumiConnection implements IDisposable { case OperationType.Notification: // fall through case OperationType.Request: { - const method = reader.stringOfVarUInt32(); - const headers = this.io.requestHeadersSerializer.read() as IRequestHeaders; - const args = this.io.getProcessor(method).readRequest(); + const { method, headers, args } = message; if (headers.cancelable) { const tokenSource = new CancellationTokenSource(); diff --git a/packages/connection/src/common/rpc/message-io.ts b/packages/connection/src/common/rpc/message-io.ts index 4b348d304d..80d02e961f 100644 --- a/packages/connection/src/common/rpc/message-io.ts +++ b/packages/connection/src/common/rpc/message-io.ts @@ -1,8 +1,9 @@ import Fury, { Serializer, Type, TypeDescription } from '@furyjs/fury'; import { generateSerializer } from '@furyjs/fury/dist/lib/gen'; +import { PlatformBuffer } from '@furyjs/fury/dist/lib/platformBuffer'; import { BinaryReader, BinaryWriter } from '@furyjs/fury/dist/lib/type'; -import { stringifyError } from '@opensumi/ide-core-common/lib/utils'; +import { parseError, stringifyError } from '@opensumi/ide-core-common/lib/utils'; import { AnySerializer, IObjectTransfer } from '../fury-extends/any'; import { furyFactory } from '../fury-extends/shared'; @@ -24,11 +25,7 @@ export enum OperationType { Notification, Response, Cancel, -} - -export enum Status { - OK, - Err, + Error, } export const HeadersProto = { @@ -45,6 +42,7 @@ const PacketPrefix = { Notification: (OperationType.Notification << 8) | ProtoVersionV1, Response: (OperationType.Response << 8) | ProtoVersionV1, Cancel: (OperationType.Cancel << 8) | ProtoVersionV1, + Error: (OperationType.Error << 8) | ProtoVersionV1, } as const; class SumiProtocolSerializer implements IProtocolSerializer { @@ -105,7 +103,66 @@ export class AnyProtocolSerializer implements IProtocolSerializer { } } -export class MessageIO { +export interface RPCRequestMessage { + kind: OperationType.Request; + requestId: number; + method: string; + headers: IRequestHeaders; + args: any[]; +} + +export interface RPCNotificationMessage { + kind: OperationType.Notification; + requestId: number; + method: string; + headers: IRequestHeaders; + args: any[]; +} + +export interface RPCResponseMessage { + kind: OperationType.Response; + requestId: number; + method: string; + headers: Record; + result: any; +} + +export interface RPCErrorMessage { + kind: OperationType.Error; + requestId: number; + method: string; + headers: Record; + error: any; +} + +export interface RPCCancelMessage { + kind: OperationType.Cancel; + requestId: number; +} + +export type RPCMessage = + | RPCRequestMessage + | RPCNotificationMessage + | RPCResponseMessage + | RPCErrorMessage + | RPCCancelMessage; + +export abstract class BaseMessageIO { + abstract loadProtocolMethod?( + methodProtocol: TSumiProtocolMethod, + options?: { nameConverter?: (str: string) => string }, + ): void; + + abstract Request(requestId: number, method: string, headers: IRequestHeaders, args: any[]): T; + abstract Notification(requestId: number, method: string, headers: IRequestHeaders, args: any[]): T; + abstract Cancel(requestId: number): T; + abstract Response(requestId: number, method: string, headers: Record, result: any): T; + abstract Error(requestId: number, method: string, headers: Record, error: any): T; + + abstract readMessage(data: T): RPCMessage; +} + +export class MessageIO extends BaseMessageIO { fury: Fury; reader: BinaryReader; writer: BinaryWriter; @@ -118,6 +175,7 @@ export class MessageIO { responseHeadersSerializer: Serializer; constructor() { + super(); const fury = furyFactory(); this.fury = fury.fury; this.reader = fury.reader; @@ -216,7 +274,6 @@ export class MessageIO { writer.uint16(PacketPrefix.Response); writer.uint32(requestId); writer.stringOfVarUInt32(method); - writer.uint16(Status.OK); this.responseHeadersSerializer.write(headers); this.getProcessor(method).writeResponse(result); @@ -227,13 +284,69 @@ export class MessageIO { const { writer } = this; writer.reset(); - writer.uint16(PacketPrefix.Response); + writer.uint16(PacketPrefix.Error); writer.uint32(requestId); writer.stringOfVarUInt32(method); - writer.uint16(Status.Err); this.responseHeadersSerializer.write(headers); writer.stringOfVarUInt32(stringifyError(error)); return writer.dump(); } + + readMessage(data: PlatformBuffer): RPCMessage { + const { reader } = this; + reader.reset(data); + + // skip version, currently only have version 1 + reader.skip(1); + const opType = reader.uint8() as OperationType; + const requestId = reader.uint32(); + + switch (opType) { + case OperationType.Request: + case OperationType.Notification: { + const method = reader.stringOfVarUInt32(); + const headers = this.requestHeadersSerializer.read() as IRequestHeaders; + const args = this.getProcessor(method).readRequest(); + return { + kind: opType, + requestId, + method, + headers, + args, + }; + } + case OperationType.Error: { + const method = reader.stringOfVarUInt32(); + const headers = this.responseHeadersSerializer.read() as IResponseHeaders; + const error = parseError(reader.stringOfVarUInt32()); + return { + kind: OperationType.Error, + requestId, + method, + headers, + error, + }; + } + case OperationType.Response: { + const method = reader.stringOfVarUInt32(); + const headers = this.responseHeadersSerializer.read() as IResponseHeaders; + const result = this.getProcessor(method).readResponse(); + return { + kind: OperationType.Response, + requestId, + method, + headers, + result, + }; + } + case OperationType.Cancel: + return { + kind: OperationType.Cancel, + requestId, + }; + default: + throw new Error(`Unknown message type: ${opType}`); + } + } } diff --git a/packages/connection/src/common/rpc/multiplexer.ts b/packages/connection/src/common/rpc/multiplexer.ts index 13416decc7..89b3a704f8 100644 --- a/packages/connection/src/common/rpc/multiplexer.ts +++ b/packages/connection/src/common/rpc/multiplexer.ts @@ -2,7 +2,7 @@ import { BaseConnection } from '../connection'; import { ExtObjectTransfer } from '../fury-extends/any'; import { ISumiConnectionOptions, SumiConnection } from './connection'; -import { AnyProtocolSerializer } from './message-io'; +import { AnyProtocolSerializer, MessageIO } from './message-io'; import { TSumiProtocol } from './types'; export class ProxyIdentifier { @@ -57,12 +57,14 @@ export class SumiConnectionMultiplexer extends SumiConnection implements IRPCPro protected readonly _locals: Map; protected readonly _proxies: Map; protected _knownProtocols: Record; + io: MessageIO; constructor(protected socket: BaseConnection, protected options: ISumiMultiplexerConnectionOptions = {}) { super(socket, options); this._locals = new Map(); this._proxies = new Map(); this._knownProtocols = options.knownProtocols || {}; + this.io = new MessageIO(); this.io.setAnySerializer(new AnyProtocolSerializer(this.io.writer, this.io.reader, ExtObjectTransfer)); this.onRequestNotFound((rpcName: string, args: any[]) => this.invoke(rpcName, args)); diff --git a/packages/connection/src/common/ws-channel.ts b/packages/connection/src/common/ws-channel.ts index cfdb1de022..42b4953df6 100644 --- a/packages/connection/src/common/ws-channel.ts +++ b/packages/connection/src/common/ws-channel.ts @@ -10,6 +10,7 @@ import { import { ChannelMessage, ErrorMessageCode } from './channel/types'; import { IConnectionShape } from './connection/types'; import { ISumiConnectionOptions, SumiConnection } from './rpc/connection'; +import { furySerializer, wrapSerializer } from './serializer'; import { ILogger } from './types'; export interface IWSChannelCreateOptions { @@ -50,9 +51,11 @@ export class WSChannel { logger: ILogger = console; - static forClient(connection: IConnectionShape, options: IWSChannelCreateOptions) { + static forClient(connection: IConnectionShape, options: IWSChannelCreateOptions) { const disposable = new DisposableCollection(); - const channel = new WSChannel(connection, options); + + const wrappedConnection = wrapSerializer(connection, furySerializer); + const channel = new WSChannel(wrappedConnection, options); disposable.push(channel.listen()); connection.onceClose(() => { diff --git a/packages/core-browser/src/bootstrap/connection.ts b/packages/core-browser/src/bootstrap/connection.ts index 1e0c8c66ef..db517f8a2a 100644 --- a/packages/core-browser/src/bootstrap/connection.ts +++ b/packages/core-browser/src/bootstrap/connection.ts @@ -2,6 +2,7 @@ import { Injector, Provider } from '@opensumi/di'; import { ISerializer, RPCServiceCenter, WSChannel, initRPCService } from '@opensumi/ide-connection'; import { WSChannelHandler } from '@opensumi/ide-connection/lib/browser'; import { IRuntimeSocketConnection } from '@opensumi/ide-connection/lib/common/connection'; +import { ISumiConnectionOptions } from '@opensumi/ide-connection/lib/common/rpc/connection'; import { RPCServiceChannelPath } from '@opensumi/ide-connection/lib/common/server-handler'; import { BasicModule, @@ -79,9 +80,14 @@ export async function createConnectionService( bindConnectionService(injector, modules, channel); } -export function bindConnectionService(injector: Injector, modules: ModuleConstructor[], channel: WSChannel) { +export function bindConnectionService( + injector: Injector, + modules: ModuleConstructor[], + channel: WSChannel, + options: ISumiConnectionOptions = {}, +) { const clientCenter = new RPCServiceCenter(); - const disposable = clientCenter.setSumiConnection(channel.createSumiConnection()); + const disposable = clientCenter.setSumiConnection(channel.createSumiConnection(options)); initConnectionService(injector, modules, clientCenter); return disposable; } diff --git a/packages/terminal-next/__tests__/browser/mock.service.ts b/packages/terminal-next/__tests__/browser/mock.service.ts index 51b3bee824..3de8cefbb9 100644 --- a/packages/terminal-next/__tests__/browser/mock.service.ts +++ b/packages/terminal-next/__tests__/browser/mock.service.ts @@ -4,8 +4,7 @@ import { Terminal } from 'xterm'; import { Injectable } from '@opensumi/di'; import { WSChannel } from '@opensumi/ide-connection'; import { WSWebSocketConnection } from '@opensumi/ide-connection/lib/common/connection'; -import { Disposable, PreferenceProvider, PreferenceResolveResult } from '@opensumi/ide-core-browser'; -import { PreferenceService } from '@opensumi/ide-core-browser'; +import { Disposable, PreferenceProvider, PreferenceResolveResult, PreferenceService } from '@opensumi/ide-core-browser'; import { Deferred, Emitter, IDisposable, OperatingSystem, PreferenceScope, URI, uuid } from '@opensumi/ide-core-common'; import { Color, RGBA } from '@opensumi/ide-theme/lib/common/color'; From 6982dc4d352d14b417ee63810c89037285c359df Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=87=8E=E5=A3=B0?= Date: Wed, 29 May 2024 13:50:21 +0800 Subject: [PATCH 04/14] refactor: update code --- packages/connection/src/common/index.ts | 1 + packages/connection/src/common/rpc-service/index.ts | 1 + packages/connection/src/common/ws-channel.ts | 3 ++- .../core-browser/src/application/runtime/base-socket.ts | 2 -- packages/core-browser/src/bootstrap/app.ts | 9 ++------- 5 files changed, 6 insertions(+), 10 deletions(-) diff --git a/packages/connection/src/common/index.ts b/packages/connection/src/common/index.ts index ca54cac91a..5268f510d4 100644 --- a/packages/connection/src/common/index.ts +++ b/packages/connection/src/common/index.ts @@ -1,4 +1,5 @@ export * from './rpc-service/proxy'; +export * from './rpc-service'; export * from './rpc/multiplexer'; export * from './rpcProtocol'; export * from './capturer'; diff --git a/packages/connection/src/common/rpc-service/index.ts b/packages/connection/src/common/rpc-service/index.ts index 2cd528f81f..138fe2d873 100644 --- a/packages/connection/src/common/rpc-service/index.ts +++ b/packages/connection/src/common/rpc-service/index.ts @@ -1,5 +1,6 @@ export * from './stub'; export * from './center'; +export * from './registry'; export abstract class RPCService { rpcClient?: T[]; diff --git a/packages/connection/src/common/ws-channel.ts b/packages/connection/src/common/ws-channel.ts index 42b4953df6..94b7a556b8 100644 --- a/packages/connection/src/common/ws-channel.ts +++ b/packages/connection/src/common/ws-channel.ts @@ -147,7 +147,7 @@ export class WSChannel { // 有 channelPath 说明该 channel 曾经被打开过 // 重新打开 channel if (this.channelPath) { - // 暂停消息发送直到 server-ready + // 暂停消息发送, 直到收到 server-ready this.pause(); this.open(this.channelPath, this.clientId); } @@ -292,6 +292,7 @@ export class WSServerChannel extends WSChannel { super(connection, options); this.clientId = options.clientId; } + serverReady(token: string) { this.connection.send({ kind: 'server-ready', diff --git a/packages/core-browser/src/application/runtime/base-socket.ts b/packages/core-browser/src/application/runtime/base-socket.ts index 3d616c15f9..8b48344acd 100644 --- a/packages/core-browser/src/application/runtime/base-socket.ts +++ b/packages/core-browser/src/application/runtime/base-socket.ts @@ -5,5 +5,3 @@ export abstract class BaseConnectionHelper { abstract createConnection(): IRuntimeSocketConnection; } - -export const CONNECTION_HELPER_TOKEN = Symbol('CONNECTION_HELPER_TOKEN'); diff --git a/packages/core-browser/src/bootstrap/app.ts b/packages/core-browser/src/bootstrap/app.ts index 7d38d09fcd..a11154afe2 100644 --- a/packages/core-browser/src/bootstrap/app.ts +++ b/packages/core-browser/src/bootstrap/app.ts @@ -46,7 +46,7 @@ import { IElectronMainLifeCycleService } from '@opensumi/ide-core-common/lib/ele import { ClientAppStateService } from '../application'; import { ESupportRuntime, ElectronConnectionHelper, WebConnectionHelper } from '../application/runtime'; -import { CONNECTION_HELPER_TOKEN } from '../application/runtime/base-socket'; +import { BaseConnectionHelper } from '../application/runtime/base-socket'; import { BrowserRuntime } from '../application/runtime/browser'; import { ElectronRendererRuntime } from '../application/runtime/electron-renderer'; import { RendererRuntime } from '../application/runtime/types'; @@ -252,7 +252,7 @@ export class ClientApp implements IClientApp, IDisposable { } protected async createConnection(type: `${ESupportRuntime}`) { - let connectionHelper: ElectronConnectionHelper | WebConnectionHelper; + let connectionHelper: BaseConnectionHelper; switch (type) { case ESupportRuntime.Electron: @@ -270,11 +270,6 @@ export class ClientApp implements IClientApp, IDisposable { throw new Error(`Unknown backend type: ${type}`); } - this.injector.addProviders({ - token: CONNECTION_HELPER_TOKEN, - useValue: connectionHelper, - }); - const connection: IRuntimeSocketConnection = connectionHelper.createConnection(); const clientId: string = this.config.clientId ?? connectionHelper.getDefaultClientId(); From 9dde139af0f26309f70aa0b9b21e009e1d61a737 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=87=8E=E5=A3=B0?= Date: Wed, 29 May 2024 13:54:13 +0800 Subject: [PATCH 05/14] test: fix testcase --- .../__test__/common/message-io.test.ts | 20 ++++++------------- packages/connection/src/common/index.ts | 1 + 2 files changed, 7 insertions(+), 14 deletions(-) diff --git a/packages/connection/__test__/common/message-io.test.ts b/packages/connection/__test__/common/message-io.test.ts index 3df4614a3d..cf14da58b7 100644 --- a/packages/connection/__test__/common/message-io.test.ts +++ b/packages/connection/__test__/common/message-io.test.ts @@ -1,4 +1,4 @@ -import { MessageIO } from '@opensumi/ide-connection/lib/common/rpc'; +import { MessageIO, RPCResponseMessage } from '@opensumi/ide-connection/lib/common/rpc'; import { protocols } from './rpc/utils'; @@ -30,23 +30,15 @@ describe('message io', () => { const buf = repo.Response(0, protocols.add.protocol.method, {}, 3); expect(buf.byteLength).toBeGreaterThan(20); - repo.reader.reset(buf); - // version + op type + id - repo.reader.skip(1 + 1 + 4); - // method - const method = repo.reader.stringOfVarUInt32(); + const response = repo.readMessage(buf) as RPCResponseMessage; + + const { method, headers, result } = response; + expect(method).toBe(protocols.add.protocol.method); - // status - const status = repo.reader.uint16(); - expect(status).toBe(0); - // headers - const headers = repo.responseHeadersSerializer.read(); expect(headers).toEqual({ chunked: null, }); - // response - const response = repo.getProcessor(protocols.add.protocol.method).readResponse(); - expect(response).toEqual(3); + expect(result).toEqual(3); const buf2 = repo.Response(0, 'any1', {}, null); expect(buf2.byteLength).toBeGreaterThan(20); diff --git a/packages/connection/src/common/index.ts b/packages/connection/src/common/index.ts index 5268f510d4..ffb8f654a9 100644 --- a/packages/connection/src/common/index.ts +++ b/packages/connection/src/common/index.ts @@ -8,3 +8,4 @@ export * from './connect'; export * from './types'; export * from './connection'; export * from './serializer'; +export * from './channel'; From 215fabb1f99e60170cc8ad3d9b700378a6765776 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=87=8E=E5=A3=B0?= Date: Wed, 29 May 2024 13:56:29 +0800 Subject: [PATCH 06/14] fix: avoid recreate message io --- packages/connection/src/common/rpc/multiplexer.ts | 13 ++++++++++--- 1 file changed, 10 insertions(+), 3 deletions(-) diff --git a/packages/connection/src/common/rpc/multiplexer.ts b/packages/connection/src/common/rpc/multiplexer.ts index 89b3a704f8..69d02b57ac 100644 --- a/packages/connection/src/common/rpc/multiplexer.ts +++ b/packages/connection/src/common/rpc/multiplexer.ts @@ -34,6 +34,12 @@ export interface IRPCProtocol { get(identifier: ProxyIdentifier): T; } +function createExtMessageIO() { + const io = new MessageIO(); + io.setAnySerializer(new AnyProtocolSerializer(io.writer, io.reader, ExtObjectTransfer)); + return io; +} + /** * A connection multiplexer that allows to register multiple local RPC services and to create proxies for them. */ @@ -60,12 +66,13 @@ export class SumiConnectionMultiplexer extends SumiConnection implements IRPCPro io: MessageIO; constructor(protected socket: BaseConnection, protected options: ISumiMultiplexerConnectionOptions = {}) { - super(socket, options); + super(socket, { + ...options, + io: createExtMessageIO(), + }); this._locals = new Map(); this._proxies = new Map(); this._knownProtocols = options.knownProtocols || {}; - this.io = new MessageIO(); - this.io.setAnySerializer(new AnyProtocolSerializer(this.io.writer, this.io.reader, ExtObjectTransfer)); this.onRequestNotFound((rpcName: string, args: any[]) => this.invoke(rpcName, args)); From 958198f871ee5e9a14558863e821508fcba2ef08 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=87=8E=E5=A3=B0?= Date: Wed, 29 May 2024 14:55:43 +0800 Subject: [PATCH 07/14] test: fix testcase --- packages/connection/__test__/browser/index.test.ts | 6 ------ 1 file changed, 6 deletions(-) diff --git a/packages/connection/__test__/browser/index.test.ts b/packages/connection/__test__/browser/index.test.ts index 786f8288d1..a359385ad8 100644 --- a/packages/connection/__test__/browser/index.test.ts +++ b/packages/connection/__test__/browser/index.test.ts @@ -17,7 +17,6 @@ describe('connection browser', () => { const fakeWSURL = `ws://127.0.0.1:${randomPort}`; const mockServer = new Server(fakeWSURL); - let receivedHeartbeat = false; let data1Received = false; let data2Received = false; @@ -40,8 +39,6 @@ describe('connection browser', () => { if (data === 'data2') { data2Received = true; } - } else if (msgObj.kind === 'ping') { - receivedHeartbeat = true; } }); }); @@ -53,9 +50,6 @@ describe('connection browser', () => { ); await wsChannelHandler.initHandler(); - await sleep(11000); - expect(receivedHeartbeat).toBe(true); - receivedHeartbeat = false; const channel = await wsChannelHandler.openChannel('test'); From ec290850530354148bb2e2ac6d9126c701a7e337 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=87=8E=E5=A3=B0?= Date: Wed, 29 May 2024 16:06:43 +0800 Subject: [PATCH 08/14] refactor: use connection factory --- .../connection/__test__/browser/index.test.ts | 1 - .../__test__/node/channel-handler.test.ts | 2 +- .../src/browser/ws-channel-handler.ts | 12 ++--- .../__tests__/bootstrap/connection.test.ts | 14 ++---- .../src/application/runtime/base-socket.ts | 32 +++++++++++++ .../src/application/runtime/index.ts | 38 +++++++++++++++ .../src/bootstrap/app.interface.ts | 6 +-- packages/core-browser/src/bootstrap/app.ts | 47 ++++--------------- .../core-browser/src/bootstrap/connection.ts | 34 ++++---------- .../src/bootstrap/inner-providers.ts | 5 ++ .../src/react-providers/config-provider.tsx | 5 ++ 11 files changed, 111 insertions(+), 85 deletions(-) diff --git a/packages/connection/__test__/browser/index.test.ts b/packages/connection/__test__/browser/index.test.ts index a359385ad8..90cf2bc0fa 100644 --- a/packages/connection/__test__/browser/index.test.ts +++ b/packages/connection/__test__/browser/index.test.ts @@ -45,7 +45,6 @@ describe('connection browser', () => { const wsChannelHandler = new WSChannelHandler( ReconnectingWebSocketConnection.forURL(fakeWSURL), - console, 'test-client-id', ); diff --git a/packages/connection/__test__/node/channel-handler.test.ts b/packages/connection/__test__/node/channel-handler.test.ts index f6a692ee70..83620a6ea4 100644 --- a/packages/connection/__test__/node/channel-handler.test.ts +++ b/packages/connection/__test__/node/channel-handler.test.ts @@ -47,7 +47,7 @@ describe('channel handler', () => { const socket = new net.Socket(); socket.connect(ipcPath); const connection = new NetSocketConnection(socket); - const browserChannel = new WSChannelHandler(connection, console, clientId); + const browserChannel = new WSChannelHandler(connection, clientId); await browserChannel.initHandler(); diff --git a/packages/connection/src/browser/ws-channel-handler.ts b/packages/connection/src/browser/ws-channel-handler.ts index f3526ae4d9..476a79c3de 100644 --- a/packages/connection/src/browser/ws-channel-handler.ts +++ b/packages/connection/src/browser/ws-channel-handler.ts @@ -5,11 +5,12 @@ import { ChannelMessage } from '../common/channel/types'; import { IRuntimeSocketConnection } from '../common/connection'; import { IConnectionShape } from '../common/connection/types'; import { ISerializer, furySerializer, wrapSerializer } from '../common/serializer'; -import { ConnectionInfo, WSCloseInfo } from '../common/types'; +import { ConnectionInfo, ILogger, WSCloseInfo } from '../common/types'; import { WSChannel } from '../common/ws-channel'; export interface WSChannelHandlerOptions { - serializer?: ISerializer; + logger?: ILogger; + channelSerializer?: ISerializer; } /** @@ -27,7 +28,7 @@ export class WSChannelHandler { private channelMap: Map = new Map(); private channelCloseEventMap = new MultiMap(); - private logger = console; + private logger: ILogger = console; public clientId: string; private heartbeatMessageTimer: NodeJS.Timeout | null; private reporterService: IReporterService; @@ -41,14 +42,13 @@ export class WSChannelHandler { constructor( public connection: IRuntimeSocketConnection, - logger: any, clientId: string, options: WSChannelHandlerOptions = {}, ) { - this.logger = logger || this.logger; + this.logger = options.logger || this.logger; this.clientId = clientId; this.LOG_TAG = `[WSChannelHandler] [client-id:${this.clientId}]`; - const serializer = options.serializer || furySerializer; + const serializer = options.channelSerializer || furySerializer; this.wrappedConnection = wrapSerializer(this.connection, serializer); } // 为解决建立连接之后,替换成可落盘的 logger diff --git a/packages/core-browser/__tests__/bootstrap/connection.test.ts b/packages/core-browser/__tests__/bootstrap/connection.test.ts index d04b812933..b87b9da2ac 100644 --- a/packages/core-browser/__tests__/bootstrap/connection.test.ts +++ b/packages/core-browser/__tests__/bootstrap/connection.test.ts @@ -1,9 +1,10 @@ +import { WSChannelHandler } from '@opensumi/ide-connection/lib/browser'; import { ReconnectingWebSocketConnection } from '@opensumi/ide-connection/lib/common/connection/drivers/reconnecting-websocket'; import { BrowserConnectionErrorEvent, IEventBus } from '@opensumi/ide-core-common'; +import { createBrowserInjector } from '@opensumi/ide-dev-tool/src/injector-helper'; +import { MockInjector } from '@opensumi/ide-dev-tool/src/mock-injector'; import { Server, WebSocket } from '@opensumi/mock-socket'; -import { createBrowserInjector } from '../../../../tools/dev-tool/src/injector-helper'; -import { MockInjector } from '../../../../tools/dev-tool/src/mock-injector'; import { ClientAppStateService } from '../../src/application'; import { createConnectionService } from '../../src/bootstrap/connection'; (global as any).WebSocket = WebSocket; @@ -30,13 +31,8 @@ describe('packages/core-browser/src/bootstrap/connection.test.ts', () => { done(); }); stateService = injector.get(ClientAppStateService); - createConnectionService( - injector, - [], - () => {}, - ReconnectingWebSocketConnection.forURL(fakeWSURL), - 'test-client-id', - ); + const channelHandler = new WSChannelHandler(ReconnectingWebSocketConnection.forURL(fakeWSURL), 'test-client-id'); + createConnectionService(injector, [], channelHandler); stateService.state = 'core_module_initialized'; new Promise((resolve) => { setTimeout(() => { diff --git a/packages/core-browser/src/application/runtime/base-socket.ts b/packages/core-browser/src/application/runtime/base-socket.ts index 8b48344acd..f4f24adf23 100644 --- a/packages/core-browser/src/application/runtime/base-socket.ts +++ b/packages/core-browser/src/application/runtime/base-socket.ts @@ -1,7 +1,39 @@ +import { Autowired, INJECTOR_TOKEN, Injectable, Injector } from '@opensumi/di'; +import { ISerializer, WSChannel } from '@opensumi/ide-connection'; +import { WSChannelHandler } from '@opensumi/ide-connection/lib/browser'; import { IRuntimeSocketConnection } from '@opensumi/ide-connection/lib/common/connection'; +import { IReporterService, getDebugLogger } from '@opensumi/ide-core-common'; +import { ModuleConstructor, createConnectionService } from '../../bootstrap'; +import { AppConfig } from '../../react-providers'; + +const initialLogger = getDebugLogger(); + +@Injectable({ multiple: true }) export abstract class BaseConnectionHelper { + @Autowired(INJECTOR_TOKEN) + protected injector: Injector; + + @Autowired(AppConfig) + protected appConfig: AppConfig; + + @Autowired(IReporterService) + reporterService: IReporterService; + + protected channelSerializer: ISerializer | undefined = undefined; + abstract getDefaultClientId(): string; abstract createConnection(): IRuntimeSocketConnection; + + async createRPCServiceChannel(modules: ModuleConstructor[]): Promise { + const connection: IRuntimeSocketConnection = this.createConnection(); + const clientId: string = this.appConfig.clientId ?? this.getDefaultClientId(); + const channelHandler = new WSChannelHandler(connection, clientId, { + channelSerializer: this.channelSerializer, + logger: initialLogger, + }); + + return createConnectionService(this.injector, modules, channelHandler); + } } diff --git a/packages/core-browser/src/application/runtime/index.ts b/packages/core-browser/src/application/runtime/index.ts index 51101c0b63..c33821ccf2 100644 --- a/packages/core-browser/src/application/runtime/index.ts +++ b/packages/core-browser/src/application/runtime/index.ts @@ -1,4 +1,42 @@ +import { Injector } from '@opensumi/di'; + +import { AppConfig } from '../../react-providers'; + +import { BaseConnectionHelper } from './base-socket'; +import { WebConnectionHelper } from './browser/socket'; +import { ESupportRuntime } from './constants'; +import { ElectronConnectionHelper } from './electron-renderer/socket'; + export * from './browser/socket'; export * from './electron-renderer/socket'; +export * from './base-socket'; export * from './constants'; + +export type ConnectionHelperFactory = ReturnType; +export function ConnectionHelperFactory(injector: Injector) { + return (type: string) => { + const appConfig = injector.get(AppConfig) as AppConfig; + + let connectionHelper: BaseConnectionHelper; + + switch (type) { + case ESupportRuntime.Electron: + connectionHelper = injector.get(ElectronConnectionHelper); + break; + case ESupportRuntime.Web: + connectionHelper = injector.get(WebConnectionHelper, [ + { + connectionPath: appConfig.connectionPath, + connectionProtocols: appConfig.connectionProtocols, + }, + ]); + break; + default: { + throw new Error(`Unknown backend type: ${type}`); + } + } + + return connectionHelper; + }; +} diff --git a/packages/core-browser/src/bootstrap/app.interface.ts b/packages/core-browser/src/bootstrap/app.interface.ts index 7a485d7238..456130f00b 100644 --- a/packages/core-browser/src/bootstrap/app.interface.ts +++ b/packages/core-browser/src/bootstrap/app.interface.ts @@ -1,5 +1,4 @@ import { ConstructorOf } from '@opensumi/di'; -import { UrlProvider } from '@opensumi/ide-core-common'; import { BrowserModule } from '../browser-module'; import { ClientAppContribution } from '../common/common.define'; @@ -31,10 +30,7 @@ export interface IClientAppOpts extends Partial { contributions?: ContributionConstructor[]; // 前端模块实例声明 modulesInstances?: BrowserModule[]; - // 自定义前后端通信路径 - connectionPath?: UrlProvider; - // 支持的通信协议类型 - connectionProtocols?: string[]; + // 定义用于 OpenSumi 视图插件内的图标集合 iconStyleSheets?: IconInfo[]; /** diff --git a/packages/core-browser/src/bootstrap/app.ts b/packages/core-browser/src/bootstrap/app.ts index a11154afe2..119db472c4 100644 --- a/packages/core-browser/src/bootstrap/app.ts +++ b/packages/core-browser/src/bootstrap/app.ts @@ -6,7 +6,6 @@ import '@opensumi/monaco-editor-core/esm/vs/editor/editor.main'; import { Injector } from '@opensumi/di'; import { WSChannelHandler } from '@opensumi/ide-connection/lib/browser'; -import { IRuntimeSocketConnection } from '@opensumi/ide-connection/lib/common/connection'; import { AppLifeCycleServiceToken, CommandRegistry, @@ -29,7 +28,6 @@ import { StorageResolverContribution, SupportLogNamespace, URI, - UrlProvider, asExtensionCandidate, createContributionProvider, getDebugLogger, @@ -45,7 +43,7 @@ import { import { IElectronMainLifeCycleService } from '@opensumi/ide-core-common/lib/electron'; import { ClientAppStateService } from '../application'; -import { ESupportRuntime, ElectronConnectionHelper, WebConnectionHelper } from '../application/runtime'; +import { ConnectionHelperFactory, ESupportRuntime } from '../application/runtime'; import { BaseConnectionHelper } from '../application/runtime/base-socket'; import { BrowserRuntime } from '../application/runtime/browser'; import { ElectronRendererRuntime } from '../application/runtime/electron-renderer'; @@ -74,7 +72,7 @@ import { electronEnv } from '../utils'; import { IClientAppOpts, IPreferences, IconInfo, IconMap, LayoutConfig, ModuleConstructor } from './app.interface'; import { IAppRenderer, renderClientApp } from './app.view'; -import { bindConnectionServiceDeprecated, createConnectionService } from './connection'; +import { bindConnectionServiceDeprecated } from './connection'; import { injectInnerProviders } from './inner-providers'; import './polyfills'; @@ -93,7 +91,6 @@ export class ClientApp implements IClientApp, IDisposable { public runtime: ElectronRendererRuntime | BrowserRuntime; private logger: ILogServiceClient; - private connectionPath: UrlProvider; private keybindingRegistry: KeybindingRegistry; private keybindingService: KeybindingService; private modules: ModuleConstructor[]; @@ -106,7 +103,6 @@ export class ClientApp implements IClientApp, IDisposable { constructor(protected opts: IClientAppOpts) { const { modules, - connectionPath, iconStyleSheets, useCdnIcon, editorBackgroundImage, @@ -145,6 +141,8 @@ export class ClientApp implements IClientApp, IDisposable { rpcMessageTimeout: opts.rpcMessageTimeout || -1, }; + this.config.connectionPath = opts.connectionPath || `${this.config.wsPath}/service`; + const layoutViewSizeConfig = this.injector.get(LayoutViewSizeConfig); layoutViewSizeConfig.init(opts.layoutViewSize); this.config.layoutViewSize = layoutViewSizeConfig; @@ -176,7 +174,6 @@ export class ClientApp implements IClientApp, IDisposable { this.config = this.runtime.mergeAppConfig(this.config); - this.connectionPath = connectionPath || `${this.config.wsPath}/service`; this.initBaseProvider(); this.initFields(); this.appendIconStyleSheets(iconStyleSheets, useCdnIcon); @@ -252,36 +249,12 @@ export class ClientApp implements IClientApp, IDisposable { } protected async createConnection(type: `${ESupportRuntime}`) { - let connectionHelper: BaseConnectionHelper; - - switch (type) { - case ESupportRuntime.Electron: - connectionHelper = this.injector.get(ElectronConnectionHelper); - break; - case ESupportRuntime.Web: - connectionHelper = this.injector.get(WebConnectionHelper, [ - { - connectionPath: this.connectionPath, - connectionProtocols: this.opts.connectionProtocols, - }, - ]); - break; - default: - throw new Error(`Unknown backend type: ${type}`); - } - - const connection: IRuntimeSocketConnection = connectionHelper.createConnection(); - const clientId: string = this.config.clientId ?? connectionHelper.getDefaultClientId(); - - await createConnectionService( - this.injector, - this.modules, - () => { - this.onReconnectContributions(); - }, - connection, - clientId, - ); + const factory = this.injector.get(ConnectionHelperFactory) as ConnectionHelperFactory; + const connectionHelper: BaseConnectionHelper = factory(type); + const channel = await connectionHelper.createRPCServiceChannel(this.modules); + channel.onReopen(() => { + this.onReconnectContributions(); + }); // create logger after connection established this.logger = this.getLogger(); diff --git a/packages/core-browser/src/bootstrap/connection.ts b/packages/core-browser/src/bootstrap/connection.ts index db517f8a2a..c42cace8ce 100644 --- a/packages/core-browser/src/bootstrap/connection.ts +++ b/packages/core-browser/src/bootstrap/connection.ts @@ -1,7 +1,6 @@ import { Injector, Provider } from '@opensumi/di'; -import { ISerializer, RPCServiceCenter, WSChannel, initRPCService } from '@opensumi/ide-connection'; +import { RPCServiceCenter, initRPCService } from '@opensumi/ide-connection'; import { WSChannelHandler } from '@opensumi/ide-connection/lib/browser'; -import { IRuntimeSocketConnection } from '@opensumi/ide-connection/lib/common/connection'; import { ISumiConnectionOptions } from '@opensumi/ide-connection/lib/common/rpc/connection'; import { RPCServiceChannelPath } from '@opensumi/ide-connection/lib/common/server-handler'; import { @@ -11,7 +10,6 @@ import { BrowserConnectionOpenEvent, IEventBus, IReporterService, - getDebugLogger, } from '@opensumi/ide-core-common'; import { BackService } from '@opensumi/ide-core-common/lib/module'; @@ -21,25 +19,18 @@ import { ModuleConstructor } from './app.interface'; import type { MessageConnection } from '@opensumi/vscode-jsonrpc/lib/common/connection'; -const initialLogger = getDebugLogger(); - export async function createConnectionService( injector: Injector, modules: ModuleConstructor[], - onReconnect: () => void, - connection: IRuntimeSocketConnection, - clientId: string, - serializer?: ISerializer, + channelHandler: WSChannelHandler, + options: ISumiConnectionOptions = {}, ) { const reporterService: IReporterService = injector.get(IReporterService); + channelHandler.setReporter(reporterService); + const eventBus = injector.get(IEventBus); const stateService = injector.get(ClientAppStateService); - const channelHandler = new WSChannelHandler(connection, initialLogger, clientId, { - serializer, - }); - channelHandler.setReporter(reporterService); - const onOpen = () => { stateService.reachedState('core_module_initialized').then(() => { eventBus.fire(new BrowserConnectionOpenEvent()); @@ -75,21 +66,12 @@ export async function createConnectionService( }); const channel = await channelHandler.openChannel(RPCServiceChannelPath); - channel.onReopen(() => onReconnect()); - bindConnectionService(injector, modules, channel); -} - -export function bindConnectionService( - injector: Injector, - modules: ModuleConstructor[], - channel: WSChannel, - options: ISumiConnectionOptions = {}, -) { const clientCenter = new RPCServiceCenter(); - const disposable = clientCenter.setSumiConnection(channel.createSumiConnection(options)); + clientCenter.setSumiConnection(channel.createSumiConnection(options)); initConnectionService(injector, modules, clientCenter); - return disposable; + + return channel; } /** diff --git a/packages/core-browser/src/bootstrap/inner-providers.ts b/packages/core-browser/src/bootstrap/inner-providers.ts index 7d27c00f5a..88bfe31df9 100644 --- a/packages/core-browser/src/bootstrap/inner-providers.ts +++ b/packages/core-browser/src/bootstrap/inner-providers.ts @@ -38,6 +38,7 @@ import { import { AIReporter } from '../ai-native/ai-reporter'; import { ClientAppStateService } from '../application/application-state-service'; import { ApplicationService } from '../application/application.service'; +import { ConnectionHelperFactory } from '../application/runtime'; import { AuthenticationService } from '../authentication/authentication.service'; import { ClientAppContribution } from '../common'; import { ISplitPanelService, SplitPanelService } from '../components/layout/split-panel.service'; @@ -264,6 +265,10 @@ export function injectInnerProviders(injector: Injector) { token: IDesignStyleService, useClass: DesignStyleService, }, + { + token: ConnectionHelperFactory, + useFactory: ConnectionHelperFactory, + }, ]; injector.addProviders(...providers); } diff --git a/packages/core-browser/src/react-providers/config-provider.tsx b/packages/core-browser/src/react-providers/config-provider.tsx index 8dce045c80..3e3a480519 100644 --- a/packages/core-browser/src/react-providers/config-provider.tsx +++ b/packages/core-browser/src/react-providers/config-provider.tsx @@ -277,6 +277,11 @@ export interface AppConfig { * Define the default size (height) of each layout block in the IDE */ layoutViewSize?: Partial; + + // 自定义前后端通信路径 + connectionPath?: UrlProvider; + // 支持的通信协议类型 + connectionProtocols?: string[]; } export interface ICollaborationClientOpts { From 37dcf2e9e45f696bce2b1f9ecc032aebf56bf406 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=87=8E=E5=A3=B0?= Date: Wed, 29 May 2024 16:25:45 +0800 Subject: [PATCH 09/14] fix: timeout not work --- .../__test__/common/message-io.test.ts | 2 +- .../src/common/rpc-service/registry.ts | 4 +-- .../connection/src/common/rpc/connection.ts | 27 ++++++++++--------- .../connection/src/common/rpc/message-io.ts | 12 ++++----- 4 files changed, 23 insertions(+), 22 deletions(-) diff --git a/packages/connection/__test__/common/message-io.test.ts b/packages/connection/__test__/common/message-io.test.ts index cf14da58b7..ebba3e57aa 100644 --- a/packages/connection/__test__/common/message-io.test.ts +++ b/packages/connection/__test__/common/message-io.test.ts @@ -30,7 +30,7 @@ describe('message io', () => { const buf = repo.Response(0, protocols.add.protocol.method, {}, 3); expect(buf.byteLength).toBeGreaterThan(20); - const response = repo.readMessage(buf) as RPCResponseMessage; + const response = repo.parse(buf) as RPCResponseMessage; const { method, headers, result } = response; diff --git a/packages/connection/src/common/rpc-service/registry.ts b/packages/connection/src/common/rpc-service/registry.ts index aecb43f667..711f05755a 100644 --- a/packages/connection/src/common/rpc-service/registry.ts +++ b/packages/connection/src/common/rpc-service/registry.ts @@ -1,6 +1,6 @@ import { DisposableStore, Emitter, IDisposable } from '@opensumi/ide-core-common'; -import { BaseMessageIO, TSumiProtocol, TSumiProtocolMethod } from '../rpc'; +import { IMessageIO, TSumiProtocol, TSumiProtocolMethod } from '../rpc'; import { RPCServiceMethod } from '../types'; const skipMethods = new Set(['constructor']); @@ -119,7 +119,7 @@ export class ProtocolRegistry { this.emitter.fire(serviceNames); } - applyTo(io: BaseMessageIO) { + applyTo(io: IMessageIO) { if (!io.loadProtocolMethod) { return; } diff --git a/packages/connection/src/common/rpc/connection.ts b/packages/connection/src/common/rpc/connection.ts index 70c3c1b8ac..dafd683eb6 100644 --- a/packages/connection/src/common/rpc/connection.ts +++ b/packages/connection/src/common/rpc/connection.ts @@ -14,7 +14,7 @@ import { METHOD_NOT_REGISTERED } from '../constants'; import { ILogger } from '../types'; import { MethodTimeoutError } from './errors'; -import { BaseMessageIO, MessageIO, OperationType, RPCErrorMessage, RPCResponseMessage } from './message-io'; +import { IMessageIO, MessageIO, OperationType, RPCErrorMessage, RPCResponseMessage } from './message-io'; import { IResponseHeaders, TGenericNotificationHandler, @@ -37,7 +37,7 @@ export interface ISumiConnectionOptions { */ name?: string; - io?: BaseMessageIO; + io?: IMessageIO; } const chunkedResponseHeaders: IResponseHeaders = { @@ -55,13 +55,13 @@ export class SumiConnection implements IDisposable { private _requestId = 0; private _callbacks = new Map(); - private readonly _timeoutHandles = new Map(); + private readonly _reqTimeoutHandles = new Map(); private readonly _cancellationTokenSources = new Map(); private readonly _knownCanceledRequests = new Set(); protected activeRequestPool = new Map>(); - public io: BaseMessageIO; + public io: IMessageIO; protected logger: ILogger; protected capturer: Capturer; @@ -115,7 +115,7 @@ export class SumiConnection implements IDisposable { const timeoutHandle = setTimeout(() => { this._handleTimeout(method, requestId); }, this.options.timeout); - this._timeoutHandles.set(requestId, timeoutHandle); + this._reqTimeoutHandles.set(requestId, timeoutHandle); } const cancellationToken: CancellationToken | undefined = @@ -148,13 +148,13 @@ export class SumiConnection implements IDisposable { } private _handleTimeout(method: string, requestId: number) { - if (!this._callbacks.has(requestId) || !this._timeoutHandles.has(requestId)) { + if (!this._callbacks.has(requestId) || !this._reqTimeoutHandles.has(requestId)) { return; } const callback = this._callbacks.get(requestId)!; this._callbacks.delete(requestId); - this._timeoutHandles.delete(requestId); + this._reqTimeoutHandles.delete(requestId); callback(nullHeaders, new MethodTimeoutError(method)); } @@ -197,15 +197,11 @@ export class SumiConnection implements IDisposable { listen() { this.disposable.add( this.socket.onMessage((data) => { - const message = this.io.readMessage(data); + const message = this.io.parse(data); + const opType = message.kind; const requestId = message.requestId; - if (this._timeoutHandles.has(requestId)) { - clearTimeout(this._timeoutHandles.get(requestId)); - this._timeoutHandles.delete(requestId); - } - switch (opType) { case OperationType.Error: case OperationType.Response: { @@ -213,6 +209,11 @@ export class SumiConnection implements IDisposable { const err = (message as RPCErrorMessage).error; const result = (message as RPCResponseMessage).result; + if (this._reqTimeoutHandles.has(requestId)) { + clearTimeout(this._reqTimeoutHandles.get(requestId)); + this._reqTimeoutHandles.delete(requestId); + } + const runCallback = (headers: IResponseHeaders, error?: any, result?: any) => { const callback = this._callbacks.get(requestId); if (!callback) { diff --git a/packages/connection/src/common/rpc/message-io.ts b/packages/connection/src/common/rpc/message-io.ts index 80d02e961f..3726f0bc8a 100644 --- a/packages/connection/src/common/rpc/message-io.ts +++ b/packages/connection/src/common/rpc/message-io.ts @@ -123,7 +123,7 @@ export interface RPCResponseMessage { kind: OperationType.Response; requestId: number; method: string; - headers: Record; + headers: IResponseHeaders; result: any; } @@ -131,7 +131,7 @@ export interface RPCErrorMessage { kind: OperationType.Error; requestId: number; method: string; - headers: Record; + headers: IResponseHeaders; error: any; } @@ -147,7 +147,7 @@ export type RPCMessage = | RPCErrorMessage | RPCCancelMessage; -export abstract class BaseMessageIO { +export abstract class IMessageIO { abstract loadProtocolMethod?( methodProtocol: TSumiProtocolMethod, options?: { nameConverter?: (str: string) => string }, @@ -159,10 +159,10 @@ export abstract class BaseMessageIO { abstract Response(requestId: number, method: string, headers: Record, result: any): T; abstract Error(requestId: number, method: string, headers: Record, error: any): T; - abstract readMessage(data: T): RPCMessage; + abstract parse(data: T): RPCMessage; } -export class MessageIO extends BaseMessageIO { +export class MessageIO extends IMessageIO { fury: Fury; reader: BinaryReader; writer: BinaryWriter; @@ -293,7 +293,7 @@ export class MessageIO extends BaseMessageIO { return writer.dump(); } - readMessage(data: PlatformBuffer): RPCMessage { + parse(data: PlatformBuffer): RPCMessage { const { reader } = this; reader.reset(data); From 4b448b2a5d1f37a07bdd86187d7b129a68b0cfdc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=87=8E=E5=A3=B0?= Date: Wed, 29 May 2024 16:36:39 +0800 Subject: [PATCH 10/14] refactor: update code --- packages/connection/src/browser/ws-channel-handler.ts | 4 ++-- packages/core-browser/src/application/runtime/base-socket.ts | 5 +---- 2 files changed, 3 insertions(+), 6 deletions(-) diff --git a/packages/connection/src/browser/ws-channel-handler.ts b/packages/connection/src/browser/ws-channel-handler.ts index 476a79c3de..cb486dacce 100644 --- a/packages/connection/src/browser/ws-channel-handler.ts +++ b/packages/connection/src/browser/ws-channel-handler.ts @@ -10,7 +10,7 @@ import { WSChannel } from '../common/ws-channel'; export interface WSChannelHandlerOptions { logger?: ILogger; - channelSerializer?: ISerializer; + serializer?: ISerializer; } /** @@ -48,7 +48,7 @@ export class WSChannelHandler { this.logger = options.logger || this.logger; this.clientId = clientId; this.LOG_TAG = `[WSChannelHandler] [client-id:${this.clientId}]`; - const serializer = options.channelSerializer || furySerializer; + const serializer = options.serializer || furySerializer; this.wrappedConnection = wrapSerializer(this.connection, serializer); } // 为解决建立连接之后,替换成可落盘的 logger diff --git a/packages/core-browser/src/application/runtime/base-socket.ts b/packages/core-browser/src/application/runtime/base-socket.ts index f4f24adf23..fa94dec865 100644 --- a/packages/core-browser/src/application/runtime/base-socket.ts +++ b/packages/core-browser/src/application/runtime/base-socket.ts @@ -1,5 +1,5 @@ import { Autowired, INJECTOR_TOKEN, Injectable, Injector } from '@opensumi/di'; -import { ISerializer, WSChannel } from '@opensumi/ide-connection'; +import { WSChannel } from '@opensumi/ide-connection'; import { WSChannelHandler } from '@opensumi/ide-connection/lib/browser'; import { IRuntimeSocketConnection } from '@opensumi/ide-connection/lib/common/connection'; import { IReporterService, getDebugLogger } from '@opensumi/ide-core-common'; @@ -20,8 +20,6 @@ export abstract class BaseConnectionHelper { @Autowired(IReporterService) reporterService: IReporterService; - protected channelSerializer: ISerializer | undefined = undefined; - abstract getDefaultClientId(): string; abstract createConnection(): IRuntimeSocketConnection; @@ -30,7 +28,6 @@ export abstract class BaseConnectionHelper { const connection: IRuntimeSocketConnection = this.createConnection(); const clientId: string = this.appConfig.clientId ?? this.getDefaultClientId(); const channelHandler = new WSChannelHandler(connection, clientId, { - channelSerializer: this.channelSerializer, logger: initialLogger, }); From 392b16eee345d59ee782cbe63ba9e3601a71c358 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=87=8E=E5=A3=B0?= Date: Wed, 29 May 2024 16:48:07 +0800 Subject: [PATCH 11/14] feat: add raw message io --- .../connection/src/common/rpc/message-io.ts | 49 +++++++++++++++++++ 1 file changed, 49 insertions(+) diff --git a/packages/connection/src/common/rpc/message-io.ts b/packages/connection/src/common/rpc/message-io.ts index 3726f0bc8a..42ce15728a 100644 --- a/packages/connection/src/common/rpc/message-io.ts +++ b/packages/connection/src/common/rpc/message-io.ts @@ -350,3 +350,52 @@ export class MessageIO extends IMessageIO { } } } + +export class RawMessageIO implements IMessageIO { + Request(requestId: number, method: string, headers: IRequestHeaders, args: any[]): RPCRequestMessage { + return { + kind: OperationType.Request, + requestId, + method, + headers, + args, + }; + } + Notification(requestId: number, method: string, headers: IRequestHeaders, args: any[]): RPCNotificationMessage { + return { + kind: OperationType.Notification, + requestId, + method, + headers, + args, + }; + } + Cancel(requestId: number): RPCCancelMessage { + return { + kind: OperationType.Cancel, + requestId, + }; + } + Response(requestId: number, method: string, headers: Record, result: any): RPCResponseMessage { + return { + kind: OperationType.Response, + requestId, + headers, + method, + result, + }; + } + Error(requestId: number, method: string, headers: Record, error: any): RPCErrorMessage { + return { + kind: OperationType.Error, + requestId, + method, + headers, + error, + }; + } + + parse(data: any): RPCMessage { + return data; + } +} From 9b2edfdad853de822b569c03ced496608012af27 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=87=8E=E5=A3=B0?= Date: Wed, 29 May 2024 17:21:01 +0800 Subject: [PATCH 12/14] refactor: update code --- .../__test__/common/message-io.test.ts | 12 +++---- .../connection/__test__/common/rpc/utils.ts | 7 +++-- .../connection/__test__/common/ws-channel.ts | 21 +++++++++++++ .../__test__/node/ws-channel.test.ts | 5 +-- .../connection/src/common/rpc/multiplexer.ts | 1 + packages/connection/src/common/ws-channel.ts | 31 ++----------------- .../__tests__/browser/mock.service.ts | 3 +- .../terminal-next/__tests__/browser/proxy.ts | 3 +- 8 files changed, 41 insertions(+), 42 deletions(-) create mode 100644 packages/connection/__test__/common/ws-channel.ts diff --git a/packages/connection/__test__/common/message-io.test.ts b/packages/connection/__test__/common/message-io.test.ts index ebba3e57aa..5a47632508 100644 --- a/packages/connection/__test__/common/message-io.test.ts +++ b/packages/connection/__test__/common/message-io.test.ts @@ -25,12 +25,12 @@ describe('message io', () => { expect(args).toEqual([1, 2]); }); it('should be able to create a response', () => { - const repo = new MessageIO(); - repo.loadProtocolMethod(protocols.add.protocol); - const buf = repo.Response(0, protocols.add.protocol.method, {}, 3); + const io = new MessageIO(); + io.loadProtocolMethod(protocols.add.protocol); + const buf = io.Response(0, protocols.add.protocol.method, {}, 3); expect(buf.byteLength).toBeGreaterThan(20); - const response = repo.parse(buf) as RPCResponseMessage; + const response = io.parse(buf) as RPCResponseMessage; const { method, headers, result } = response; @@ -40,10 +40,10 @@ describe('message io', () => { }); expect(result).toEqual(3); - const buf2 = repo.Response(0, 'any1', {}, null); + const buf2 = io.Response(0, 'any1', {}, null); expect(buf2.byteLength).toBeGreaterThan(20); - const buf3 = repo.Response(0, 'any2', {}, new Uint8Array(10)); + const buf3 = io.Response(0, 'any2', {}, new Uint8Array(10)); expect(buf3.byteLength).toBeGreaterThan(20); }); }); diff --git a/packages/connection/__test__/common/rpc/utils.ts b/packages/connection/__test__/common/rpc/utils.ts index aae09a1fe3..fc38ef08a4 100644 --- a/packages/connection/__test__/common/rpc/utils.ts +++ b/packages/connection/__test__/common/rpc/utils.ts @@ -5,7 +5,7 @@ import { MessageChannel, MessagePort } from 'worker_threads'; import { Type, TypeDescription } from '@furyjs/fury'; -import { ProxyJson, WSChannel } from '@opensumi/ide-connection'; +import { ProxyJson } from '@opensumi/ide-connection'; import { NetSocketConnection } from '@opensumi/ide-connection/lib/common/connection'; import { createWebSocketConnection } from '@opensumi/ide-connection/lib/common/message'; import { Deferred, isUint8Array } from '@opensumi/ide-core-common'; @@ -17,6 +17,7 @@ import { SumiConnection } from '../../../src/common/rpc/connection'; import { MessageIO } from '../../../src/common/rpc/message-io'; import { ProxySumi } from '../../../src/common/rpc-service/proxy/sumi'; import { ServiceRegistry } from '../../../src/common/rpc-service/registry'; +import { createWSChannelForClient } from '../ws-channel'; function createRandomBuffer(size: number): Buffer { const randomContent = randomBytes(size); @@ -132,10 +133,10 @@ export function createMessagePortWSChannel() { const channel = new MessageChannel(); const { port1, port2 } = channel; - const channel1 = WSChannel.forClient(new NodeMessagePortConnection(port1), { + const channel1 = createWSChannelForClient(new NodeMessagePortConnection(port1), { id: '1', }); - const channel2 = WSChannel.forClient(new NodeMessagePortConnection(port2), { + const channel2 = createWSChannelForClient(new NodeMessagePortConnection(port2), { id: '2', }); diff --git a/packages/connection/__test__/common/ws-channel.ts b/packages/connection/__test__/common/ws-channel.ts new file mode 100644 index 0000000000..a50466e8fb --- /dev/null +++ b/packages/connection/__test__/common/ws-channel.ts @@ -0,0 +1,21 @@ +import { DisposableCollection } from '@opensumi/ide-core-common'; + +import { IWSChannelCreateOptions, WSChannel, furySerializer, wrapSerializer } from '../../src/common'; +import { IConnectionShape } from '../../src/common/connection/types'; + +export function createWSChannelForClient(connection: IConnectionShape, options: IWSChannelCreateOptions) { + const disposable = new DisposableCollection(); + + const wrappedConnection = wrapSerializer(connection, furySerializer); + const channel = new WSChannel(wrappedConnection, options); + disposable.push( + wrappedConnection.onMessage((data) => { + channel.dispatch(data); + }), + ); + connection.onceClose(() => { + disposable.dispose(); + }); + + return channel; +} diff --git a/packages/connection/__test__/node/ws-channel.test.ts b/packages/connection/__test__/node/ws-channel.test.ts index b21e799ea9..f71828a1f8 100644 --- a/packages/connection/__test__/node/ws-channel.test.ts +++ b/packages/connection/__test__/node/ws-channel.test.ts @@ -1,16 +1,17 @@ import net from 'net'; -import { IWSChannelCreateOptions, WSChannel } from '@opensumi/ide-connection'; +import { IWSChannelCreateOptions } from '@opensumi/ide-connection'; import { normalizedIpcHandlerPathAsync } from '@opensumi/ide-core-common/lib/utils/ipc'; import { copy } from '../../src/common/buffers/buffers'; import { NetSocketConnection } from '../../src/common/connection'; +import { createWSChannelForClient } from '../common/ws-channel'; const total = 1000; const createWSChannel = (socket: net.Socket, options: IWSChannelCreateOptions) => { const wsConnection = new NetSocketConnection(socket); - return WSChannel.forClient(wsConnection, options); + return createWSChannelForClient(wsConnection, options); }; describe('ws channel node', () => { diff --git a/packages/connection/src/common/rpc/multiplexer.ts b/packages/connection/src/common/rpc/multiplexer.ts index 69d02b57ac..7d8bbee01d 100644 --- a/packages/connection/src/common/rpc/multiplexer.ts +++ b/packages/connection/src/common/rpc/multiplexer.ts @@ -63,6 +63,7 @@ export class SumiConnectionMultiplexer extends SumiConnection implements IRPCPro protected readonly _locals: Map; protected readonly _proxies: Map; protected _knownProtocols: Record; + io: MessageIO; constructor(protected socket: BaseConnection, protected options: ISumiMultiplexerConnectionOptions = {}) { diff --git a/packages/connection/src/common/ws-channel.ts b/packages/connection/src/common/ws-channel.ts index 94b7a556b8..3bf59f994f 100644 --- a/packages/connection/src/common/ws-channel.ts +++ b/packages/connection/src/common/ws-channel.ts @@ -1,16 +1,9 @@ import { EventEmitter } from '@opensumi/events'; -import { - DisposableCollection, - DisposableStore, - EventQueue, - StateTracer, - randomString, -} from '@opensumi/ide-core-common'; +import { DisposableStore, EventQueue, StateTracer, randomString } from '@opensumi/ide-core-common'; import { ChannelMessage, ErrorMessageCode } from './channel/types'; import { IConnectionShape } from './connection/types'; import { ISumiConnectionOptions, SumiConnection } from './rpc/connection'; -import { furySerializer, wrapSerializer } from './serializer'; import { ILogger } from './types'; export interface IWSChannelCreateOptions { @@ -51,24 +44,10 @@ export class WSChannel { logger: ILogger = console; - static forClient(connection: IConnectionShape, options: IWSChannelCreateOptions) { - const disposable = new DisposableCollection(); - - const wrappedConnection = wrapSerializer(connection, furySerializer); - const channel = new WSChannel(wrappedConnection, options); - disposable.push(channel.listen()); - - connection.onceClose(() => { - disposable.dispose(); - }); - - return channel; - } - constructor(public connection: IConnectionShape, options: IWSChannelCreateOptions) { const { id, logger, ensureServerReady } = options; this.id = id; - this.LOG_TAG = `[WSChannel id=${this.id}]`; + this.LOG_TAG = `[WSChannel id:${this.id}]`; if (logger) { this.logger = logger; } @@ -255,12 +234,6 @@ export class WSChannel { return conn; } - listen() { - return this.connection.onMessage((data) => { - this.dispatch(data); - }); - } - dispose() { if (this.timer) { clearTimeout(this.timer); diff --git a/packages/terminal-next/__tests__/browser/mock.service.ts b/packages/terminal-next/__tests__/browser/mock.service.ts index 3de8cefbb9..95f96bd845 100644 --- a/packages/terminal-next/__tests__/browser/mock.service.ts +++ b/packages/terminal-next/__tests__/browser/mock.service.ts @@ -3,6 +3,7 @@ import { Terminal } from 'xterm'; import { Injectable } from '@opensumi/di'; import { WSChannel } from '@opensumi/ide-connection'; +import { createWSChannelForClient } from '@opensumi/ide-connection/__test__/common/ws-channel'; import { WSWebSocketConnection } from '@opensumi/ide-connection/lib/common/connection'; import { Disposable, PreferenceProvider, PreferenceResolveResult, PreferenceService } from '@opensumi/ide-core-browser'; import { Deferred, Emitter, IDisposable, OperatingSystem, PreferenceScope, URI, uuid } from '@opensumi/ide-core-common'; @@ -69,7 +70,7 @@ export class MockTerminalService implements ITerminalService { launchConfig: IShellLaunchConfig, ): Promise { const sock = new WebSocket(localhost(getPort())); - const channel = WSChannel.forClient(new WSWebSocketConnection(sock), { + const channel = createWSChannelForClient(new WSWebSocketConnection(sock), { id: sessionId, }); diff --git a/packages/terminal-next/__tests__/browser/proxy.ts b/packages/terminal-next/__tests__/browser/proxy.ts index ae20a252d3..e1f499c9f6 100644 --- a/packages/terminal-next/__tests__/browser/proxy.ts +++ b/packages/terminal-next/__tests__/browser/proxy.ts @@ -8,6 +8,7 @@ import * as pty from 'node-pty'; import WebSocket from 'ws'; import { WSChannel } from '@opensumi/ide-connection'; +import { createWSChannelForClient } from '@opensumi/ide-connection/__test__/common/ws-channel'; import { WSWebSocketConnection } from '@opensumi/ide-connection/lib/common/connection'; import { uuid } from '@opensumi/ide-core-browser'; @@ -146,7 +147,7 @@ export function handleStdinMessage(json: PtyStdIn) { export function createWsServer() { const server = new WebSocket.Server({ port: getPort() }); server.on('connection', (socket) => { - const channel = WSChannel.forClient(new WSWebSocketConnection(socket), { + const channel = createWSChannelForClient(new WSWebSocketConnection(socket), { id: 'ws-server', }); From 0a4f29c78cd61252f6e47b0e8bc0fc4ecf8f910c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=87=8E=E5=A3=B0?= Date: Thu, 30 May 2024 11:11:16 +0800 Subject: [PATCH 13/14] chore: update re-export --- packages/connection/src/common/serializer/index.ts | 1 + 1 file changed, 1 insertion(+) diff --git a/packages/connection/src/common/serializer/index.ts b/packages/connection/src/common/serializer/index.ts index b17dbbfe0f..743176394c 100644 --- a/packages/connection/src/common/serializer/index.ts +++ b/packages/connection/src/common/serializer/index.ts @@ -3,6 +3,7 @@ import { IConnectionShape } from '../connection/types'; import { ISerializer } from './types'; export * from './fury'; +export * from './raw'; export * from './types'; export const wrapSerializer = ( From d18b853f038abff276897cb3dee04cc6d9bc5fe5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=87=8E=E5=A3=B0?= Date: Thu, 30 May 2024 17:40:15 +0800 Subject: [PATCH 14/14] test: update cov --- packages/connection/__test__/common/fury-extends/any.test.ts | 5 ++--- .../connection/__test__/common/fury-extends/one-of.test.ts | 4 +--- packages/connection/__test__/common/rpc/registry.test.ts | 2 +- 3 files changed, 4 insertions(+), 7 deletions(-) diff --git a/packages/connection/__test__/common/fury-extends/any.test.ts b/packages/connection/__test__/common/fury-extends/any.test.ts index 7d6c74e673..c3bbb0dc01 100644 --- a/packages/connection/__test__/common/fury-extends/any.test.ts +++ b/packages/connection/__test__/common/fury-extends/any.test.ts @@ -1,7 +1,6 @@ -import { ExtObjectTransfer } from '@opensumi/ide-connection/lib/common/fury-extends/any'; +import { AnySerializer, ExtObjectTransfer } from '@opensumi/ide-connection/src/common/fury-extends/any'; -import { AnySerializer } from '../../../lib/common/fury-extends/any'; -import { furyFactory } from '../../../lib/common/fury-extends/shared'; +import { furyFactory } from '../../../src/common/fury-extends/shared'; describe('any serializer', () => { it('can serialize and deserialize any type', () => { diff --git a/packages/connection/__test__/common/fury-extends/one-of.test.ts b/packages/connection/__test__/common/fury-extends/one-of.test.ts index d193a95e27..cbc8aba1c7 100644 --- a/packages/connection/__test__/common/fury-extends/one-of.test.ts +++ b/packages/connection/__test__/common/fury-extends/one-of.test.ts @@ -1,8 +1,6 @@ /* eslint-disable no-console */ -import { furySerializer } from '@opensumi/ide-connection'; - -import { OpenMessage, PingMessage, PongMessage, ServerReadyMessage } from '../../../lib'; +import { OpenMessage, PingMessage, PongMessage, ServerReadyMessage, furySerializer } from '@opensumi/ide-connection'; const parse = furySerializer.deserialize; const stringify = furySerializer.serialize; diff --git a/packages/connection/__test__/common/rpc/registry.test.ts b/packages/connection/__test__/common/rpc/registry.test.ts index 88a62bf96d..d496742dfa 100644 --- a/packages/connection/__test__/common/rpc/registry.test.ts +++ b/packages/connection/__test__/common/rpc/registry.test.ts @@ -1,4 +1,4 @@ -import { ServiceRegistry, getServiceMethods } from '@opensumi/ide-connection/lib/common/rpc-service/registry'; +import { ServiceRegistry, getServiceMethods } from '@opensumi/ide-connection/src/common/rpc-service/registry'; import { Deferred } from '@opensumi/ide-core-common'; describe('registry should work', () => {