Skip to content

Commit

Permalink
add abort signal support to our async iterables
Browse files Browse the repository at this point in the history
  • Loading branch information
yaacovCR committed Oct 31, 2024
1 parent 3036d41 commit 9e2f532
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 3 deletions.
33 changes: 32 additions & 1 deletion src/execution/AbortSignalListener.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,10 @@ export class AbortSignalListener {
this.abortSignal.removeEventListener('abort', this.abort);
}

cancellablePromise<T>(originalPromise: Promise<T>): Promise<T> {
cancellablePromise<T>(
originalPromise: Promise<T>,
onCancel?: (() => Promise<unknown>) | undefined,
): Promise<T> {
if (this.abortSignal.aborted) {
// eslint-disable-next-line @typescript-eslint/prefer-promise-reject-errors
return Promise.reject(this.abortSignal.reason);
Expand All @@ -40,14 +43,42 @@ export class AbortSignalListener {
originalPromise.then(
(resolved) => {
this._aborts.delete(abort);
onCancel?.().catch(() => {
// ignore
});
resolve(resolved);
},
(error: unknown) => {
this._aborts.delete(abort);
onCancel?.().catch(() => {
// ignore
});
reject(error);
},
);

return promise;
}

cancellableIterable<T>(iterable: AsyncIterable<T>): AsyncIterable<T> {
const iterator = iterable[Symbol.asyncIterator]();

const earlyReturn =
typeof iterator.return === 'function'
? iterator.return.bind(iterator)
: undefined;

const cancellableAsyncIterator: AsyncIterator<T> = {
next: () => this.cancellablePromise(iterator.next(), earlyReturn),
};

if (earlyReturn) {
cancellableAsyncIterator.return = () =>
this.cancellablePromise(earlyReturn());
}

return {
[Symbol.asyncIterator]: () => cancellableAsyncIterator,
};
}
}
6 changes: 4 additions & 2 deletions src/execution/execute.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1387,7 +1387,9 @@ function completeListValue(
const itemType = returnType.ofType;

if (isAsyncIterable(result)) {
const asyncIterator = result[Symbol.asyncIterator]();
const maybeCancellableIterable =
exeContext.abortSignalListener?.cancellableIterable(result) ?? result;
const asyncIterator = maybeCancellableIterable[Symbol.asyncIterator]();

return completeAsyncIteratorValue(
exeContext,
Expand Down Expand Up @@ -2229,7 +2231,7 @@ function executeSubscription(
// TODO: add test case
/* c8 ignore next */
abortSignalListener?.disconnect();
return resolved;
return abortSignalListener?.cancellableIterable(resolved) ?? resolved;
},
(error: unknown) => {
abortSignalListener?.disconnect();
Expand Down

0 comments on commit 9e2f532

Please sign in to comment.