Skip to content

Commit

Permalink
incremental delivery: add pending notifications (#3897)
Browse files Browse the repository at this point in the history
  • Loading branch information
yaacovCR authored Aug 25, 2023
1 parent 00e2b50 commit fe65bc8
Show file tree
Hide file tree
Showing 4 changed files with 190 additions and 2 deletions.
56 changes: 54 additions & 2 deletions src/execution/IncrementalPublisher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import type {
import type { GroupedFieldSet } from './collectFields.js';

interface IncrementalUpdate<TData = unknown, TExtensions = ObjMap<unknown>> {
pending: ReadonlyArray<PendingResult>;
incremental: ReadonlyArray<IncrementalResult<TData, TExtensions>>;
completed: ReadonlyArray<CompletedResult>;
}
Expand Down Expand Up @@ -59,6 +60,7 @@ export interface InitialIncrementalExecutionResult<
TExtensions = ObjMap<unknown>,
> extends ExecutionResult<TData, TExtensions> {
data: TData;
pending: ReadonlyArray<PendingResult>;
hasNext: true;
extensions?: TExtensions;
}
Expand All @@ -68,6 +70,7 @@ export interface FormattedInitialIncrementalExecutionResult<
TExtensions = ObjMap<unknown>,
> extends FormattedExecutionResult<TData, TExtensions> {
data: TData;
pending: ReadonlyArray<PendingResult>;
hasNext: boolean;
extensions?: TExtensions;
}
Expand All @@ -85,6 +88,7 @@ export interface FormattedSubsequentIncrementalExecutionResult<
TExtensions = ObjMap<unknown>,
> {
hasNext: boolean;
pending?: ReadonlyArray<PendingResult>;
incremental?: ReadonlyArray<FormattedIncrementalResult<TData, TExtensions>>;
completed?: ReadonlyArray<FormattedCompletedResult>;
extensions?: TExtensions;
Expand Down Expand Up @@ -141,6 +145,11 @@ export type FormattedIncrementalResult<
| FormattedIncrementalDeferResult<TData, TExtensions>
| FormattedIncrementalStreamResult<TData, TExtensions>;

export interface PendingResult {
path: ReadonlyArray<string | number>;
label?: string;
}

export interface CompletedResult {
path: ReadonlyArray<string | number>;
label?: string;
Expand Down Expand Up @@ -296,10 +305,20 @@ export class IncrementalPublisher {

const errors = initialResultRecord.errors;
const initialResult = errors.length === 0 ? { data } : { errors, data };
if (this._pending.size > 0) {
const pending = this._pending;
if (pending.size > 0) {
const pendingSources = new Set<DeferredFragmentRecord | StreamRecord>();
for (const subsequentResultRecord of pending) {
const pendingSource = isStreamItemsRecord(subsequentResultRecord)
? subsequentResultRecord.streamRecord
: subsequentResultRecord;
pendingSources.add(pendingSource);
}

return {
initialResult: {
...initialResult,
pending: this._pendingSourcesToResults(pendingSources),
hasNext: true,
},
subsequentResults: this._subscribe(),
Expand Down Expand Up @@ -347,6 +366,23 @@ export class IncrementalPublisher {
});
}

private _pendingSourcesToResults(
pendingSources: ReadonlySet<DeferredFragmentRecord | StreamRecord>,
): Array<PendingResult> {
const pendingResults: Array<PendingResult> = [];
for (const pendingSource of pendingSources) {
pendingSource.pendingSent = true;
const pendingResult: PendingResult = {
path: pendingSource.path,
};
if (pendingSource.label !== undefined) {
pendingResult.label = pendingSource.label;
}
pendingResults.push(pendingResult);
}
return pendingResults;
}

private _subscribe(): AsyncGenerator<
SubsequentIncrementalExecutionResult,
void,
Expand Down Expand Up @@ -461,14 +497,18 @@ export class IncrementalPublisher {
private _getIncrementalResult(
completedRecords: ReadonlySet<SubsequentResultRecord>,
): SubsequentIncrementalExecutionResult | undefined {
const { incremental, completed } = this._processPending(completedRecords);
const { pending, incremental, completed } =
this._processPending(completedRecords);

const hasNext = this._pending.size > 0;
if (incremental.length === 0 && completed.length === 0 && hasNext) {
return undefined;
}

const result: SubsequentIncrementalExecutionResult = { hasNext };
if (pending.length) {
result.pending = pending;
}
if (incremental.length) {
result.incremental = incremental;
}
Expand All @@ -482,17 +522,25 @@ export class IncrementalPublisher {
private _processPending(
completedRecords: ReadonlySet<SubsequentResultRecord>,
): IncrementalUpdate {
const newPendingSources = new Set<DeferredFragmentRecord | StreamRecord>();
const incrementalResults: Array<IncrementalResult> = [];
const completedResults: Array<CompletedResult> = [];
for (const subsequentResultRecord of completedRecords) {
for (const child of subsequentResultRecord.children) {
if (child.filtered) {
continue;
}
const pendingSource = isStreamItemsRecord(child)
? child.streamRecord
: child;
if (!pendingSource.pendingSent) {
newPendingSources.add(pendingSource);
}
this._publish(child);
}
if (isStreamItemsRecord(subsequentResultRecord)) {
if (subsequentResultRecord.isFinalRecord) {
newPendingSources.delete(subsequentResultRecord.streamRecord);
completedResults.push(
this._completedRecordToResult(subsequentResultRecord.streamRecord),
);
Expand All @@ -513,6 +561,7 @@ export class IncrementalPublisher {
}
incrementalResults.push(incrementalResult);
} else {
newPendingSources.delete(subsequentResultRecord);
completedResults.push(
this._completedRecordToResult(subsequentResultRecord),
);
Expand All @@ -537,6 +586,7 @@ export class IncrementalPublisher {
}

return {
pending: this._pendingSourcesToResults(newPendingSources),
incremental: incrementalResults,
completed: completedResults,
};
Expand Down Expand Up @@ -690,6 +740,7 @@ export class DeferredFragmentRecord {
deferredGroupedFieldSetRecords: Set<DeferredGroupedFieldSetRecord>;
errors: Array<GraphQLError>;
filtered: boolean;
pendingSent?: boolean;
_pending: Set<DeferredGroupedFieldSetRecord>;

constructor(opts: { path: Path | undefined; label: string | undefined }) {
Expand All @@ -709,6 +760,7 @@ export class StreamRecord {
path: ReadonlyArray<string | number>;
errors: Array<GraphQLError>;
earlyReturn?: (() => Promise<unknown>) | undefined;
pendingSent?: boolean;
constructor(opts: {
label: string | undefined;
path: Path;
Expand Down
Loading

0 comments on commit fe65bc8

Please sign in to comment.