diff --git a/packages/plugin-ext/src/common/proxy-handler.ts b/packages/plugin-ext/src/common/proxy-handler.ts index 5b2adb3d154ef..632a220fed9b3 100644 --- a/packages/plugin-ext/src/common/proxy-handler.ts +++ b/packages/plugin-ext/src/common/proxy-handler.ts @@ -25,17 +25,16 @@ export interface RpcHandlerOptions { } export interface ProxyHandlerOptions extends RpcHandlerOptions { channelProvider: () => Promise, - initCallback: InitializationCallback, + proxySynchronizer: ProxySynchronizer, } export interface InvocationHandlerOptions extends RpcHandlerOptions { target: any } -export interface InitializationCallback { - reportInit(id: string): void - reportInitDone(id: string): void - checkInit(): Promise +export interface ProxySynchronizer { + startProxyInitialization(id: string, init: Promise): void + pendingProxyInitializations(): Promise } /** @@ -48,7 +47,7 @@ export class ClientProxyHandler implements ProxyHandler { readonly id: string; private readonly channelProvider: () => Promise; - private readonly initCallback: InitializationCallback; + private readonly proxySynchronizer: ProxySynchronizer; private readonly encoder: RpcMessageEncoder; private readonly decoder: RpcMessageDecoder; @@ -59,13 +58,12 @@ export class ClientProxyHandler implements ProxyHandler { private initializeRpc(): void { // we need to set the flag to true before waiting for the channel provider. Otherwise `get` might // get called again and we'll try to open a channel more than once - this.initCallback.reportInit(this.id); + this.proxySynchronizer.startProxyInitialization(this.id, this.rpcDeferred.promise.then(() => { })); this.isRpcInitialized = true; const clientOptions: RpcProtocolOptions = { encoder: this.encoder, decoder: this.decoder, mode: 'clientOnly' }; this.channelProvider().then(channel => { const rpc = new RpcProtocol(channel, undefined, clientOptions); this.rpcDeferred.resolve(rpc); - this.initCallback.reportInitDone(this.id); }); } @@ -80,7 +78,7 @@ export class ClientProxyHandler implements ProxyHandler { const isNotify = this.isNotification(name); return (...args: any[]) => { const method = name.toString(); - return this.initCallback.checkInit().then(() => this.rpcDeferred.promise.then(async (connection: RpcProtocol) => { + return this.proxySynchronizer.pendingProxyInitializations().then(() => this.rpcDeferred.promise.then(async (connection: RpcProtocol) => { if (isNotify) { connection.sendNotification(method, args); } else { diff --git a/packages/plugin-ext/src/common/rpc-protocol.ts b/packages/plugin-ext/src/common/rpc-protocol.ts index 1a2aaa470a8ab..d58b4173c59e8 100644 --- a/packages/plugin-ext/src/common/rpc-protocol.ts +++ b/packages/plugin-ext/src/common/rpc-protocol.ts @@ -27,7 +27,7 @@ import { Emitter, Event } from '@theia/core/lib/common/event'; import { ChannelMultiplexer, MessageProvider } from '@theia/core/lib/common/message-rpc/channel'; import { MsgPackMessageDecoder, MsgPackMessageEncoder } from '@theia/core/lib/common/message-rpc/rpc-message-encoder'; import { Uint8ArrayReadBuffer, Uint8ArrayWriteBuffer } from '@theia/core/lib/common/message-rpc/uint8-array-message-buffer'; -import { ClientProxyHandler, InitializationCallback, RpcInvocationHandler } from './proxy-handler'; +import { ClientProxyHandler, ProxySynchronizer, RpcInvocationHandler } from './proxy-handler'; import { MsgPackExtensionManager } from '@theia/core/lib/common/message-rpc/msg-pack-extension-manager'; import { URI as VSCodeURI } from '@theia/core/shared/vscode-uri'; import { BinaryBuffer } from '@theia/core/lib/common/buffer'; @@ -84,7 +84,7 @@ export class RPCProtocolImpl implements RPCProtocol { private readonly multiplexer: ChannelMultiplexer; private readonly encoder = new MsgPackMessageEncoder(); private readonly decoder = new MsgPackMessageDecoder(); - private readonly initCallback: InitializationCallback; + private readonly initCallback: ProxySynchronizer; private readonly toDispose = new DisposableCollection( Disposable.create(() => { /* mark as no disposed */ }) @@ -118,7 +118,7 @@ export class RPCProtocolImpl implements RPCProtocol { protected createProxy(proxyId: string): T { const handler = new ClientProxyHandler({ - id: proxyId, encoder: this.encoder, decoder: this.decoder, channelProvider: () => this.multiplexer.open(proxyId), initCallback: this.initCallback + id: proxyId, encoder: this.encoder, decoder: this.decoder, channelProvider: () => this.multiplexer.open(proxyId), proxySynchronizer: this.initCallback }); return new Proxy(Object.create(null), handler); } @@ -154,7 +154,7 @@ export class RPCProtocolImpl implements RPCProtocol { } } -export class InitializationCallbackImpl implements InitializationCallback { +export class InitializationCallbackImpl implements ProxySynchronizer { private readonly runningInits = new Set(); @@ -166,21 +166,22 @@ export class InitializationCallbackImpl implements InitializationCallback { this.checkInitDeferred.resolve(); } - reportInit(id: string): void { + startProxyInitialization(id: string, init: Promise): void { if (this.runningInits.size === 0) { this.checkInitDeferred = new Deferred(); } + init.then(() => this.finishProxyInitialization(id)); this.runningInits.add(id); } - reportInitDone(id: string): void { + protected finishProxyInitialization(id: string): void { this.runningInits.delete(id); if (this.runningInits.size === 0) { this.checkInitDeferred.resolve(); } } - checkInit(): Promise { + pendingProxyInitializations(): Promise { return this.checkInitDeferred.promise; }