diff --git a/packages/plugin-ext/src/common/proxy-handler.ts b/packages/plugin-ext/src/common/proxy-handler.ts index dcfe7d4a2e722..9a3f362ca7deb 100644 --- a/packages/plugin-ext/src/common/proxy-handler.ts +++ b/packages/plugin-ext/src/common/proxy-handler.ts @@ -25,11 +25,18 @@ export interface RpcHandlerOptions { } export interface ProxyHandlerOptions extends RpcHandlerOptions { channelProvider: () => Promise, + proxySynchronizer: ProxySynchronizer, } export interface InvocationHandlerOptions extends RpcHandlerOptions { target: any } + +export interface ProxySynchronizer { + startProxyInitialization(id: string, init: Promise): void + pendingProxyInitializations(): Promise +} + /** * A proxy handler that will send any method invocation on the proxied object * as a rcp protocol message over a channel. @@ -40,6 +47,7 @@ export class ClientProxyHandler implements ProxyHandler { readonly id: string; private readonly channelProvider: () => Promise; + private readonly proxySynchronizer: ProxySynchronizer; private readonly encoder: RpcMessageEncoder; private readonly decoder: RpcMessageDecoder; @@ -50,6 +58,7 @@ export class ClientProxyHandler implements ProxyHandler { private initializeRpc(): void { // we need to set the flag to true before waiting for the channel provider. Otherwise `get` might // get called again and we'll try to open a channel more than once + this.proxySynchronizer.startProxyInitialization(this.id, this.rpcDeferred.promise.then(() => { })); this.isRpcInitialized = true; const clientOptions: RpcProtocolOptions = { encoder: this.encoder, decoder: this.decoder, mode: 'clientOnly' }; this.channelProvider().then(channel => { @@ -69,7 +78,7 @@ export class ClientProxyHandler implements ProxyHandler { const isNotify = this.isNotification(name); return (...args: any[]) => { const method = name.toString(); - return this.rpcDeferred.promise.then(async (connection: RpcProtocol) => { + return this.sendWhenNoInit(async (connection: RpcProtocol) => { if (isNotify) { connection.sendNotification(method, args); } else { @@ -79,6 +88,10 @@ export class ClientProxyHandler implements ProxyHandler { }; } + private sendWhenNoInit(send: (connection: RpcProtocol) => Promise): Promise { + return this.proxySynchronizer.pendingProxyInitializations().then(() => this.rpcDeferred.promise.then(send)); + } + /** * Return whether the given property represents a notification. If true, * the promise returned from the invocation will resolve immediately to `undefined` diff --git a/packages/plugin-ext/src/common/rpc-protocol.ts b/packages/plugin-ext/src/common/rpc-protocol.ts index 107c639ef2c60..6f2a8792437b8 100644 --- a/packages/plugin-ext/src/common/rpc-protocol.ts +++ b/packages/plugin-ext/src/common/rpc-protocol.ts @@ -27,11 +27,12 @@ import { Emitter, Event } from '@theia/core/lib/common/event'; import { ChannelMultiplexer, MessageProvider } from '@theia/core/lib/common/message-rpc/channel'; import { MsgPackMessageDecoder, MsgPackMessageEncoder } from '@theia/core/lib/common/message-rpc/rpc-message-encoder'; import { Uint8ArrayReadBuffer, Uint8ArrayWriteBuffer } from '@theia/core/lib/common/message-rpc/uint8-array-message-buffer'; -import { ClientProxyHandler, RpcInvocationHandler } from './proxy-handler'; +import { ClientProxyHandler, ProxySynchronizer, RpcInvocationHandler } from './proxy-handler'; import { MsgPackExtensionManager } from '@theia/core/lib/common/message-rpc/msg-pack-extension-manager'; import { URI as VSCodeURI } from '@theia/core/shared/vscode-uri'; import { BinaryBuffer } from '@theia/core/lib/common/buffer'; import { Range, Position } from '../plugin/types-impl'; +import { Deferred } from '@theia/core/lib/common/promise-util'; export interface MessageConnection { send(msg: string): void; @@ -83,6 +84,7 @@ export class RPCProtocolImpl implements RPCProtocol { private readonly multiplexer: ChannelMultiplexer; private readonly encoder = new MsgPackMessageEncoder(); private readonly decoder = new MsgPackMessageDecoder(); + private readonly initCallback: ProxySynchronizer; private readonly toDispose = new DisposableCollection( Disposable.create(() => { /* mark as no disposed */ }) @@ -91,6 +93,7 @@ export class RPCProtocolImpl implements RPCProtocol { constructor(channel: Channel) { this.toDispose.push(this.multiplexer = new ChannelMultiplexer(new BatchingChannel(channel))); this.toDispose.push(Disposable.create(() => this.proxies.clear())); + this.initCallback = new ProxySynchronizerImpl(); } dispose(): void { @@ -114,7 +117,9 @@ export class RPCProtocolImpl implements RPCProtocol { } protected createProxy(proxyId: string): T { - const handler = new ClientProxyHandler({ id: proxyId, encoder: this.encoder, decoder: this.decoder, channelProvider: () => this.multiplexer.open(proxyId) }); + const handler = new ClientProxyHandler({ + id: proxyId, encoder: this.encoder, decoder: this.decoder, channelProvider: () => this.multiplexer.open(proxyId), proxySynchronizer: this.initCallback + }); return new Proxy(Object.create(null), handler); } @@ -149,6 +154,39 @@ export class RPCProtocolImpl implements RPCProtocol { } } +export class ProxySynchronizerImpl implements ProxySynchronizer { + + private readonly runningInitializations = new Set(); + + private _pendingProxyInitializations: Deferred; + + constructor() { + this._pendingProxyInitializations = new Deferred(); + /* after creation no init is active */ + this._pendingProxyInitializations.resolve(); + } + + startProxyInitialization(id: string, init: Promise): void { + if (this.runningInitializations.size === 0) { + this._pendingProxyInitializations = new Deferred(); + } + init.then(() => this.finishedProxyInitialization(id)); + this.runningInitializations.add(id); + } + + protected finishedProxyInitialization(id: string): void { + this.runningInitializations.delete(id); + if (this.runningInitializations.size === 0) { + this._pendingProxyInitializations.resolve(); + } + } + + pendingProxyInitializations(): Promise { + return this._pendingProxyInitializations.promise; + } + +} + /** * Wraps and underlying channel to send/receive multiple messages in one go: * - multiple messages to be sent from one stack get sent in bulk at `process.nextTick`. diff --git a/packages/plugin-ext/src/plugin/documents.ts b/packages/plugin-ext/src/plugin/documents.ts index 3eee7ade3a3d6..dc6bbbefe3ce6 100644 --- a/packages/plugin-ext/src/plugin/documents.ts +++ b/packages/plugin-ext/src/plugin/documents.ts @@ -158,7 +158,7 @@ export class DocumentsExtImpl implements DocumentsExt { const uriString = uri.toString(); const data = this.editorsAndDocuments.getDocument(uriString); if (!data) { - throw new Error('unknown document'); + throw new Error('unknown document: ' + uriString); } data.acceptIsDirty(isDirty); this._onDidChangeDocument.fire({ @@ -172,7 +172,7 @@ export class DocumentsExtImpl implements DocumentsExt { const uriString = uri.toString(); const data = this.editorsAndDocuments.getDocument(uriString); if (!data) { - throw new Error('unknown document'); + throw new Error('unknown document: ' + uriString); } data.acceptIsDirty(isDirty); data.onEvents(e);