From 8bb9afa7a05eb814b3973785205482e58894f2fa Mon Sep 17 00:00:00 2001 From: Tobias Ortmayr Date: Tue, 30 May 2023 12:09:49 +0200 Subject: [PATCH] core: improve rpc protocol - Ensure that pending requests are properly rejected when the underlying service channel is closed. - Remove obsolete id-property from `NotificationMessage`s. Ids are only required fro matching request-response pairs. - Rename `JsonRpcProxyFactory` and related types to `RpcProxyFactory` (Rpc*). The naming scheme was a remainder of the old vscode jsonr-rpc based protocol. By simply using the `Rpc` suffix the class names are less misleading and protocol agnostic. - Keep deprecated declarations of the old `JsonRpc*` namespace. The components are heavily used by adopters so we should maintain this deprecated symbols for a while to enable graceful migration without hard API breaks. - Use a deferred in the `RpcProxyFactory` for protocol initialization Fixes #12581 --- .../common/message-rpc/rpc-message-encoder.ts | 7 +- .../src/common/message-rpc/rpc-protocol.ts | 14 +- .../src/common/messaging/proxy-factory.ts | 149 +++++++++++------- 3 files changed, 107 insertions(+), 63 deletions(-) diff --git a/packages/core/src/common/message-rpc/rpc-message-encoder.ts b/packages/core/src/common/message-rpc/rpc-message-encoder.ts index b6886d908f002..f975c96dca021 100644 --- a/packages/core/src/common/message-rpc/rpc-message-encoder.ts +++ b/packages/core/src/common/message-rpc/rpc-message-encoder.ts @@ -51,7 +51,6 @@ export interface RequestMessage { export interface NotificationMessage { type: RpcMessageType.Notification; - id: number; method: string; args: any[]; } @@ -111,7 +110,7 @@ export interface RpcMessageDecoder { export interface RpcMessageEncoder { cancel(buf: WriteBuffer, requestId: number): void; - notification(buf: WriteBuffer, requestId: number, method: string, args: any[]): void + notification(buf: WriteBuffer, method: string, args: any[]): void request(buf: WriteBuffer, requestId: number, method: string, args: any[]): void @@ -130,8 +129,8 @@ export class MsgPackMessageEncoder implements RpcMessageEncoder { cancel(buf: WriteBuffer, requestId: number): void { this.encode(buf, { type: RpcMessageType.Cancel, id: requestId }); } - notification(buf: WriteBuffer, requestId: number, method: string, args: any[]): void { - this.encode(buf, { type: RpcMessageType.Notification, id: requestId, method, args }); + notification(buf: WriteBuffer, method: string, args: any[]): void { + this.encode(buf, { type: RpcMessageType.Notification, method, args }); } request(buf: WriteBuffer, requestId: number, method: string, args: any[]): void { this.encode(buf, { type: RpcMessageType.Request, id: requestId, method, args }); diff --git a/packages/core/src/common/message-rpc/rpc-protocol.ts b/packages/core/src/common/message-rpc/rpc-protocol.ts index 4dbf422e0b877..b57ac30043ac7 100644 --- a/packages/core/src/common/message-rpc/rpc-protocol.ts +++ b/packages/core/src/common/message-rpc/rpc-protocol.ts @@ -77,7 +77,11 @@ export class RpcProtocol { this.encoder = options.encoder ?? new MsgPackMessageEncoder(); this.decoder = options.decoder ?? new MsgPackMessageDecoder(); this.toDispose.push(this.onNotificationEmitter); - channel.onClose(() => this.toDispose.dispose()); + channel.onClose(event => { + this.pendingRequests.forEach(pending => pending.reject(new Error(event.reason))); + this.pendingRequests.clear(); + this.toDispose.dispose(); + }); this.toDispose.push(channel.onMessage(readBuffer => this.handleMessage(this.decoder.parse(readBuffer())))); this.mode = options.mode ?? 'default'; @@ -98,7 +102,7 @@ export class RpcProtocol { return; } case RpcMessageType.Notification: { - this.handleNotify(message.id, message.method, message.args); + this.handleNotify(message.method, message.args); return; } } @@ -116,7 +120,7 @@ export class RpcProtocol { } } // If the message was not handled until here, it is incompatible with the mode. - console.warn(`Received message incompatible with this RPCProtocol's mode '${this.mode}'. Type: ${message.type}. ID: ${message.id}.`); + console.warn(`Received message incompatible with this RPCProtocol's mode '${this.mode}'. Type: ${message.type}`); } protected handleReply(id: number, value: any): void { @@ -179,7 +183,7 @@ export class RpcProtocol { } const output = this.channel.getWriteBuffer(); - this.encoder.notification(output, this.nextMessageId++, method, args); + this.encoder.notification(output, method, args); output.commit(); } @@ -226,7 +230,7 @@ export class RpcProtocol { } } - protected async handleNotify(id: number, method: string, args: any[]): Promise { + protected async handleNotify(method: string, args: any[]): Promise { if (this.toDispose.disposed) { return; } diff --git a/packages/core/src/common/messaging/proxy-factory.ts b/packages/core/src/common/messaging/proxy-factory.ts index a18f54305af96..4dffcb0d34e52 100644 --- a/packages/core/src/common/messaging/proxy-factory.ts +++ b/packages/core/src/common/messaging/proxy-factory.ts @@ -23,28 +23,30 @@ import { Emitter, Event } from '../event'; import { Channel } from '../message-rpc/channel'; import { RequestHandler, RpcProtocol } from '../message-rpc/rpc-protocol'; import { ConnectionHandler } from './handler'; +import { Deferred } from '../promise-util'; -export type JsonRpcServer = Disposable & { +export type RpcServer = Disposable & { /** * If this server is a proxy to a remote server then * a client is used as a local object - * to handle JSON-RPC messages from the remote server. + * to handle RPC messages from the remote server. */ setClient(client: Client | undefined): void; getClient?(): Client | undefined; }; -export interface JsonRpcConnectionEventEmitter { +export interface RpcConnectionEventEmitter { readonly onDidOpenConnection: Event; readonly onDidCloseConnection: Event; } -export type JsonRpcProxy = T & JsonRpcConnectionEventEmitter; -export class JsonRpcConnectionHandler implements ConnectionHandler { +export type RpcProxy = T & RpcConnectionEventEmitter; + +export class RpcConnectionHandler implements ConnectionHandler { constructor( readonly path: string, - readonly targetFactory: (proxy: JsonRpcProxy) => any, - readonly factoryConstructor: new () => JsonRpcProxyFactory = JsonRpcProxyFactory + readonly targetFactory: (proxy: RpcProxy) => any, + readonly factoryConstructor: new () => RpcProxyFactory = RpcProxyFactory ) { } onConnection(connection: Channel): void { @@ -54,20 +56,30 @@ export class JsonRpcConnectionHandler implements ConnectionHan factory.listen(connection); } } + /** - * Factory for creating a new {@link RpcConnection} for a given chanel and {@link RequestHandler}. + * Factory for creating a new {@link RpcProtocol} for a given chanel and {@link RequestHandler}. */ -export type RpcConnectionFactory = (channel: Channel, requestHandler: RequestHandler) => RpcProtocol; +export type RpcProtocolFactory = (channel: Channel, requestHandler: RequestHandler) => RpcProtocol; -const defaultRPCConnectionFactory: RpcConnectionFactory = (channel, requestHandler) => new RpcProtocol(channel, requestHandler); +const defaultRpcProtocolFactory: RpcProtocolFactory = (channel, requestHandler) => new RpcProtocol(channel, requestHandler); + +export interface RpcProxyFactoryOptions { + rpcProtocolFactory?: RpcProtocolFactory, + /** + * Used to identify proxy calls that should be sent as notification. If method starts with one of the given + * keys it is treated as a notification + */ + notificationKeys?: string[] +} /** - * Factory for JSON-RPC proxy objects. + * Factory for RPC proxy objects. * - * A JSON-RPC proxy exposes the programmatic interface of an object through - * JSON-RPC. This allows remote programs to call methods of this objects by - * sending JSON-RPC requests. This takes place over a bi-directional stream, - * where both ends can expose an object and both can call methods each other's + * A RPC proxy exposes the programmatic interface of an object through + * Theia's RPC protocol. This allows remote programs to call methods of this objects by + * sending RPC requests. This takes place over a bi-directional stream, + * where both ends can expose an object and both can call methods on each other's * exposed object. * * For example, assuming we have an object of the following type on one end: @@ -76,87 +88,87 @@ const defaultRPCConnectionFactory: RpcConnectionFactory = (channel, requestHandl * bar(baz: number): number { return baz + 1 } * } * - * which we want to expose through a JSON-RPC interface. We would do: + * which we want to expose through a JSON interface. We would do: * * let target = new Foo() - * let factory = new JsonRpcProxyFactory('/foo', target) + * let factory = new RpcProxyFactory('/foo', target) * factory.onConnection(connection) * * The party at the other end of the `connection`, in order to remotely call * methods on this object would do: * - * let factory = new JsonRpcProxyFactory('/foo') + * let factory = new RpcProxyFactory('/foo') * factory.onConnection(connection) * let proxy = factory.createProxy(); * let result = proxy.bar(42) * // result is equal to 43 * * One the wire, it would look like this: - * - * --> {"jsonrpc": "2.0", "id": 0, "method": "bar", "params": {"baz": 42}} - * <-- {"jsonrpc": "2.0", "id": 0, "result": 43} + * --> { "type":"1", "id": 1, "method": "bar", "args": [42]} + * <-- { "type":"3", "id": 1, "res": 43} * * Note that in the code of the caller, we didn't pass a target object to - * JsonRpcProxyFactory, because we don't want/need to expose an object. + * RpcProxyFactory, because we don't want/need to expose an object. * If we had passed a target object, the other side could've called methods on * it. * - * @param - The type of the object to expose to JSON-RPC. + * @param - The type of the object to expose to RPC. */ -export class JsonRpcProxyFactory implements ProxyHandler { +export class RpcProxyFactory implements ProxyHandler { protected readonly onDidOpenConnectionEmitter = new Emitter(); protected readonly onDidCloseConnectionEmitter = new Emitter(); - protected connectionPromiseResolve: (connection: RpcProtocol) => void; - protected connectionPromise: Promise; + protected rpcDeferred = new Deferred(); /** - * Build a new JsonRpcProxyFactory. + * Build a new RpcProxyFactory. * - * @param target - The object to expose to JSON-RPC methods calls. If this + * @param target - The object to expose to RPC methods calls. If this * is omitted, the proxy won't be able to handle requests, only send them. */ - constructor(public target?: any, protected rpcConnectionFactory = defaultRPCConnectionFactory) { + constructor(public target?: any, protected rpcProtocolFactory = defaultRpcProtocolFactory) { this.waitForConnection(); } protected waitForConnection(): void { - this.connectionPromise = new Promise(resolve => - this.connectionPromiseResolve = resolve - ); - this.connectionPromise.then(connection => { - connection.channel.onClose(() => { + this.rpcDeferred.promise.then(protocol => { + protocol.channel.onClose(() => { this.onDidCloseConnectionEmitter.fire(undefined); // Wait for connection in case the backend reconnects this.waitForConnection(); }); + protocol.channel.onError(err => { + if (this.rpcDeferred.state !== 'resolved') { + this.rpcDeferred.reject(err); + } + }); this.onDidOpenConnectionEmitter.fire(undefined); }); } /** - * Connect a MessageConnection to the factory. + * Connect a {@link Channel} to the factory by creating an {@link RpcProtocol} on top of it. * - * This connection will be used to send/receive JSON-RPC requests and + * This protocol will be used to send/receive RPC requests and * response. */ listen(channel: Channel): void { - const connection = this.rpcConnectionFactory(channel, (meth, args) => this.onRequest(meth, ...args)); - connection.onNotification(event => this.onNotification(event.method, ...event.args)); + const protocol = this.rpcProtocolFactory(channel, (meth, args) => this.onRequest(meth, ...args)); + protocol.onNotification(event => this.onNotification(event.method, ...event.args)); - this.connectionPromiseResolve(connection); + this.rpcDeferred.resolve(protocol); } /** - * Process an incoming JSON-RPC method call. + * Process an incoming RPC method call. * - * onRequest is called when the JSON-RPC connection received a method call - * request. It calls the corresponding method on [[target]]. + * onRequest is called when the RPC connection received a method call + * request. It calls the corresponding method on [[target]]. * * The return value is a Promise object that is resolved with the return - * value of the method call, if it is successful. The promise is rejected + * value of the method call, if it is successful. The promise is rejected * if the called method does not exist or if it throws. * * @returns A promise of the method call completion. @@ -181,7 +193,7 @@ export class JsonRpcProxyFactory implements ProxyHandler { } /** - * Process an incoming JSON-RPC notification. + * Process an incoming RPC notification. * * Same as [[onRequest]], but called on incoming notifications rather than * methods calls. @@ -194,37 +206,37 @@ export class JsonRpcProxyFactory implements ProxyHandler { /** * Create a Proxy exposing the interface of an object of type T. This Proxy - * can be used to do JSON-RPC method calls on the remote target object as + * can be used to do RPC method calls on the remote target object as * if it was local. * - * If `T` implements `JsonRpcServer` then a client is used as a target object for a remote target object. + * If `T` implements `RpcServer` then a client is used as a target object for a remote target object. */ - createProxy(): JsonRpcProxy { + createProxy(): RpcProxy { const result = new Proxy(this as any, this); return result as any; } /** - * Get a callable object that executes a JSON-RPC method call. + * Get a callable object that executes a RPC method call. * * Getting a property on the Proxy object returns a callable that, when - * called, executes a JSON-RPC call. The name of the property defines the + * called, executes a RPC call. The name of the property defines the * method to be called. The callable takes a variable number of arguments, - * which are passed in the JSON-RPC method call. + * which are passed in the RPC method call. * * For example, if you have a Proxy object: * - * let fooProxyFactory = JsonRpcProxyFactory('/foo') + * let fooProxyFactory = RpcProxyFactory('/foo') * let fooProxy = fooProxyFactory.createProxy() * * accessing `fooProxy.bar` will return a callable that, when called, - * executes a JSON-RPC method call to method `bar`. Therefore, doing + * executes a RPC method call to method `bar`. Therefore, doing * `fooProxy.bar()` will call the `bar` method on the remote Foo object. * * @param target - unused. * @param p - The property accessed on the Proxy object. * @param receiver - unused. - * @returns A callable that executes the JSON-RPC call. + * @returns A callable that executes the RPC call. */ get(target: T, p: PropertyKey, receiver: any): any { if (p === 'setClient') { @@ -249,7 +261,7 @@ export class JsonRpcProxyFactory implements ProxyHandler { return (...args: any[]) => { const method = p.toString(); const capturedError = new Error(`Request '${method}' failed`); - return this.connectionPromise.then(connection => + return this.rpcDeferred.promise.then(connection => new Promise((resolve, reject) => { try { if (isNotify) { @@ -308,3 +320,32 @@ export class JsonRpcProxyFactory implements ProxyHandler { } +/** + * @deprecated since 1.39.0 use `RpcConnectionEventEmitter` instead + */ +export type JsonRpcConnectionEventEmitter = RpcConnectionEventEmitter; + +/** + * @deprecated since 1.39.0 use `RpcServer` instead + */ +export type JsonRpcServer = RpcServer; + +/** + * @deprecated since 1.39.0 use `RpcProxy` instead + */ +export type JsonRpcProxy = RpcProxy; + +/** + * @deprecated since 1.39.0 use `RpcConnectionHandler` instead + */ +export class JsonRpcConnectionHandler extends RpcConnectionHandler { + +} + +/** + * @deprecated since 1.39.0 use `JsonRpcProxyFactory` instead + */ +export class JsonRpcProxyFactory extends RpcProxyFactory { + +} +