From ecfc6157786360832d1afb4294d76f83a90a9d70 Mon Sep 17 00:00:00 2001 From: Valeri Karpov Date: Tue, 13 Feb 2024 13:04:59 -0500 Subject: [PATCH] fix(NODE-5901): propagate errors to transformed stream in cursor (#3985) Co-authored-by: Durran Jordan --- src/cursor/abstract_cursor.ts | 8 +++- .../node-specific/abstract_cursor.test.ts | 44 ++++++++++++++++++- 2 files changed, 50 insertions(+), 2 deletions(-) diff --git a/src/cursor/abstract_cursor.ts b/src/cursor/abstract_cursor.ts index d528d9b5d8..412361d0e8 100644 --- a/src/cursor/abstract_cursor.ts +++ b/src/cursor/abstract_cursor.ts @@ -337,7 +337,7 @@ export abstract class AbstractCursor< const transform = options.transform; const readable = new ReadableCursorStream(this); - return readable.pipe( + const transformedStream = readable.pipe( new Transform({ objectMode: true, highWaterMark: 1, @@ -351,6 +351,12 @@ export abstract class AbstractCursor< } }) ); + + // Bubble errors to transformed stream, because otherwise no way + // to handle this error. + readable.on('error', err => transformedStream.emit('error', err)); + + return transformedStream; } return new ReadableCursorStream(this); diff --git a/test/integration/node-specific/abstract_cursor.test.ts b/test/integration/node-specific/abstract_cursor.test.ts index eab72617f0..8a382724c6 100644 --- a/test/integration/node-specific/abstract_cursor.test.ts +++ b/test/integration/node-specific/abstract_cursor.test.ts @@ -1,9 +1,16 @@ import { expect } from 'chai'; import { once } from 'events'; import * as sinon from 'sinon'; +import { Transform } from 'stream'; import { inspect } from 'util'; -import { type Collection, type FindCursor, MongoAPIError, type MongoClient } from '../../mongodb'; +import { + type Collection, + type FindCursor, + MongoAPIError, + type MongoClient, + MongoServerError +} from '../../mongodb'; describe('class AbstractCursor', function () { describe('regression tests NODE-5372', function () { @@ -233,4 +240,39 @@ describe('class AbstractCursor', function () { }); }); }); + + describe('transform stream error handling', function () { + let client: MongoClient; + let collection: Collection; + const docs = [{ count: 0 }]; + beforeEach(async function () { + client = this.configuration.newClient(); + + collection = client.db('abstract_cursor_integration').collection('test'); + + await collection.insertMany(docs); + }); + + afterEach(async function () { + await collection.deleteMany({}); + await client.close(); + }); + + it('propagates errors to transform stream', async function () { + const transform = new Transform({ + transform(data, encoding, callback) { + callback(null, data); + } + }); + + // MongoServerError: unknown operator: $bar + const stream = collection.find({ foo: { $bar: 25 } }).stream({ transform }); + + const error: Error | null = await new Promise(resolve => { + stream.on('error', error => resolve(error)); + stream.on('end', () => resolve(null)); + }); + expect(error).to.be.instanceof(MongoServerError); + }); + }); });