diff --git a/packages/plugin-ext/src/common/proxy-handler.ts b/packages/plugin-ext/src/common/proxy-handler.ts index dcfe7d4a2e722..5b2adb3d154ef 100644 --- a/packages/plugin-ext/src/common/proxy-handler.ts +++ b/packages/plugin-ext/src/common/proxy-handler.ts @@ -25,11 +25,19 @@ export interface RpcHandlerOptions { } export interface ProxyHandlerOptions extends RpcHandlerOptions { channelProvider: () => Promise, + initCallback: InitializationCallback, } export interface InvocationHandlerOptions extends RpcHandlerOptions { target: any } + +export interface InitializationCallback { + reportInit(id: string): void + reportInitDone(id: string): void + checkInit(): Promise +} + /** * A proxy handler that will send any method invocation on the proxied object * as a rcp protocol message over a channel. @@ -40,6 +48,7 @@ export class ClientProxyHandler implements ProxyHandler { readonly id: string; private readonly channelProvider: () => Promise; + private readonly initCallback: InitializationCallback; private readonly encoder: RpcMessageEncoder; private readonly decoder: RpcMessageDecoder; @@ -50,11 +59,13 @@ export class ClientProxyHandler implements ProxyHandler { private initializeRpc(): void { // we need to set the flag to true before waiting for the channel provider. Otherwise `get` might // get called again and we'll try to open a channel more than once + this.initCallback.reportInit(this.id); this.isRpcInitialized = true; const clientOptions: RpcProtocolOptions = { encoder: this.encoder, decoder: this.decoder, mode: 'clientOnly' }; this.channelProvider().then(channel => { const rpc = new RpcProtocol(channel, undefined, clientOptions); this.rpcDeferred.resolve(rpc); + this.initCallback.reportInitDone(this.id); }); } @@ -69,13 +80,13 @@ export class ClientProxyHandler implements ProxyHandler { const isNotify = this.isNotification(name); return (...args: any[]) => { const method = name.toString(); - return this.rpcDeferred.promise.then(async (connection: RpcProtocol) => { + return this.initCallback.checkInit().then(() => this.rpcDeferred.promise.then(async (connection: RpcProtocol) => { if (isNotify) { connection.sendNotification(method, args); } else { return await connection.sendRequest(method, args) as Promise; } - }); + })); }; } diff --git a/packages/plugin-ext/src/common/rpc-protocol.ts b/packages/plugin-ext/src/common/rpc-protocol.ts index 107c639ef2c60..fdb71bce887d1 100644 --- a/packages/plugin-ext/src/common/rpc-protocol.ts +++ b/packages/plugin-ext/src/common/rpc-protocol.ts @@ -27,11 +27,12 @@ import { Emitter, Event } from '@theia/core/lib/common/event'; import { ChannelMultiplexer, MessageProvider } from '@theia/core/lib/common/message-rpc/channel'; import { MsgPackMessageDecoder, MsgPackMessageEncoder } from '@theia/core/lib/common/message-rpc/rpc-message-encoder'; import { Uint8ArrayReadBuffer, Uint8ArrayWriteBuffer } from '@theia/core/lib/common/message-rpc/uint8-array-message-buffer'; -import { ClientProxyHandler, RpcInvocationHandler } from './proxy-handler'; +import { ClientProxyHandler, InitializationCallback, RpcInvocationHandler } from './proxy-handler'; import { MsgPackExtensionManager } from '@theia/core/lib/common/message-rpc/msg-pack-extension-manager'; import { URI as VSCodeURI } from '@theia/core/shared/vscode-uri'; import { BinaryBuffer } from '@theia/core/lib/common/buffer'; import { Range, Position } from '../plugin/types-impl'; +import { Deferred } from '@theia/core/lib/common/promise-util'; export interface MessageConnection { send(msg: string): void; @@ -83,6 +84,7 @@ export class RPCProtocolImpl implements RPCProtocol { private readonly multiplexer: ChannelMultiplexer; private readonly encoder = new MsgPackMessageEncoder(); private readonly decoder = new MsgPackMessageDecoder(); + private readonly initCallback: InitializationCallback; private readonly toDispose = new DisposableCollection( Disposable.create(() => { /* mark as no disposed */ }) @@ -91,6 +93,7 @@ export class RPCProtocolImpl implements RPCProtocol { constructor(channel: Channel) { this.toDispose.push(this.multiplexer = new ChannelMultiplexer(new BatchingChannel(channel))); this.toDispose.push(Disposable.create(() => this.proxies.clear())); + this.initCallback = new InitializationCallbackImpl(); } dispose(): void { @@ -114,7 +117,9 @@ export class RPCProtocolImpl implements RPCProtocol { } protected createProxy(proxyId: string): T { - const handler = new ClientProxyHandler({ id: proxyId, encoder: this.encoder, decoder: this.decoder, channelProvider: () => this.multiplexer.open(proxyId) }); + const handler = new ClientProxyHandler({ + id: proxyId, encoder: this.encoder, decoder: this.decoder, channelProvider: () => this.multiplexer.open(proxyId), initCallback: this.initCallback + }); return new Proxy(Object.create(null), handler); } @@ -149,6 +154,32 @@ export class RPCProtocolImpl implements RPCProtocol { } } +export class InitializationCallbackImpl implements InitializationCallback { + + private readonly runningInits = new Set(); + + private checkInitDeferred: Deferred = new Deferred(); + + reportInit(id: string): void { + if (this.runningInits.size === 0) { + this.checkInitDeferred = new Deferred(); + } + this.runningInits.add(id); + } + + reportInitDone(id: string): void { + this.runningInits.delete(id); + if (this.runningInits.size === 0) { + this.checkInitDeferred.resolve(); + } + } + + checkInit(): Promise { + return this.checkInitDeferred.promise; + } + +} + /** * Wraps and underlying channel to send/receive multiple messages in one go: * - multiple messages to be sent from one stack get sent in bulk at `process.nextTick`.