diff --git a/packages/core/src/common/disposable.ts b/packages/core/src/common/disposable.ts index 1920343c9073e..cfb07b03166b0 100644 --- a/packages/core/src/common/disposable.ts +++ b/packages/core/src/common/disposable.ts @@ -133,3 +133,28 @@ export function disposableTimeout(...args: Parameters): Dispo const handle = setTimeout(...args); return { dispose: () => clearTimeout(handle) }; } + +/** + * Wrapper for a {@link Disposable} that is not available immediately. + */ +export class DisposableWrapper implements Disposable { + + private disposed = false; + private disposable: Disposable | undefined = undefined; + + set(disposable: Disposable): void { + if (this.disposed) { + disposable.dispose(); + } else { + this.disposable = disposable; + } + } + + dispose(): void { + this.disposed = true; + if (this.disposable) { + this.disposable.dispose(); + this.disposable = undefined; + } + } +} diff --git a/packages/core/src/common/message-rpc/rpc-protocol.ts b/packages/core/src/common/message-rpc/rpc-protocol.ts index 9ebd529c4a3ed..f3f51decb3270 100644 --- a/packages/core/src/common/message-rpc/rpc-protocol.ts +++ b/packages/core/src/common/message-rpc/rpc-protocol.ts @@ -16,7 +16,7 @@ /* eslint-disable @typescript-eslint/no-explicit-any */ import { CancellationToken, CancellationTokenSource } from '../cancellation'; -import { Disposable, DisposableCollection } from '../disposable'; +import { DisposableWrapper, Disposable, DisposableCollection } from '../disposable'; import { Emitter, Event } from '../event'; import { Deferred } from '../promise-util'; import { Channel } from './channel'; @@ -57,6 +57,7 @@ export class RpcProtocol { static readonly CANCELLATION_TOKEN_KEY = 'add.cancellation.token'; protected readonly pendingRequests: Map> = new Map(); + protected readonly pendingRequestCancellationEventListeners: Map = new Map(); protected nextMessageId: number = 0; @@ -80,6 +81,8 @@ export class RpcProtocol { channel.onClose(event => { this.pendingRequests.forEach(pending => pending.reject(new Error(event.reason))); this.pendingRequests.clear(); + this.pendingRequestCancellationEventListeners.forEach(disposable => disposable.dispose()); + this.pendingRequestCancellationEventListeners.clear(); this.toDispose.dispose(); }); this.toDispose.push(channel.onMessage(readBuffer => this.handleMessage(this.decoder.parse(readBuffer())))); @@ -131,6 +134,7 @@ export class RpcProtocol { } else { throw new Error(`No reply handler for reply with id: ${id}`); } + this.disposeCancellationEventListener(id); } protected handleReplyErr(id: number, error: any): void { @@ -141,6 +145,15 @@ export class RpcProtocol { } else { throw new Error(`No reply handler for error reply with id: ${id}`); } + this.disposeCancellationEventListener(id); + } + + protected disposeCancellationEventListener(id: number): void { + const toDispose = this.pendingRequestCancellationEventListeners.get(id); + if (toDispose) { + this.pendingRequestCancellationEventListeners.delete(id); + toDispose.dispose(); + } } sendRequest(method: string, args: any[]): Promise { @@ -157,6 +170,10 @@ export class RpcProtocol { this.pendingRequests.set(id, reply); + // register disposable before output.commit() even when not available yet + const disposableWrapper = new DisposableWrapper(); + this.pendingRequestCancellationEventListeners.set(id, disposableWrapper); + const output = this.channel.getWriteBuffer(); this.encoder.request(output, id, method, args); output.commit(); @@ -164,7 +181,10 @@ export class RpcProtocol { if (cancellationToken?.isCancellationRequested) { this.sendCancel(id); } else { - cancellationToken?.onCancellationRequested(() => this.sendCancel(id)); + const disposable = cancellationToken?.onCancellationRequested(() => this.sendCancel(id)); + if (disposable) { + disposableWrapper.set(disposable); + } } return reply.promise;