diff --git a/src/change_stream.ts b/src/change_stream.ts index aa3e092022..34f92a4477 100644 --- a/src/change_stream.ts +++ b/src/change_stream.ts @@ -946,7 +946,7 @@ export class ChangeStream< // If the change stream has been closed explicitly, do not process error. if (this[kClosed]) return; - if (isResumableError(changeStreamError, this.cursor.maxWireVersion)) { + if (this.cursor.id != null && isResumableError(changeStreamError, this.cursor.maxWireVersion)) { this._endStream(); this.cursor.close().then(undefined, squashError); @@ -975,7 +975,10 @@ export class ChangeStream< throw new MongoAPIError(CHANGESTREAM_CLOSED_ERROR); } - if (!isResumableError(changeStreamError, this.cursor.maxWireVersion)) { + if ( + this.cursor.id == null || + !isResumableError(changeStreamError, this.cursor.maxWireVersion) + ) { try { await this.close(); } catch (error) { diff --git a/test/integration/change-streams/change_stream.test.ts b/test/integration/change-streams/change_stream.test.ts index 64297ad512..9e171f0ee6 100644 --- a/test/integration/change-streams/change_stream.test.ts +++ b/test/integration/change-streams/change_stream.test.ts @@ -2061,6 +2061,34 @@ describe('ChangeStream resumability', function () { } ); }); + + context('when the error occurs on the aggregate command', function () { + it( + 'does not resume', + { requires: { topology: '!single', mongodb: '>=4.2' } }, + async function () { + const resumableErrorCode = 7; // Host not found + await client.db('admin').command({ + configureFailPoint: 'failCommand', + mode: { times: 2 }, // Fail twice to account for retry attempt in executeOperation which is separate from the change stream's resume attempt + data: { + failCommands: ['aggregate'], + errorCode: resumableErrorCode + } + } as FailPoint); + + changeStream = collection.watch([]); + + await collection.insertOne({ name: 'bailey' }); + + const maybeError = await changeStream.next().catch(e => e); + + expect(maybeError).to.be.instanceof(MongoServerError); + expect(aggregateEvents).to.have.lengthOf(2); + expect(changeStream.closed).to.be.true; + } + ); + }); }); context('#hasNext', function () { @@ -2225,6 +2253,34 @@ describe('ChangeStream resumability', function () { } ); }); + + context('when the error occurs on the aggregate command', function () { + it( + 'does not resume', + { requires: { topology: '!single', mongodb: '>=4.2' } }, + async function () { + const resumableErrorCode = 7; // Host not found + await client.db('admin').command({ + configureFailPoint: 'failCommand', + mode: { times: 2 }, // Fail twice to account for retry attempt in executeOperation which is separate from the change stream's resume attempt + data: { + failCommands: ['aggregate'], + errorCode: resumableErrorCode + } + } as FailPoint); + + changeStream = collection.watch([]); + + await collection.insertOne({ name: 'bailey' }); + + const maybeError = await changeStream.hasNext().catch(e => e); + + expect(maybeError).to.be.instanceof(MongoServerError); + expect(aggregateEvents).to.have.lengthOf(2); + expect(changeStream.closed).to.be.true; + } + ); + }); }); context('#tryNext', function () { @@ -2401,6 +2457,34 @@ describe('ChangeStream resumability', function () { } ); }); + + context('when the error occurs on the aggregate command', function () { + it( + 'does not resume', + { requires: { topology: '!single', mongodb: '>=4.2' } }, + async function () { + const resumableErrorCode = 7; // Host not found + await client.db('admin').command({ + configureFailPoint: 'failCommand', + mode: { times: 2 }, // Fail twice to account for retry attempt in executeOperation which is separate from the change stream's resume attempt + data: { + failCommands: ['aggregate'], + errorCode: resumableErrorCode + } + } as FailPoint); + + changeStream = collection.watch([]); + + await collection.insertOne({ name: 'bailey' }); + + const maybeError = await changeStream.tryNext().catch(e => e); + + expect(maybeError).to.be.instanceof(MongoServerError); + expect(aggregateEvents).to.have.lengthOf(2); + expect(changeStream.closed).to.be.true; + } + ); + }); }); context('#asyncIterator', function () { @@ -2551,6 +2635,41 @@ describe('ChangeStream resumability', function () { } ); }); + + context('when the error occurs on the aggregate command', function () { + it( + 'does not resume', + { requires: { topology: '!single', mongodb: '>=4.2' } }, + async function () { + changeStream = collection.watch([]); + + const docs = [{ city: 'New York City' }, { city: 'Seattle' }, { city: 'Boston' }]; + await collection.insertMany(docs); + + const resumableErrorCode = 7; + await client.db('admin').command({ + configureFailPoint: 'failCommand', + mode: { times: 2 }, // Account for retry in executeOperation which is separate from change stream's resume + data: { + failCommands: ['aggregate'], + errorCode: resumableErrorCode + } + } as FailPoint); + + try { + // eslint-disable-next-line @typescript-eslint/no-unused-vars + for await (const change of changeStream) { + expect.fail('Change stream produced events on an unresumable error'); + } + expect.fail('Change stream did not iterate and did not throw an error'); + } catch (error) { + expect(error).to.be.instanceOf(MongoServerError); + expect(aggregateEvents).to.have.lengthOf(2); + expect(changeStream.closed).to.be.true; + } + } + ); + }); }); }); @@ -2721,6 +2840,35 @@ describe('ChangeStream resumability', function () { } ); }); + + context('when the error occurred on the aggregate', function () { + it( + 'does not resume', + { requires: { topology: '!single', mongodb: '>=4.2' } }, + async function () { + changeStream = collection.watch([]); + + const resumableErrorCode = 7; + await client.db('admin').command({ + configureFailPoint: 'failCommand', + mode: { times: 2 }, // account for retry attempt in executeOperation which is separate from change stream's retry + data: { + failCommands: ['aggregate'], + errorCode: resumableErrorCode + } + } as FailPoint); + + const willBeError = once(changeStream, 'change').catch(error => error); + await collection.insertOne({ name: 'bailey' }); + + const error = await willBeError; + + expect(error).to.be.instanceOf(MongoServerError); + expect(aggregateEvents).to.have.lengthOf(2); + expect(changeStream.closed).to.be.true; + } + ); + }); }); it( diff --git a/test/integration/change-streams/change_streams.prose.test.ts b/test/integration/change-streams/change_streams.prose.test.ts index 4303bb1eb6..60492b40d3 100644 --- a/test/integration/change-streams/change_streams.prose.test.ts +++ b/test/integration/change-streams/change_streams.prose.test.ts @@ -5,6 +5,7 @@ import { setTimeout } from 'timers'; import { type ChangeStream, + type Collection, type CommandFailedEvent, type CommandStartedEvent, type CommandSucceededEvent, @@ -12,6 +13,7 @@ import { isHello, LEGACY_HELLO_COMMAND, Long, + type MongoClient, MongoNetworkError, ObjectId, Timestamp @@ -840,8 +842,8 @@ describe('Change Stream prose tests', function () { // 15 - 16 removed by spec describe('Change Stream prose 17-18', function () { - let client; - let coll; + let client: MongoClient; + let coll: Collection; let startAfter; function recordEvent(events, e) { @@ -886,31 +888,36 @@ describe('Change Stream prose tests', function () { // when resuming a change stream. it('$changeStream without results must include startAfter and not resumeAfter', { metadata: { requires: { topology: 'replicaset', mongodb: '>=4.1.1' } }, - test: function (done) { + test: async function () { const events = []; client.on('commandStarted', e => recordEvent(events, e)); const changeStream = coll.watch([], { startAfter }); - this.defer(() => changeStream.close()); - changeStream.once('change', change => { - expect(change).to.containSubset({ - operationType: 'insert', - fullDocument: { x: 2 } - }); - - expect(events).to.be.an('array').with.lengthOf(3); - expect(events[0]).nested.property('$changeStream.startAfter').to.exist; - expect(events[1]).to.equal('error'); - expect(events[2]).nested.property('$changeStream.startAfter').to.exist; - done(); + changeStream.on('error', async e => { + await changeStream.close(e); }); - waitForStarted(changeStream, () => { - triggerResumableError(changeStream, () => { - events.push('error'); - coll.insertOne({ x: 2 }, { writeConcern: { w: 'majority', j: true } }); - }); + const changePromise = once(changeStream, 'change'); + await once(changeStream.cursor, 'init'); + + const stub = sinon.stub(changeStream.cursor, 'close'); + + stub.callsFake(async function () { + stub.wrappedMethod.call(this); + stub.restore(); + events.push('error'); + await coll.insertOne({ x: 2 }, { writeConcern: { w: 'majority', j: true } }); }); + + changeStream.cursorStream.emit('error', new MongoNetworkError('error triggered from test')); + + const [change] = await changePromise; + expect(change).to.containSubset({ operationType: 'insert', fullDocument: { x: 2 } }); + expect(events).to.be.an('array').with.lengthOf(3); + + expect(events[0]).nested.property('$changeStream.startAfter').to.exist; + expect(events[1]).to.equal('error'); + expect(events[2]).nested.property('$changeStream.startAfter').to.exist; } });