Skip to content

Commit

Permalink
introduce completeValue helpers within stream execution
Browse files Browse the repository at this point in the history
Motivation:
 = code reuse
 = helpers are optimized to decrease the number of ticks.

This results in changes to order in promise resolution with changes to
the value of hasNext.
  • Loading branch information
yaacovCR committed Oct 12, 2022
1 parent 5e7b220 commit 91bc8c0
Show file tree
Hide file tree
Showing 2 changed files with 109 additions and 91 deletions.
5 changes: 0 additions & 5 deletions src/execution/__tests__/stream-test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -483,11 +483,6 @@ describe('Execute: stream directive', () => {
},
],
},
],
hasNext: true,
},
{
incremental: [
{
items: [{ name: 'Leia', id: '3' }],
path: ['friendList', 2],
Expand Down
195 changes: 109 additions & 86 deletions src/execution/execute.ts
Original file line number Diff line number Diff line change
Expand Up @@ -905,6 +905,83 @@ function completeValue(
);
}

async function completePromiseCatchingErrors(
exeContext: ExecutionContext,
returnType: GraphQLOutputType,
fieldNodes: ReadonlyArray<FieldNode>,
info: GraphQLResolveInfo,
path: Path,
result: Promise<unknown>,
asyncPayloadRecord?: AsyncPayloadRecord,
): Promise<unknown> {
try {
const resolved = await result;
let completed = completeValue(
exeContext,
returnType,
fieldNodes,
info,
path,
resolved,
asyncPayloadRecord,
);
if (isPromise(completed)) {
// see: https://github.com/tc39/proposal-faster-promise-adoption
// it is faster to await a promise prior to returning it from an async function
completed = await completed;
}
return completed;
} catch (rawError) {
const errors = asyncPayloadRecord?.errors ?? exeContext.errors;
const error = locatedError(rawError, fieldNodes, pathToArray(path));
const handledError = handleFieldError(error, returnType, errors);
filterSubsequentPayloads(exeContext, path, asyncPayloadRecord);
return handledError;
}
}

function completeValueCatchingErrors(
exeContext: ExecutionContext,
returnType: GraphQLOutputType,
fieldNodes: ReadonlyArray<FieldNode>,
info: GraphQLResolveInfo,
path: Path,
result: unknown,
asyncPayloadRecord?: AsyncPayloadRecord,
): PromiseOrValue<unknown> {
let completedValue: PromiseOrValue<unknown>;
try {
completedValue = completeValue(
exeContext,
returnType,
fieldNodes,
info,
path,
result,
asyncPayloadRecord,
);
} catch (rawError) {
const errors = asyncPayloadRecord?.errors ?? exeContext.errors;
const error = locatedError(rawError, fieldNodes, pathToArray(path));
const handledError = handleFieldError(error, returnType, errors);
filterSubsequentPayloads(exeContext, path, asyncPayloadRecord);
return handledError;
}

if (isPromise(completedValue)) {
// Note: we don't rely on a `catch` method, but we do expect "thenable"
// to take a second callback for the error case.
completedValue = completedValue.then(undefined, (rawError) => {
const errors = asyncPayloadRecord?.errors ?? exeContext.errors;
const error = locatedError(rawError, fieldNodes, pathToArray(path));
const handledError = handleFieldError(error, returnType, errors);
filterSubsequentPayloads(exeContext, path, asyncPayloadRecord);
return handledError;
});
}
return completedValue;
}

/**
* Returns an object containing the `@stream` arguments if a field should be
* streamed based on the experimental flag, stream directive present and
Expand Down Expand Up @@ -1867,69 +1944,17 @@ function executeStreamField(
parentContext,
exeContext,
});
let completedItem: PromiseOrValue<unknown>;
try {
try {
if (isPromise(item)) {
completedItem = item.then((resolved) =>
completeValue(
exeContext,
itemType,
fieldNodes,
info,
itemPath,
resolved,
asyncPayloadRecord,
),
);
} else {
completedItem = completeValue(
exeContext,
itemType,
fieldNodes,
info,
itemPath,
item,
asyncPayloadRecord,
);
}

if (isPromise(completedItem)) {
// Note: we don't rely on a `catch` method, but we do expect "thenable"
// to take a second callback for the error case.
completedItem = completedItem.then(undefined, (rawError) => {
const error = locatedError(
rawError,
fieldNodes,
pathToArray(itemPath),
);
const handledError = handleFieldError(
error,
itemType,
asyncPayloadRecord.errors,
);
filterSubsequentPayloads(exeContext, itemPath, asyncPayloadRecord);
return handledError;
});
}
} catch (rawError) {
const error = locatedError(rawError, fieldNodes, pathToArray(itemPath));
completedItem = handleFieldError(
error,
itemType,
asyncPayloadRecord.errors,
);
filterSubsequentPayloads(exeContext, itemPath, asyncPayloadRecord);
}
} catch (error) {
asyncPayloadRecord.errors.push(error);
filterSubsequentPayloads(exeContext, path, asyncPayloadRecord);
asyncPayloadRecord.addItems(null);
return asyncPayloadRecord;
}

let completedItems: PromiseOrValue<Array<unknown> | null>;
if (isPromise(completedItem)) {
if (isPromise(item)) {
const completedItem = completePromiseCatchingErrors(
exeContext,
itemType,
fieldNodes,
info,
itemPath,
item,
asyncPayloadRecord,
);
completedItems = completedItem.then(
(value) => [value],
(error) => {
Expand All @@ -1939,6 +1964,23 @@ function executeStreamField(
},
);
} else {
let completedItem;
try {
completedItem = completeValueCatchingErrors(
exeContext,
itemType,
fieldNodes,
info,
itemPath,
item,
asyncPayloadRecord,
);
} catch (error) {
asyncPayloadRecord.errors.push(error);
filterSubsequentPayloads(exeContext, path, asyncPayloadRecord);
asyncPayloadRecord.addItems(null);
return asyncPayloadRecord;
}
completedItems = [completedItem];
}

Expand Down Expand Up @@ -1969,37 +2011,18 @@ async function executeStreamIteratorItem(
// don't continue if iterator throws
return { done: true, value };
}
let completedItem;
try {
completedItem = completeValue(
return {
done: false,
value: completeValueCatchingErrors(
exeContext,
itemType,
fieldNodes,
info,
itemPath,
item,
asyncPayloadRecord,
);

if (isPromise(completedItem)) {
completedItem = completedItem.then(undefined, (rawError) => {
const error = locatedError(rawError, fieldNodes, pathToArray(itemPath));
const handledError = handleFieldError(
error,
itemType,
asyncPayloadRecord.errors,
);
filterSubsequentPayloads(exeContext, itemPath, asyncPayloadRecord);
return handledError;
});
}
return { done: false, value: completedItem };
} catch (rawError) {
const error = locatedError(rawError, fieldNodes, pathToArray(itemPath));
const value = handleFieldError(error, itemType, asyncPayloadRecord.errors);
filterSubsequentPayloads(exeContext, itemPath, asyncPayloadRecord);
return { done: false, value };
}
),
};
}

async function executeStreamIterator(
Expand Down

0 comments on commit 91bc8c0

Please sign in to comment.