Skip to content

Commit

Permalink
debt - change listenStream to use a cancellation token (#191003)
Browse files Browse the repository at this point in the history
  • Loading branch information
bpasero authored Aug 22, 2023
1 parent f058931 commit 08a8a80
Show file tree
Hide file tree
Showing 4 changed files with 21 additions and 17 deletions.
14 changes: 6 additions & 8 deletions src/vs/base/common/stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,9 @@
* Licensed under the MIT License. See License.txt in the project root for license information.
*--------------------------------------------------------------------------------------------*/

import { CancellationToken } from 'vs/base/common/cancellation';
import { onUnexpectedError } from 'vs/base/common/errors';
import { DisposableStore, IDisposable, toDisposable } from 'vs/base/common/lifecycle';
import { DisposableStore, toDisposable } from 'vs/base/common/lifecycle';

/**
* The payload that flows in readable stream events.
Expand Down Expand Up @@ -567,17 +568,16 @@ export interface IStreamListener<T> {
/**
* Helper to listen to all events of a T stream in proper order.
*/
export function listenStream<T>(stream: ReadableStreamEvents<T>, listener: IStreamListener<T>): IDisposable {
let destroyed = false;
export function listenStream<T>(stream: ReadableStreamEvents<T>, listener: IStreamListener<T>, token?: CancellationToken): void {

stream.on('error', error => {
if (!destroyed) {
if (!token?.isCancellationRequested) {
listener.onError(error);
}
});

stream.on('end', () => {
if (!destroyed) {
if (!token?.isCancellationRequested) {
listener.onEnd();
}
});
Expand All @@ -586,12 +586,10 @@ export function listenStream<T>(stream: ReadableStreamEvents<T>, listener: IStre
// into flowing mode. As such it is important to
// add this listener last (DO NOT CHANGE!)
stream.on('data', data => {
if (!destroyed) {
if (!token?.isCancellationRequested) {
listener.onData(data);
}
});

return toDisposable(() => destroyed = true);
}

/**
Expand Down
11 changes: 7 additions & 4 deletions src/vs/base/test/common/stream.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import * as assert from 'assert';
import { timeout } from 'vs/base/common/async';
import { bufferToReadable, VSBuffer } from 'vs/base/common/buffer';
import { CancellationTokenSource } from 'vs/base/common/cancellation';
import { consumeReadable, consumeStream, isReadable, isReadableBufferedStream, isReadableStream, listenStream, newWriteableStream, peekReadable, peekStream, prefixedReadable, prefixedStream, Readable, ReadableStream, toReadable, toStream, transform } from 'vs/base/common/stream';

suite('Stream', () => {
Expand Down Expand Up @@ -351,14 +352,16 @@ suite('Stream', () => {
assert.strictEqual(end, true);
});

test('listenStream - dispose', () => {
test('listenStream - cancellation', () => {
const stream = newWriteableStream<string>(strings => strings.join());

let error = false;
let end = false;
let data = '';

const disposable = listenStream(stream, {
const cts = new CancellationTokenSource();

listenStream(stream, {
onData: d => {
data = d;
},
Expand All @@ -368,9 +371,9 @@ suite('Stream', () => {
onEnd: () => {
end = true;
}
});
}, cts.token);

disposable.dispose();
cts.cancel();

stream.write('Hello');
assert.strictEqual(data, '');
Expand Down
4 changes: 2 additions & 2 deletions src/vs/workbench/contrib/files/browser/fileImportExport.ts
Original file line number Diff line number Diff line change
Expand Up @@ -715,7 +715,7 @@ export class FileDownload {
reject(canceled());
}));

disposables.add(listenStream(sourceStream, {
listenStream(sourceStream, {
onData: data => {
target.write(data.buffer);
this.reportProgress(contents.name, contents.size, data.byteLength, operation);
Expand All @@ -728,7 +728,7 @@ export class FileDownload {
disposables.dispose();
resolve();
}
}));
}, token);
});
}

Expand Down
9 changes: 6 additions & 3 deletions src/vs/workbench/services/textfile/common/encoding.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import { Readable, ReadableStream, newWriteableStream, listenStream } from 'vs/base/common/stream';
import { VSBuffer, VSBufferReadable, VSBufferReadableStream } from 'vs/base/common/buffer';
import { importAMDNodeModule } from 'vs/amdX';
import { CancellationTokenSource } from 'vs/base/common/cancellation';

export const UTF8 = 'utf8';
export const UTF8_with_bom = 'utf8bom';
Expand Down Expand Up @@ -124,6 +125,8 @@ export function toDecodeStream(source: VSBufferReadableStream, options: IDecodeS

let decoder: IDecoderStream | undefined = undefined;

const cts = new CancellationTokenSource();

const createDecoder = async () => {
try {

Expand Down Expand Up @@ -158,14 +161,14 @@ export function toDecodeStream(source: VSBufferReadableStream, options: IDecodeS
} catch (error) {

// Stop handling anything from the source and target
sourceListener?.dispose();
cts.cancel();
target.destroy();

reject(error);
}
};

const sourceListener = listenStream(source, {
listenStream(source, {
onData: async chunk => {

// if the decoder is ready, we just write directly
Expand Down Expand Up @@ -205,7 +208,7 @@ export function toDecodeStream(source: VSBufferReadableStream, options: IDecodeS
// end the target with the remainders of the decoder
target.end(decoder?.end());
}
});
}, cts.token);
});
}

Expand Down

0 comments on commit 08a8a80

Please sign in to comment.