Skip to content

Commit

Permalink
fix(shell-api): run $out/$merge aggregations immediately MONGOSH-972 (#…
Browse files Browse the repository at this point in the history
…1108)

As suggested by the Node.js driver team, use `.hasNext()` to
force sending the aggregation command to the server early.
  • Loading branch information
addaleax authored Sep 14, 2021
1 parent f4e672b commit 9eef48a
Show file tree
Hide file tree
Showing 5 changed files with 27 additions and 3 deletions.
5 changes: 4 additions & 1 deletion packages/shell-api/src/collection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,8 @@ import {
maybeMarkAsExplainOutput,
markAsExplainOutput,
assertArgsDefinedType,
isValidCollectionName
isValidCollectionName,
shouldRunAggregationImmediately
} from './helpers';
import {
AnyBulkWriteOperation,
Expand Down Expand Up @@ -180,6 +181,8 @@ export default class Collection extends ShellApiWithMongoClass {

if (explain) {
return await cursor.explain(explain);
} else if (shouldRunAggregationImmediately(pipeline)) {
await cursor.hasNext();
}

this._mongo._instanceState.currentCursor = cursor;
Expand Down
2 changes: 1 addition & 1 deletion packages/shell-api/src/database.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2479,7 +2479,7 @@ describe('Database', () => {
const exceptions = {
getCollectionNames: { m: 'listCollections' },
getCollectionInfos: { m: 'listCollections' },
aggregate: { m: 'aggregateDb' },
aggregate: { m: 'aggregateDb', a: [[]] },
dropDatabase: { m: 'dropDatabase', i: 1 },
createCollection: { m: 'createCollection', a: ['coll'] },
createView: { m: 'createCollection', a: ['coll', 'source', []] },
Expand Down
5 changes: 4 additions & 1 deletion packages/shell-api/src/database.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@ import {
processDigestPassword,
tsToSeconds,
isValidCollectionName,
getConfigDB
getConfigDB,
shouldRunAggregationImmediately
} from './helpers';

import type {
Expand Down Expand Up @@ -336,6 +337,8 @@ export default class Database extends ShellApiWithMongoClass {

if (explain) {
return await cursor.explain(explain);
} else if (shouldRunAggregationImmediately(pipeline)) {
await cursor.hasNext();
}

this._mongo._instanceState.currentCursor = cursor;
Expand Down
6 changes: 6 additions & 0 deletions packages/shell-api/src/helpers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -716,3 +716,9 @@ export function isValidDatabaseName(name: string): boolean {
export function isValidCollectionName(name: string): boolean {
return !!name && !/[$\0]/.test(name);
}

export function shouldRunAggregationImmediately(pipeline: Document[]): boolean {
return pipeline.some(stage =>
Object.keys(stage).some(
stageName => stageName === '$merge' || stageName === '$out'));
}
12 changes: 12 additions & 0 deletions packages/shell-api/src/integration.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -916,6 +916,18 @@ describe('Shell API (integration)', function() {
expect(await cursor.toArray()).to.have.length(0);
});

it('runs the aggregation immediately if it is $merge/$out', async() => {
const x = 123456789;
await collection.insertOne({ x });

await collection.aggregate(
{ $match: {} },
{ $out: 'copy' }
); // ignore the result

expect((await database.getCollection('copy').findOne()).x).to.equal(x);
});

[true, false, 'queryPlanner'].forEach(explain => {
it(`runs an explain with explain: ${explain}`, async() => {
await serviceProvider.insertOne(dbName, collectionName, { x: 1 });
Expand Down

0 comments on commit 9eef48a

Please sign in to comment.