Skip to content

Commit

Permalink
fix(NODE-5901): propagate errors to transformed stream in cursor (#3985)
Browse files Browse the repository at this point in the history
Co-authored-by: Durran Jordan <durran@gmail.com>
  • Loading branch information
vkarpov15 and durran authored Feb 13, 2024
1 parent a63fbc2 commit ecfc615
Show file tree
Hide file tree
Showing 2 changed files with 50 additions and 2 deletions.
8 changes: 7 additions & 1 deletion src/cursor/abstract_cursor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -337,7 +337,7 @@ export abstract class AbstractCursor<
const transform = options.transform;
const readable = new ReadableCursorStream(this);

return readable.pipe(
const transformedStream = readable.pipe(
new Transform({
objectMode: true,
highWaterMark: 1,
Expand All @@ -351,6 +351,12 @@ export abstract class AbstractCursor<
}
})
);

// Bubble errors to transformed stream, because otherwise no way
// to handle this error.
readable.on('error', err => transformedStream.emit('error', err));

return transformedStream;
}

return new ReadableCursorStream(this);
Expand Down
44 changes: 43 additions & 1 deletion test/integration/node-specific/abstract_cursor.test.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,16 @@
import { expect } from 'chai';
import { once } from 'events';
import * as sinon from 'sinon';
import { Transform } from 'stream';
import { inspect } from 'util';

import { type Collection, type FindCursor, MongoAPIError, type MongoClient } from '../../mongodb';
import {
type Collection,
type FindCursor,
MongoAPIError,
type MongoClient,
MongoServerError
} from '../../mongodb';

describe('class AbstractCursor', function () {
describe('regression tests NODE-5372', function () {
Expand Down Expand Up @@ -233,4 +240,39 @@ describe('class AbstractCursor', function () {
});
});
});

describe('transform stream error handling', function () {
let client: MongoClient;
let collection: Collection;
const docs = [{ count: 0 }];
beforeEach(async function () {
client = this.configuration.newClient();

collection = client.db('abstract_cursor_integration').collection('test');

await collection.insertMany(docs);
});

afterEach(async function () {
await collection.deleteMany({});
await client.close();
});

it('propagates errors to transform stream', async function () {
const transform = new Transform({
transform(data, encoding, callback) {
callback(null, data);
}
});

// MongoServerError: unknown operator: $bar
const stream = collection.find({ foo: { $bar: 25 } }).stream({ transform });

const error: Error | null = await new Promise(resolve => {
stream.on('error', error => resolve(error));
stream.on('end', () => resolve(null));
});
expect(error).to.be.instanceof(MongoServerError);
});
});
});

0 comments on commit ecfc615

Please sign in to comment.