Skip to content

Commit

Permalink
return async iterables in the non incremental delivery case (#4144)
Browse files Browse the repository at this point in the history
catching errors
  • Loading branch information
yaacovCR authored Jul 11, 2024
1 parent 5cd5001 commit d9fc656
Show file tree
Hide file tree
Showing 2 changed files with 117 additions and 76 deletions.
30 changes: 29 additions & 1 deletion src/execution/__tests__/lists-test.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { expect } from 'chai';
import { assert, expect } from 'chai';
import { describe, it } from 'mocha';

import { expectJSON } from '../../__testUtils__/expectJSON.js';
Expand Down Expand Up @@ -245,6 +245,34 @@ describe('Execute: Accepts async iterables as list value', () => {
errors,
});
});

it('Returns async iterable when list nulls', async () => {
const values = [1, null, 2];
let i = 0;
let returned = false;
const listField = {
[Symbol.asyncIterator]: () => ({
next: () => Promise.resolve({ value: values[i++], done: false }),
return: () => {
returned = true;
return Promise.resolve({ value: undefined, done: true });
},
}),
};
const errors = [
{
message: 'Cannot return null for non-nullable field Query.listField.',
locations: [{ line: 1, column: 3 }],
path: ['listField', 1],
},
];

expectJSON(await complete({ listField }, '[Int!]')).toDeepEqual({
data: { listField: null },
errors,
});
assert(returned);
});
});

describe('Execute: Handles list nullability', () => {
Expand Down
163 changes: 88 additions & 75 deletions src/execution/execute.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1094,66 +1094,86 @@ async function completeAsyncIteratorValue(
];
let index = 0;
const streamUsage = getStreamUsage(exeContext, fieldGroup, path);
// eslint-disable-next-line no-constant-condition
while (true) {
if (streamUsage && index >= streamUsage.initialCount) {
const streamItemQueue = buildAsyncStreamItemQueue(
index,
path,
asyncIterator,
exeContext,
streamUsage.fieldGroup,
info,
itemType,
);

const returnFn = asyncIterator.return;
let streamRecord: StreamRecord | CancellableStreamRecord;
if (returnFn === undefined) {
streamRecord = {
label: streamUsage.label,
path,
streamItemQueue,
};
} else {
streamRecord = {
label: streamUsage.label,
const earlyReturn =
asyncIterator.return === undefined
? undefined
: asyncIterator.return.bind(asyncIterator);
try {
// eslint-disable-next-line no-constant-condition
while (true) {
if (streamUsage && index >= streamUsage.initialCount) {
const streamItemQueue = buildAsyncStreamItemQueue(
index,
path,
streamItemQueue,
earlyReturn: returnFn.bind(asyncIterator),
};
if (exeContext.cancellableStreams === undefined) {
exeContext.cancellableStreams = new Set();
asyncIterator,
exeContext,
streamUsage.fieldGroup,
info,
itemType,
);

let streamRecord: StreamRecord | CancellableStreamRecord;
if (earlyReturn === undefined) {
streamRecord = {
label: streamUsage.label,
path,
streamItemQueue,
};
} else {
streamRecord = {
label: streamUsage.label,
path,
earlyReturn,
streamItemQueue,
};
if (exeContext.cancellableStreams === undefined) {
exeContext.cancellableStreams = new Set();
}
exeContext.cancellableStreams.add(streamRecord);
}
exeContext.cancellableStreams.add(streamRecord);
}

addIncrementalDataRecords(graphqlWrappedResult, [streamRecord]);
break;
}
addIncrementalDataRecords(graphqlWrappedResult, [streamRecord]);
break;
}

const itemPath = addPath(path, index, undefined);
let iteration;
try {
// eslint-disable-next-line no-await-in-loop
iteration = await asyncIterator.next();
} catch (rawError) {
throw locatedError(rawError, toNodes(fieldGroup), pathToArray(path));
}
const itemPath = addPath(path, index, undefined);
let iteration;
try {
// eslint-disable-next-line no-await-in-loop
iteration = await asyncIterator.next();
} catch (rawError) {
throw locatedError(rawError, toNodes(fieldGroup), pathToArray(path));
}

// TODO: add test case for stream returning done before initialCount
/* c8 ignore next 3 */
if (iteration.done) {
break;
}
// TODO: add test case for stream returning done before initialCount
/* c8 ignore next 3 */
if (iteration.done) {
break;
}

const item = iteration.value;
// TODO: add tests for stream backed by asyncIterator that returns a promise
/* c8 ignore start */
if (isPromise(item)) {
completedResults.push(
completePromisedListItemValue(
const item = iteration.value;
// TODO: add tests for stream backed by asyncIterator that returns a promise
/* c8 ignore start */
if (isPromise(item)) {
completedResults.push(
completePromisedListItemValue(
item,
graphqlWrappedResult,
exeContext,
itemType,
fieldGroup,
info,
itemPath,
incrementalContext,
deferMap,
),
);
containsPromise = true;
} else if (
/* c8 ignore stop */
completeListItemValue(
item,
completedResults,
graphqlWrappedResult,
exeContext,
itemType,
Expand All @@ -1162,30 +1182,23 @@ async function completeAsyncIteratorValue(
itemPath,
incrementalContext,
deferMap,
),
);
containsPromise = true;
} else if (
)
// TODO: add tests for stream backed by asyncIterator that completes to a promise
/* c8 ignore start */
) {
containsPromise = true;
}
/* c8 ignore stop */
completeListItemValue(
item,
completedResults,
graphqlWrappedResult,
exeContext,
itemType,
fieldGroup,
info,
itemPath,
incrementalContext,
deferMap,
)
// TODO: add tests for stream backed by asyncIterator that completes to a promise
/* c8 ignore start */
) {
containsPromise = true;
index++;
}
/* c8 ignore stop */
index++;
} catch (error) {
if (earlyReturn !== undefined) {
earlyReturn().catch(() => {
/* c8 ignore next 1 */
// ignore error
});
}
throw error;
}

return containsPromise
Expand Down

0 comments on commit d9fc656

Please sign in to comment.