Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[RPC] Delay sending messages when there are handlers running initialization #13180

Merged
merged 12 commits into from
Jan 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 14 additions & 1 deletion packages/plugin-ext/src/common/proxy-handler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,18 @@ export interface RpcHandlerOptions {
}
export interface ProxyHandlerOptions extends RpcHandlerOptions {
channelProvider: () => Promise<Channel>,
proxySynchronizer: ProxySynchronizer,
}

export interface InvocationHandlerOptions extends RpcHandlerOptions {
target: any
}

export interface ProxySynchronizer {
startProxyInitialization(id: string, init: Promise<void>): void
martin-fleck-at marked this conversation as resolved.
Show resolved Hide resolved
pendingProxyInitializations(): Promise<void>
}

/**
* A proxy handler that will send any method invocation on the proxied object
* as a rcp protocol message over a channel.
Expand All @@ -40,6 +47,7 @@ export class ClientProxyHandler<T extends object> implements ProxyHandler<T> {

readonly id: string;
private readonly channelProvider: () => Promise<Channel>;
private readonly proxySynchronizer: ProxySynchronizer;
private readonly encoder: RpcMessageEncoder;
private readonly decoder: RpcMessageDecoder;

Expand All @@ -50,6 +58,7 @@ export class ClientProxyHandler<T extends object> implements ProxyHandler<T> {
private initializeRpc(): void {
// we need to set the flag to true before waiting for the channel provider. Otherwise `get` might
// get called again and we'll try to open a channel more than once
this.proxySynchronizer.startProxyInitialization(this.id, this.rpcDeferred.promise.then(() => { }));
this.isRpcInitialized = true;
const clientOptions: RpcProtocolOptions = { encoder: this.encoder, decoder: this.decoder, mode: 'clientOnly' };
this.channelProvider().then(channel => {
Expand All @@ -69,7 +78,7 @@ export class ClientProxyHandler<T extends object> implements ProxyHandler<T> {
const isNotify = this.isNotification(name);
return (...args: any[]) => {
const method = name.toString();
return this.rpcDeferred.promise.then(async (connection: RpcProtocol) => {
return this.sendWhenNoInit(async (connection: RpcProtocol) => {
if (isNotify) {
connection.sendNotification(method, args);
} else {
Expand All @@ -79,6 +88,10 @@ export class ClientProxyHandler<T extends object> implements ProxyHandler<T> {
};
}

private sendWhenNoInit(send: (connection: RpcProtocol) => Promise<any>): Promise<any> {
return this.proxySynchronizer.pendingProxyInitializations().then(() => this.rpcDeferred.promise.then(send));
}

/**
* Return whether the given property represents a notification. If true,
* the promise returned from the invocation will resolve immediately to `undefined`
Expand Down
42 changes: 40 additions & 2 deletions packages/plugin-ext/src/common/rpc-protocol.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,12 @@ import { Emitter, Event } from '@theia/core/lib/common/event';
import { ChannelMultiplexer, MessageProvider } from '@theia/core/lib/common/message-rpc/channel';
import { MsgPackMessageDecoder, MsgPackMessageEncoder } from '@theia/core/lib/common/message-rpc/rpc-message-encoder';
import { Uint8ArrayReadBuffer, Uint8ArrayWriteBuffer } from '@theia/core/lib/common/message-rpc/uint8-array-message-buffer';
import { ClientProxyHandler, RpcInvocationHandler } from './proxy-handler';
import { ClientProxyHandler, ProxySynchronizer, RpcInvocationHandler } from './proxy-handler';
import { MsgPackExtensionManager } from '@theia/core/lib/common/message-rpc/msg-pack-extension-manager';
import { URI as VSCodeURI } from '@theia/core/shared/vscode-uri';
import { BinaryBuffer } from '@theia/core/lib/common/buffer';
import { Range, Position } from '../plugin/types-impl';
import { Deferred } from '@theia/core/lib/common/promise-util';

export interface MessageConnection {
send(msg: string): void;
Expand Down Expand Up @@ -83,6 +84,7 @@ export class RPCProtocolImpl implements RPCProtocol {
private readonly multiplexer: ChannelMultiplexer;
private readonly encoder = new MsgPackMessageEncoder();
private readonly decoder = new MsgPackMessageDecoder();
private readonly initCallback: ProxySynchronizer;

private readonly toDispose = new DisposableCollection(
Disposable.create(() => { /* mark as no disposed */ })
Expand All @@ -91,6 +93,7 @@ export class RPCProtocolImpl implements RPCProtocol {
constructor(channel: Channel) {
this.toDispose.push(this.multiplexer = new ChannelMultiplexer(new BatchingChannel(channel)));
this.toDispose.push(Disposable.create(() => this.proxies.clear()));
this.initCallback = new ProxySynchronizerImpl();
}

dispose(): void {
Expand All @@ -114,7 +117,9 @@ export class RPCProtocolImpl implements RPCProtocol {
}

protected createProxy<T>(proxyId: string): T {
const handler = new ClientProxyHandler({ id: proxyId, encoder: this.encoder, decoder: this.decoder, channelProvider: () => this.multiplexer.open(proxyId) });
const handler = new ClientProxyHandler({
id: proxyId, encoder: this.encoder, decoder: this.decoder, channelProvider: () => this.multiplexer.open(proxyId), proxySynchronizer: this.initCallback
});
return new Proxy(Object.create(null), handler);
}

Expand Down Expand Up @@ -149,6 +154,39 @@ export class RPCProtocolImpl implements RPCProtocol {
}
}

export class ProxySynchronizerImpl implements ProxySynchronizer {

private readonly runningInitializations = new Set<string>();

private _pendingProxyInitializations: Deferred<void>;

constructor() {
this._pendingProxyInitializations = new Deferred();
/* after creation no init is active */
this._pendingProxyInitializations.resolve();
}

startProxyInitialization(id: string, init: Promise<void>): void {
if (this.runningInitializations.size === 0) {
this._pendingProxyInitializations = new Deferred();
}
init.then(() => this.finishedProxyInitialization(id));
this.runningInitializations.add(id);
}

protected finishedProxyInitialization(id: string): void {
this.runningInitializations.delete(id);
if (this.runningInitializations.size === 0) {
this._pendingProxyInitializations.resolve();
}
}

pendingProxyInitializations(): Promise<void> {
return this._pendingProxyInitializations.promise;
}

}

/**
* Wraps and underlying channel to send/receive multiple messages in one go:
* - multiple messages to be sent from one stack get sent in bulk at `process.nextTick`.
Expand Down
4 changes: 2 additions & 2 deletions packages/plugin-ext/src/plugin/documents.ts
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ export class DocumentsExtImpl implements DocumentsExt {
const uriString = uri.toString();
const data = this.editorsAndDocuments.getDocument(uriString);
if (!data) {
throw new Error('unknown document');
throw new Error('unknown document: ' + uriString);
}
data.acceptIsDirty(isDirty);
this._onDidChangeDocument.fire({
Expand All @@ -172,7 +172,7 @@ export class DocumentsExtImpl implements DocumentsExt {
const uriString = uri.toString();
const data = this.editorsAndDocuments.getDocument(uriString);
if (!data) {
throw new Error('unknown document');
throw new Error('unknown document: ' + uriString);
}
data.acceptIsDirty(isDirty);
data.onEvents(e);
Expand Down
Loading