diff --git a/lib/cmap/wire_protocol/get_more.js b/lib/cmap/wire_protocol/get_more.js index 69ecedbedb..0f0e4fa8de 100644 --- a/lib/cmap/wire_protocol/get_more.js +++ b/lib/cmap/wire_protocol/get_more.js @@ -59,8 +59,13 @@ function getMore(server, ns, cursorState, batchSize, options, callback) { return; } + const cursorId = + cursorState.cursorId instanceof Long + ? cursorState.cursorId + : Long.fromNumber(cursorState.cursorId); + const getMoreCmd = { - getMore: cursorState.cursorId, + getMore: cursorId, collection: collectionNamespace(ns), batchSize: Math.abs(batchSize) }; diff --git a/test/examples/change_streams.js b/test/examples/change_streams.js index 783170e8c4..bd4c1d6158 100644 --- a/test/examples/change_streams.js +++ b/test/examples/change_streams.js @@ -127,6 +127,11 @@ describe('examples(change-stream):', function() { }); looper.run(); + let processChange; + const streamExampleFinished = new Promise(resolve => { + processChange = resolve; + }); + // Start Changestream Example 3 const collection = db.collection('inventory'); const changeStream = collection.watch(); @@ -138,7 +143,7 @@ describe('examples(change-stream):', function() { newChangeStream = collection.watch({ resumeAfter: resumeToken }); newChangeStream.on('change', next => { - // process next document + processChange(next); }); }); // End Changestream Example 3 @@ -155,6 +160,8 @@ describe('examples(change-stream):', function() { // End Changestream Example 3 Alternative await newChangeStreamIterator.close(); + + await streamExampleFinished; await newChangeStream.close(); await looper.stop(); diff --git a/test/functional/change_stream.test.js b/test/functional/change_stream.test.js index afbc368903..f5b8c91b77 100644 --- a/test/functional/change_stream.test.js +++ b/test/functional/change_stream.test.js @@ -1,7 +1,7 @@ 'use strict'; const assert = require('assert'); const { Transform } = require('stream'); -const { MongoError, MongoNetworkError } = require('../../lib/error'); +const { 
MongoNetworkError } = require('../../lib/error'); const { delay, setupDatabase, withClient, withCursor } = require('./shared'); const co = require('co'); const mock = require('mongodb-mock-server'); @@ -9,6 +9,8 @@ const chai = require('chai'); const expect = chai.expect; const sinon = require('sinon'); const { ObjectId, Timestamp, Long, ReadPreference } = require('../..'); +const fs = require('fs'); +const crypto = require('crypto'); chai.use(require('chai-subset')); @@ -21,6 +23,7 @@ function withChangeStream(dbName, collectionName, callback) { collectionName = dbName; dbName = undefined; } + dbName = dbName || 'changestream_integration_test'; collectionName = collectionName || 'test'; @@ -79,6 +82,7 @@ function waitForStarted(changeStream, callback) { const timeout = setTimeout(() => { throw new Error('Change stream never started'); }, 2000); + changeStream.cursor.once('init', () => { clearTimeout(timeout); callback(); @@ -145,7 +149,7 @@ function exhaust(changeStream, bag, callback) { } // Define the pipeline processing changes -var pipeline = [ +const pipeline = [ { $addFields: { addedField: 'This is a field added using $addFields' } }, { $project: { documentKey: false } }, { $addFields: { comment: 'The documentKey field has been projected out of this document.' 
} } @@ -173,25 +177,28 @@ describe('Change Streams', function() { }); afterEach(() => mock.cleanup()); - it('Should close the listeners after the cursor is closed', { + it('should close the listeners after the cursor is closed', { metadata: { requires: { topology: 'replicaset', mongodb: '>=3.6' } }, test: function(done) { let closed = false; - const close = _err => { - if (closed) { - return; - } + function close(err) { + if (closed) return; closed = true; - return client.close(() => done(_err)); - }; + done(err); + } + const configuration = this.configuration; const client = configuration.newClient(); client.connect((err, client) => { expect(err).to.not.exist; + this.defer(() => client.close()); + const coll = client.db('integration_tests').collection('listenertest'); const changeStream = coll.watch(); + this.defer(() => changeStream.close()); + changeStream.on('change', () => { const internalCursor = changeStream.cursor; expect(internalCursor.listenerCount('data')).to.equal(1); @@ -200,68 +207,110 @@ describe('Change Streams', function() { close(err); }); }); - waitForStarted(changeStream, () => coll.insertOne({ x: 1 })); + + waitForStarted(changeStream, () => this.defer(coll.insertOne({ x: 1 }))); changeStream.on('error', err => close(err)); }); } }); - it('Should create a Change Stream on a collection and emit `change` events', { + class EventCollector { + constructor(obj, events, options) { + this._events = []; + this._timeout = options ? 
options.timeout : 5000; + + events.forEach(eventName => { + this._events[eventName] = []; + obj.on(eventName, event => this._events[eventName].push(event)); + }); + } + + waitForEvent(eventName, count, callback) { + if (typeof count === 'function') { + callback = count; + count = 1; + } + + waitForEventImpl(this, Date.now(), eventName, count, callback); + } + + reset(eventName) { + if (eventName == null) { + Object.keys(this._events).forEach(eventName => { + this._events[eventName] = []; + }); + + return; + } + + if (this._events[eventName] == null) { + throw new TypeError(`invalid event name "${eventName}" specified for reset`); + } + + this._events[eventName] = []; + } + } + + function waitForEventImpl(collector, start, eventName, count, callback) { + const events = collector._events[eventName]; + if (events.length >= count) { + return callback(undefined, events); + } + + if (Date.now() - start >= collector._timeout) { + return callback(new Error(`timed out waiting for event "${eventName}"`)); + } + + setTimeout(() => waitForEventImpl(collector, start, eventName, count, callback), 10); + } + + it('should create a ChangeStream on a collection and emit `change` events', { metadata: { requires: { topology: 'replicaset', mongodb: '>=3.6' } }, test: function(done) { const configuration = this.configuration; const client = configuration.newClient(); - client.connect(function(err, client) { + client.connect((err, client) => { expect(err).to.not.exist; + this.defer(() => client.close()); + const collection = client.db('integration_tests').collection('docsDataEvent'); const changeStream = collection.watch(pipeline); + this.defer(() => changeStream.close()); - let count = 0; - - const cleanup = _err => { - changeStream.removeAllListeners('change'); - changeStream.close(err => client.close(cerr => done(_err || err || cerr))); - }; - - // Attach first event listener - changeStream.on('change', function(change) { - try { - if (count === 0) { - count += 1; - 
expect(change).to.containSubset({ - operationType: 'insert', - fullDocument: { d: 4 }, - ns: { - db: 'integration_tests', - coll: 'docsDataEvent' - }, - comment: 'The documentKey field has been projected out of this document.' - }); - expect(change).to.not.have.property('documentKey'); - return; - } - - expect(change).to.containSubset({ - operationType: 'update', - updateDescription: { - updatedFields: { d: 6 } - } - }); - cleanup(); - } catch (e) { - cleanup(e); - } - }); - + const collector = new EventCollector(changeStream, ['init', 'change']); waitForStarted(changeStream, () => { // Trigger the first database event - collection.insertOne({ d: 4 }, function(err) { - assert.ifError(err); + collection.insertOne({ d: 4 }, err => { + expect(err).to.not.exist; // Trigger the second database event - collection.updateOne({ d: 4 }, { $inc: { d: 2 } }, function(err) { - assert.ifError(err); + collection.updateOne({ d: 4 }, { $inc: { d: 2 } }, err => { + expect(err).to.not.exist; + + collector.waitForEvent('change', 2, (err, changes) => { + expect(err).to.not.exist; + expect(changes).to.have.length(2); + expect(changes[0]).to.not.have.property('documentKey'); + expect(changes[0]).to.containSubset({ + operationType: 'insert', + fullDocument: { d: 4 }, + ns: { + db: 'integration_tests', + coll: 'docsDataEvent' + }, + comment: 'The documentKey field has been projected out of this document.' 
+ }); + + expect(changes[1]).to.containSubset({ + operationType: 'update', + updateDescription: { + updatedFields: { d: 6 } + } + }); + + done(); + }); }); }); }); @@ -270,26 +319,29 @@ describe('Change Streams', function() { }); it( - 'Should create a Change Stream on a collection and get change events through imperative callback form', + 'should create a ChangeStream on a collection and get change events through imperative callback form', { metadata: { requires: { topology: 'replicaset', mongodb: '>=3.6' } }, test: function(done) { - var configuration = this.configuration; + const configuration = this.configuration; const client = configuration.newClient(); - client.connect(function(err, client) { - assert.ifError(err); + client.connect((err, client) => { + expect(err).to.not.exist; + this.defer(() => client.close()); - var collection = client.db('integration_tests').collection('docsCallback'); - var changeStream = collection.watch(pipeline); + const collection = client.db('integration_tests').collection('docsCallback'); + const changeStream = collection.watch(pipeline); + this.defer(() => changeStream.close()); // Fetch the change notification - changeStream.hasNext(function(err, hasNext) { - assert.ifError(err); + changeStream.hasNext((err, hasNext) => { + expect(err).to.not.exist; + assert.equal(true, hasNext); - changeStream.next(function(err, change) { - assert.ifError(err); + changeStream.next((err, change) => { + expect(err).to.not.exist; assert.equal(change.operationType, 'insert'); assert.equal(change.fullDocument.e, 5); assert.equal(change.ns.db, 'integration_tests'); @@ -301,16 +353,16 @@ describe('Change Streams', function() { ); // Trigger the second database event - collection.update({ e: 5 }, { $inc: { e: 2 } }, function(err) { - assert.ifError(err); - changeStream.hasNext(function(err, hasNext) { - assert.ifError(err); + collection.updateOne({ e: 5 }, { $inc: { e: 2 } }, err => { + expect(err).to.not.exist; + changeStream.hasNext((err, hasNext) => 
{ + expect(err).to.not.exist; assert.equal(true, hasNext); - changeStream.next(function(err, change) { - assert.ifError(err); + changeStream.next((err, change) => { + expect(err).to.not.exist; assert.equal(change.operationType, 'update'); - // Close the change stream - changeStream.close(err => client.close(cerr => done(err || cerr))); + + done(); }); }); }); @@ -320,60 +372,49 @@ describe('Change Streams', function() { // Trigger the first database event // NOTE: this needs to be triggered after the changeStream call so // that the cursor is run - collection.insert({ e: 5 }, function(err, result) { - assert.ifError(err); - assert.equal(result.insertedCount, 1); - }); + this.defer(collection.insertOne({ e: 5 })); }); } } ); - it('Should support creating multiple simultaneous Change Streams', { + it('should support creating multiple simultaneous ChangeStreams', { metadata: { requires: { topology: 'replicaset', mongodb: '>=3.6' } }, test: function(done) { - var configuration = this.configuration; + const configuration = this.configuration; const client = configuration.newClient(); - client.connect(function(err, client) { - assert.ifError(err); + client.connect((err, client) => { + expect(err).to.not.exist; + this.defer(() => client.close()); - var theDatabase = client.db('integration_tests'); - var theCollection1 = theDatabase.collection('simultaneous1'); - var theCollection2 = theDatabase.collection('simultaneous2'); + const database = client.db('integration_tests'); + const collection1 = database.collection('simultaneous1'); + const collection2 = database.collection('simultaneous2'); - var thisChangeStream1, thisChangeStream2, thisChangeStream3; + const changeStream1 = collection1.watch([{ $addFields: { changeStreamNumber: 1 } }]); + this.defer(() => changeStream1.close()); + const changeStream2 = collection2.watch([{ $addFields: { changeStreamNumber: 2 } }]); + this.defer(() => changeStream2.close()); + const changeStream3 = collection2.watch([{ $addFields: { 
changeStreamNumber: 3 } }]); + this.defer(() => changeStream3.close()); setTimeout(() => { - theCollection1.insert({ a: 1 }).then(function() { - return theCollection2.insert({ a: 1 }); - }); - }); + this.defer(collection1.insert({ a: 1 }).then(() => collection2.insert({ a: 1 }))); + }, 50); Promise.resolve() - .then(function() { - thisChangeStream1 = theCollection1.watch([{ $addFields: { changeStreamNumber: 1 } }]); - thisChangeStream2 = theCollection2.watch([{ $addFields: { changeStreamNumber: 2 } }]); - thisChangeStream3 = theCollection2.watch([{ $addFields: { changeStreamNumber: 3 } }]); - - return Promise.all([ - thisChangeStream1.hasNext(), - thisChangeStream2.hasNext(), - thisChangeStream3.hasNext() - ]); - }) + .then(() => + Promise.all([changeStream1.hasNext(), changeStream2.hasNext(), changeStream3.hasNext()]) + ) .then(function(hasNexts) { // Check all the Change Streams have a next item assert.ok(hasNexts[0]); assert.ok(hasNexts[1]); assert.ok(hasNexts[2]); - return Promise.all([ - thisChangeStream1.next(), - thisChangeStream2.next(), - thisChangeStream3.next() - ]); + return Promise.all([changeStream1.next(), changeStream2.next(), changeStream3.next()]); }) .then(function(changes) { // Check the values of the change documents are correct @@ -396,285 +437,270 @@ describe('Change Streams', function() { assert.equal(changes[0].changeStreamNumber, 1); assert.equal(changes[1].changeStreamNumber, 2); assert.equal(changes[2].changeStreamNumber, 3); - - return Promise.all([ - thisChangeStream1.close(), - thisChangeStream2.close(), - thisChangeStream3.close() - ]); - }) - .then(() => client.close()) - .then(function() { - done(); }) - .catch(function(err) { - assert.ifError(err); - }); + .then( + () => done(), + err => done(err) + ); }); } }); - it('Should properly close Change Stream cursor', { + it('should properly close ChangeStream cursor', { metadata: { requires: { topology: 'replicaset', mongodb: '>=3.6' } }, test: function(done) { - var configuration = 
this.configuration; + const configuration = this.configuration; const client = configuration.newClient(); - client.connect(function(err, client) { - assert.ifError(err); - var theDatabase = client.db('integration_tests'); + client.connect((err, client) => { + expect(err).to.not.exist; + this.defer(() => client.close()); - var thisChangeStream = theDatabase.collection('changeStreamCloseTest').watch(pipeline); + const database = client.db('integration_tests'); + const changeStream = database.collection('changeStreamCloseTest').watch(pipeline); + this.defer(() => changeStream.close()); - assert.equal(thisChangeStream.isClosed(), false); - assert.equal(thisChangeStream.cursor.isClosed(), false); + assert.equal(changeStream.isClosed(), false); + assert.equal(changeStream.cursor.isClosed(), false); - thisChangeStream.close(function(err) { - assert.ifError(err); + changeStream.close(err => { + expect(err).to.not.exist; // Check the cursor is closed - assert.equal(thisChangeStream.isClosed(), true); - assert.ok(!thisChangeStream.cursor); - client.close(() => done()); + assert.equal(changeStream.isClosed(), true); + assert.ok(!changeStream.cursor); + done(); }); }); } }); it( - 'Should error when attempting to create a Change Stream with a forbidden aggregation pipeline stage', + 'should error when attempting to create a ChangeStream with a forbidden aggregation pipeline stage', { metadata: { requires: { topology: 'replicaset', mongodb: '>=3.6' } }, test: function(done) { - var configuration = this.configuration; + const configuration = this.configuration; const client = configuration.newClient(); - client.connect(function(err, client) { - assert.ifError(err); + client.connect((err, client) => { + expect(err).to.not.exist; + this.defer(() => client.close()); const forbiddenStage = {}; const forbiddenStageName = '$alksdjfhlaskdfjh'; forbiddenStage[forbiddenStageName] = 2; - var theDatabase = client.db('integration_tests'); - var changeStream = 
theDatabase.collection('forbiddenStageTest').watch([forbiddenStage]); + const database = client.db('integration_tests'); + const changeStream = database.collection('forbiddenStageTest').watch([forbiddenStage]); + this.defer(() => changeStream.close()); - changeStream.next(function(err) { + changeStream.next(err => { assert.ok(err); assert.ok(err.message); assert.ok( err.message.indexOf(`Unrecognized pipeline stage name: '${forbiddenStageName}'`) > -1 ); - changeStream.close(err => client.close(cerr => done(err || cerr))); + + done(); }); }); } } ); - it('Should cache the change stream resume token using imperative callback form', { + it('should cache the change stream resume token using imperative callback form', { metadata: { requires: { topology: 'replicaset', mongodb: '>=3.6' } }, test: function(done) { - var configuration = this.configuration; + const configuration = this.configuration; const client = configuration.newClient(); - client.connect(function(err, client) { - assert.ifError(err); + client.connect((err, client) => { + expect(err).to.not.exist; + this.defer(() => client.close()); - var theDatabase = client.db('integration_tests'); - var thisChangeStream = theDatabase.collection('cacheResumeTokenCallback').watch(pipeline); + const database = client.db('integration_tests'); + const changeStream = database.collection('cacheResumeTokenCallback').watch(pipeline); + this.defer(() => changeStream.close()); // Trigger the first database event - waitForStarted(thisChangeStream, () => { - theDatabase - .collection('cacheResumeTokenCallback') - .insert({ b: 2 }, function(err, result) { - assert.ifError(err); - assert.equal(result.insertedCount, 1); - }); + waitForStarted(changeStream, () => { + this.defer(database.collection('cacheResumeTokenCallback').insert({ b: 2 })); }); + // Fetch the change notification - thisChangeStream.hasNext(function(err, hasNext) { - assert.ifError(err); + changeStream.hasNext(function(err, hasNext) { + expect(err).to.not.exist; 
assert.equal(true, hasNext); - thisChangeStream.next(function(err, change) { - assert.ifError(err); - assert.deepEqual(thisChangeStream.resumeToken, change._id); - - // Close the change stream - thisChangeStream.close(err => client.close(cerr => done(err || cerr))); + changeStream.next(function(err, change) { + expect(err).to.not.exist; + assert.deepEqual(changeStream.resumeToken, change._id); + done(); }); }); }); } }); - it('Should cache the change stream resume token using promises', { + it('should cache the change stream resume token using promises', { metadata: { requires: { topology: 'replicaset', mongodb: '>=3.6' } }, - test: function() { - var configuration = this.configuration; + const configuration = this.configuration; const client = configuration.newClient(); - return client.connect().then(function() { - var theDatabase = client.db('integration_tests'); - var thisChangeStream = theDatabase.collection('cacheResumeTokenPromise').watch(pipeline); + return client.connect().then(() => { + this.defer(() => client.close()); - waitForStarted(thisChangeStream, () => { - // Trigger the first database event - theDatabase.collection('cacheResumeTokenPromise').insert({ b: 2 }, function(err, result) { - assert.ifError(err); - assert.equal(result.insertedCount, 1); - // Fetch the change notification - }); + const database = client.db('integration_tests'); + const changeStream = database.collection('cacheResumeTokenPromise').watch(pipeline); + this.defer(() => changeStream.close()); + + // trigger the first database event + waitForStarted(changeStream, () => { + this.defer(database.collection('cacheResumeTokenPromise').insert({ b: 2 })); }); - return thisChangeStream + return changeStream .hasNext() - .then(function(hasNext) { + .then(hasNext => { assert.equal(true, hasNext); - return thisChangeStream.next(); + return changeStream.next(); }) - .then(function(change) { - assert.deepEqual(thisChangeStream.resumeToken, change._id); - - // Close the change stream - return 
thisChangeStream.close().then(() => client.close()); + .then(change => { + assert.deepEqual(changeStream.resumeToken, change._id); }); }); } }); - it('Should cache the change stream resume token using event listeners', { + it('should cache the change stream resume token using event listeners', { metadata: { requires: { topology: 'replicaset', mongodb: '>=3.6' } }, test: function(done) { - var configuration = this.configuration; + const configuration = this.configuration; const client = configuration.newClient(); - client.connect(function(err, client) { - assert.ifError(err); + client.connect((err, client) => { + expect(err).to.not.exist; + this.defer(() => client.close()); - var theDatabase = client.db('integration_tests'); + const db = client.db('integration_tests'); + const changeStream = db.collection('cacheResumeTokenListener').watch(pipeline); + this.defer(() => changeStream.close()); - var thisChangeStream = theDatabase.collection('cacheResumeTokenListener').watch(pipeline); + const collector = new EventCollector(changeStream, ['change']); + waitForStarted(changeStream, () => { + // Trigger the first database event + db.collection('cacheResumeTokenListener').insert({ b: 2 }, (err, result) => { + expect(err).to.not.exist; + expect(result) + .property('insertedCount') + .to.equal(1); - thisChangeStream.once('change', function(change) { - assert.deepEqual(thisChangeStream.resumeToken, change._id); - // Close the change stream - thisChangeStream.close().then(() => client.close(done)); - }); + collector.waitForEvent('change', (err, events) => { + expect(err).to.not.exist; + expect(changeStream) + .property('resumeToken') + .to.eql(events[0]._id); - waitForStarted(thisChangeStream, () => { - // Trigger the first database event - theDatabase - .collection('cacheResumeTokenListener') - .insert({ b: 2 }, function(err, result) { - assert.ifError(err); - assert.equal(result.insertedCount, 1); + done(); }); + }); }); }); } }); it( - 'Should error if resume token 
projected out of change stream document using imperative callback form', + 'should error if resume token projected out of change stream document using imperative callback form', { metadata: { requires: { topology: 'replicaset', mongodb: '>=3.6' } }, test: function(done) { - var configuration = this.configuration; + const configuration = this.configuration; const client = configuration.newClient(); - client.connect(function(err, client) { - assert.ifError(err); + client.connect((err, client) => { + expect(err).to.not.exist; + this.defer(() => client.close()); - var theDatabase = client.db('integration_tests'); - var thisChangeStream = theDatabase + const database = client.db('integration_tests'); + const changeStream = database .collection('resumetokenProjectedOutCallback') .watch([{ $project: { _id: false } }]); + this.defer(() => changeStream.close()); // Trigger the first database event - waitForStarted(thisChangeStream, () => { - theDatabase - .collection('resumetokenProjectedOutCallback') - .insert({ b: 2 }, function(err, result) { - expect(err).to.not.exist; - expect(result.insertedCount).to.equal(1); - }); + waitForStarted(changeStream, () => { + this.defer(database.collection('resumetokenProjectedOutCallback').insert({ b: 2 })); }); // Fetch the change notification - thisChangeStream.next(function(err) { + changeStream.next(err => { expect(err).to.exist; - - // Close the change stream - thisChangeStream.close(() => client.close(done)); + done(); }); }); } } ); - it('Should error if resume token projected out of change stream document using event listeners', { + it('should error if resume token projected out of change stream document using event listeners', { metadata: { requires: { topology: 'replicaset', mongodb: '>=3.6' } }, test: function(done) { - var configuration = this.configuration; + const configuration = this.configuration; const client = configuration.newClient(); - client.connect(function(err, client) { - assert.ifError(err); - - var theDatabase 
= client.db('integration_tests'); - var thisChangeStream = theDatabase - .collection('resumetokenProjectedOutListener') - .watch([{ $project: { _id: false } }]); + client.connect((err, client) => { + expect(err).to.not.exist; + this.defer(() => client.close()); - // Fetch the change notification - thisChangeStream.on('change', function() { - assert.ok(false); - }); + const db = client.db('integration_tests'); + const collection = db.collection('resumetokenProjectedOutListener'); + const changeStream = collection.watch([{ $project: { _id: false } }]); + this.defer(() => changeStream.close()); - thisChangeStream.on('error', function(err) { - expect(err).to.exist; - thisChangeStream.close(() => client.close(done)); - }); + const collector = new EventCollector(changeStream, ['change', 'error']); + waitForStarted(changeStream, () => { + collection.insert({ b: 2 }, (err, result) => { + expect(err).to.not.exist; + expect(result) + .property('insertedCount') + .to.equal(1); - // Trigger the first database event - waitForStarted(thisChangeStream, () => { - theDatabase - .collection('resumetokenProjectedOutListener') - .insert({ b: 2 }, function(err, result) { - assert.ifError(err); - assert.equal(result.insertedCount, 1); + collector.waitForEvent('error', (err, events) => { + expect(err).to.not.exist; + expect(events).to.have.lengthOf.at.least(1); + done(); }); + }); }); }); } }); - it('Should invalidate change stream on collection rename using event listeners', { + it('should invalidate change stream on collection rename using event listeners', { metadata: { requires: { topology: 'replicaset', mongodb: '>=3.6' } }, - test: function(done) { - var configuration = this.configuration; + const configuration = this.configuration; const client = configuration.newClient(); - client.connect(function(err, client) { - assert.ifError(err); + client.connect((err, client) => { + expect(err).to.not.exist; + this.defer(() => client.close()); - var database = 
client.db('integration_tests'); - var changeStream = database + const database = client.db('integration_tests'); + const changeStream = database .collection('invalidateListeners') .watch(pipeline, { batchSize: 1 }); + this.defer(() => changeStream.close()); // Attach first event listener - changeStream.once('change', function(change) { + changeStream.once('change', change => { assert.equal(change.operationType, 'insert'); assert.equal(change.fullDocument.a, 1); assert.equal(change.ns.db, 'integration_tests'); @@ -686,71 +712,68 @@ describe('Change Streams', function() { ); // Attach second event listener - changeStream.on('change', function(change) { + changeStream.on('change', change => { if (change.operationType === 'invalidate') { // now expect the server to close the stream - changeStream.once('close', () => client.close(done)); + changeStream.once('close', () => done()); } }); // Trigger the second database event setTimeout(() => { - database - .collection('invalidateListeners') - .rename('renamedDocs', { dropTarget: true }, function(err) { - assert.ifError(err); - }); + this.defer( + database.collection('invalidateListeners').rename('renamedDocs', { dropTarget: true }) + ); }, 250); }); // Trigger the first database event waitForStarted(changeStream, () => { - database.collection('invalidateListeners').insert({ a: 1 }, function(err) { - assert.ifError(err); - }); + this.defer(database.collection('invalidateListeners').insert({ a: 1 })); }); }); } }); - it('Should invalidate change stream on database drop using imperative callback form', { + it('should invalidate change stream on database drop using imperative callback form', { metadata: { requires: { topology: 'replicaset', mongodb: '>=3.6' } }, test: function(done) { - var configuration = this.configuration; + const configuration = this.configuration; const client = configuration.newClient(); - client.connect(function(err, client) { - assert.ifError(err); + client.connect((err, client) => { + 
expect(err).to.not.exist; + this.defer(() => client.close()); - var database = client.db('integration_tests'); - var changeStream = database.collection('invalidateCallback').watch(pipeline); + const database = client.db('integration_tests'); + const changeStream = database.collection('invalidateCallback').watch(pipeline); + this.defer(() => changeStream.close()); // Trigger the first database event waitForStarted(changeStream, () => { - database.collection('invalidateCallback').insert({ a: 1 }, function(err) { - assert.ifError(err); - }); + this.defer(database.collection('invalidateCallback').insert({ a: 1 })); }); - return changeStream.next(function(err, change) { - assert.ifError(err); + + changeStream.next((err, change) => { + expect(err).to.not.exist; assert.equal(change.operationType, 'insert'); - database.dropDatabase(function(err) { - assert.ifError(err); + database.dropDatabase(err => { + expect(err).to.not.exist; function completeStream() { changeStream.hasNext(function(err, hasNext) { expect(err).to.not.exist; assert.equal(hasNext, false); assert.equal(changeStream.isClosed(), true); - client.close(done); + done(); }); } function checkInvalidate() { changeStream.next(function(err, change) { - assert.ifError(err); + expect(err).to.not.exist; // Check the cursor invalidation has occured if (change.operationType === 'invalidate') { @@ -768,11 +791,11 @@ describe('Change Streams', function() { } }); - it('Should invalidate change stream on collection drop using promises', { + it('should invalidate change stream on collection drop using promises', { metadata: { requires: { topology: 'replicaset', mongodb: '>=3.6' } }, test: function(done) { - var configuration = this.configuration; + const configuration = this.configuration; const client = configuration.newClient(); function checkInvalidate(changeStream) { @@ -785,19 +808,19 @@ describe('Change Streams', function() { }); } - client.connect(function(err, client) { - assert.ifError(err); - var database = 
client.db('integration_tests'); - var changeStream = database.collection('invalidateCollectionDropPromises').watch(pipeline); + client.connect((err, client) => { + expect(err).to.not.exist; + this.defer(() => client.close()); + + const database = client.db('integration_tests'); + const changeStream = database + .collection('invalidateCollectionDropPromises') + .watch(pipeline); + this.defer(() => changeStream.close()); // Trigger the first database event waitForStarted(changeStream, () => { - return database - .collection('invalidateCollectionDropPromises') - .insert({ a: 1 }) - .then(function() { - return delay(200); - }); + this.defer(database.collection('invalidateCollectionDropPromises').insert({ a: 1 })); }); return changeStream @@ -811,16 +834,13 @@ describe('Change Streams', function() { .then(function(hasNext) { assert.equal(hasNext, false); assert.equal(changeStream.isClosed(), true); - client.close(done); - }) - .catch(function(err) { - assert.ifError(err); + done(); }); }); } }); - it('Should return MongoNetworkError after first retry attempt fails using promises', { + it.skip('should return MongoNetworkError after first retry attempt fails using promises', { metadata: { requires: { generators: true, @@ -830,13 +850,13 @@ describe('Change Streams', function() { }, test: function(done) { - var configuration = this.configuration; + const configuration = this.configuration; // Contain mock server - var primaryServer = null; + let primaryServer = null; // Default message fields - var defaultFields = { + const defaultFields = { setName: 'rs', setVersion: 1, electionId: new ObjectId(0), @@ -854,7 +874,7 @@ describe('Change Streams', function() { primaryServer = yield mock.createServer(32000, 'localhost'); primaryServer.setMessageHandler(request => { - var doc = request.document; + const doc = request.document; if (doc.ismaster) { request.reply( @@ -879,22 +899,22 @@ describe('Change Streams', function() { const mockServerURL = 'mongodb://localhost:32000/'; 
const client = configuration.newClient(mockServerURL); - client.connect(function(err, client) { - assert.ifError(err); + client.connect((err, client) => { + expect(err).to.not.exist; - var database = client.db('integration_tests'); - var collection = database.collection('MongoNetworkErrorTestPromises'); - var changeStream = collection.watch(pipeline); + const database = client.db('integration_tests'); + const collection = database.collection('MongoNetworkErrorTestPromises'); + const changeStream = collection.watch(pipeline); return changeStream .next() .then(function() { - // We should never execute this line because calling thisChangeStream.next() should throw an error + // We should never execute this line because calling changeStream.next() should throw an error throw new Error( 'ChangeStream.next() returned a change document but it should have returned a MongoNetworkError' ); }) - .catch(function(err) { + .catch(err => { assert.ok( err instanceof MongoNetworkError, 'error was not instance of MongoNetworkError' @@ -902,8 +922,8 @@ describe('Change Streams', function() { assert.ok(err.message); assert.ok(err.message.indexOf('closed') > -1); - changeStream.close(function(err) { - assert.ifError(err); + changeStream.close(err => { + expect(err).to.not.exist; changeStream.close(); // running = false; @@ -917,7 +937,7 @@ describe('Change Streams', function() { } }); - it('Should return MongoNetworkError after first retry attempt fails using callbacks', { + it.skip('should return MongoNetworkError after first retry attempt fails using callbacks', { metadata: { requires: { generators: true, @@ -926,13 +946,13 @@ describe('Change Streams', function() { } }, test: function(done) { - var configuration = this.configuration; + const configuration = this.configuration; // Contain mock server - var primaryServer = null; + let primaryServer = null; // Default message fields - var defaultFields = { + const defaultFields = { setName: 'rs', setVersion: 1, electionId: new 
ObjectId(0), @@ -947,13 +967,13 @@ describe('Change Streams', function() { }; // Die - var die = false; + let die = false; co(function*() { primaryServer = yield mock.createServer(32000, 'localhost'); primaryServer.setMessageHandler(request => { - var doc = request.document; + const doc = request.document; if (die) { request.connection.destroy(); @@ -982,14 +1002,14 @@ describe('Change Streams', function() { validateOptions: true }); - client.connect(function(err, client) { - assert.ifError(err); + client.connect((err, client) => { + expect(err).to.not.exist; - var theDatabase = client.db('integration_tests'); - var theCollection = theDatabase.collection('MongoNetworkErrorTestPromises'); - var thisChangeStream = theCollection.watch(pipeline); + const database = client.db('integration_tests'); + const collection = database.collection('MongoNetworkErrorTestPromises'); + const changeStream = collection.watch(pipeline); - thisChangeStream.next(function(err, change) { + changeStream.next(function(err, change) { assert.ok(err instanceof MongoNetworkError); assert.ok(err.message); assert.ok(err.message.indexOf('timed out') > -1); @@ -1000,9 +1020,9 @@ describe('Change Streams', function() { 'ChangeStream.next() returned a change document but it should have returned a MongoNetworkError' ); - thisChangeStream.close(function(err) { - assert.ifError(err); - thisChangeStream.close(); + changeStream.close(err => { + expect(err).to.not.exist; + changeStream.close(); client.close(() => mock.cleanup(() => done())); }); @@ -1011,7 +1031,7 @@ describe('Change Streams', function() { } }); - it('Should resume Change Stream when a resumable error is encountered', { + it.skip('should resume Change Stream when a resumable error is encountered', { metadata: { requires: { generators: true, @@ -1020,13 +1040,13 @@ describe('Change Streams', function() { } }, test: function(done) { - var configuration = this.configuration; + const configuration = this.configuration; // Contain mock server - 
var primaryServer = null; + let primaryServer = null; // Default message fields - var defaultFields = { + const defaultFields = { setName: 'rs', setVersion: 1, electionId: new ObjectId(0), @@ -1041,15 +1061,15 @@ describe('Change Streams', function() { }; // Die - var callsToGetMore = 0; + let callsToGetMore = 0; // Boot the mock co(function*() { primaryServer = yield mock.createServer(32000, 'localhost'); - var counter = 0; + let counter = 0; primaryServer.setMessageHandler(request => { - var doc = request.document; + const doc = request.document; // Create a server that responds to the initial aggregation to connect to the server, but not to subsequent getMore requests if (doc.ismaster) { @@ -1108,9 +1128,9 @@ describe('Change Streams', function() { client .connect() .then(client => { - var database = client.db('integration_tests'); - var collection = database.collection('MongoNetworkErrorTestPromises'); - var changeStream = collection.watch(pipeline); + const database = client.db('integration_tests'); + const collection = database.collection('MongoNetworkErrorTestPromises'); + const changeStream = collection.watch(pipeline); return changeStream .next() @@ -1147,48 +1167,44 @@ describe('Change Streams', function() { } }); - it('Should resume from point in time using user-provided resumeAfter', { + it('should resume from point in time using user-provided resumeAfter', { metadata: { requires: { topology: 'replicaset', mongodb: '>=3.6' } }, test: function() { - var configuration = this.configuration; + const configuration = this.configuration; const client = configuration.newClient(); return client.connect().then(client => { - var database = client.db('integration_tests'); - var collection = database.collection('resumeAfterTest2'); + this.defer(() => client.close()); - var firstChangeStream, secondChangeStream; + const database = client.db('integration_tests'); + const collection = database.collection('resumeAfterTest2'); - var resumeToken; - var docs = [{ a: 0 }, { 
a: 1 }, { a: 2 }]; + let firstChangeStream, secondChangeStream; - // Trigger the first database event + let resumeToken; + const docs = [{ a: 0 }, { a: 1 }, { a: 2 }]; firstChangeStream = collection.watch(pipeline); + this.defer(() => firstChangeStream.close()); + + // Trigger the first database event waitForStarted(firstChangeStream, () => { - return collection - .insert(docs[0]) - .then(function(result) { - assert.equal(result.insertedCount, 1); - return collection.insert(docs[1]); - }) - .then(function(result) { - assert.equal(result.insertedCount, 1); - return collection.insert(docs[2]); - }) - .then(function(result) { - assert.equal(result.insertedCount, 1); - return delay(200); - }); + this.defer( + collection + .insert(docs[0]) + .then(() => collection.insertOne(docs[1])) + .then(() => collection.insertOne(docs[2])) + ); }); + return firstChangeStream .hasNext() - .then(function(hasNext) { + .then(hasNext => { assert.equal(true, hasNext); return firstChangeStream.next(); }) - .then(function(change) { + .then(change => { assert.equal(change.operationType, 'insert'); assert.equal(change.fullDocument.a, docs[0].a); @@ -1196,65 +1212,66 @@ describe('Change Streams', function() { resumeToken = change._id; return firstChangeStream.next(); }) - .then(function(change) { + .then(change => { assert.equal(change.operationType, 'insert'); assert.equal(change.fullDocument.a, docs[1].a); return firstChangeStream.next(); }) - .then(function(change) { + .then(change => { assert.equal(change.operationType, 'insert'); assert.equal(change.fullDocument.a, docs[2].a); return firstChangeStream.close(); }) - .then(function() { + .then(() => { secondChangeStream = collection.watch(pipeline, { resumeAfter: resumeToken }); + this.defer(() => secondChangeStream.close()); + return delay(200); }) - .then(function() { - return secondChangeStream.hasNext(); - }) - .then(function(hasNext) { + .then(() => secondChangeStream.hasNext()) + .then(hasNext => { assert.equal(true, hasNext); return 
secondChangeStream.next(); }) - .then(function(change) { + .then(change => { assert.equal(change.operationType, 'insert'); assert.equal(change.fullDocument.a, docs[1].a); return secondChangeStream.next(); }) - .then(function(change) { + .then(change => { assert.equal(change.operationType, 'insert'); assert.equal(change.fullDocument.a, docs[2].a); return secondChangeStream.close(); - }) - .then(() => client.close()); + }); }); } }); - it('Should support full document lookup', { + it('should support full document lookup', { metadata: { requires: { topology: 'replicaset', mongodb: '>=3.6' } }, test: function() { - var configuration = this.configuration; + const configuration = this.configuration; const client = configuration.newClient(); return client.connect().then(client => { - var database = client.db('integration_tests'); - var collection = database.collection('fullDocumentLookup'); - var changeStream = collection.watch(pipeline, { + this.defer(() => client.close()); + + const database = client.db('integration_tests'); + const collection = database.collection('fullDocumentLookup'); + const changeStream = collection.watch(pipeline, { fullDocument: 'updateLookup' }); + this.defer(() => changeStream.close()); waitForStarted(changeStream, () => { - return collection.insert({ f: 128 }).then(function(result) { - assert.equal(result.insertedCount, 1); - }); + this.defer(collection.insert({ f: 128 })); }); + return changeStream .hasNext() .then(function(hasNext) { @@ -1283,40 +1300,33 @@ describe('Change Streams', function() { assert.ok(change.fullDocument); assert.equal(change.fullDocument.f, 128); assert.equal(change.fullDocument.c, 2); - - return changeStream.close().then(() => client.close()); }); }); } }); - it('Should support full document lookup with deleted documents', { + it('should support full document lookup with deleted documents', { metadata: { requires: { topology: 'replicaset', mongodb: '>=3.6' } }, test: function() { - var configuration = 
this.configuration; + const configuration = this.configuration; const client = configuration.newClient(); return client.connect().then(client => { - var database = client.db('integration_tests'); - var collection = database.collection('fullLookupTest'); - var changeStream = collection.watch(pipeline, { + this.defer(() => client.close()); + + const database = client.db('integration_tests'); + const collection = database.collection('fullLookupTest'); + const changeStream = collection.watch(pipeline, { fullDocument: 'updateLookup' }); + this.defer(() => changeStream.close()); // Trigger the first database event waitForStarted(changeStream, () => { - return collection - .insert({ i: 128 }) - .then(function(result) { - assert.equal(result.insertedCount, 1); - - return collection.deleteOne({ i: 128 }); - }) - .then(function(result) { - assert.equal(result.result.n, 1); - }); + this.defer(collection.insert({ i: 128 }).then(() => collection.deleteOne({ i: 128 }))); }); + return changeStream .hasNext() .then(function(hasNext) { @@ -1337,73 +1347,66 @@ describe('Change Streams', function() { // Trigger the second database event return collection.update({ i: 128 }, { $set: { c: 2 } }); }) - .then(function() { - return changeStream.hasNext(); - }) + .then(() => changeStream.hasNext()) .then(function(hasNext) { assert.equal(true, hasNext); return changeStream.next(); }) .then(function(change) { assert.equal(change.operationType, 'delete'); - - // Check the full lookedUpDocument is present assert.equal(change.lookedUpDocument, null); - - return changeStream.close(); - }) - .then(() => client.close()); + }); }); } }); - it('Should create Change Streams with correct read preferences', { + it('should create Change Streams with correct read preferences', { metadata: { requires: { topology: 'replicaset', mongodb: '>=3.6' } }, test: function() { - var configuration = this.configuration; + const configuration = this.configuration; const client = configuration.newClient(); return 
client.connect().then(client => { - // Should get preference from database - var database = client.db('integration_tests', { + this.defer(() => client.close()); + + // should get preference from database + const database = client.db('integration_tests', { readPreference: ReadPreference.PRIMARY_PREFERRED }); - var changeStream0 = database.collection('docs0').watch(pipeline); + const changeStream0 = database.collection('docs0').watch(pipeline); + this.defer(() => changeStream0.close()); + assert.deepEqual( changeStream0.cursor.readPreference.preference, ReadPreference.PRIMARY_PREFERRED ); - // Should get preference from collection - var collection = database.collection('docs1', { + // should get preference from collection + const collection = database.collection('docs1', { readPreference: ReadPreference.SECONDARY_PREFERRED }); - var changeStream1 = collection.watch(pipeline); + const changeStream1 = collection.watch(pipeline); assert.deepEqual( changeStream1.cursor.readPreference.preference, ReadPreference.SECONDARY_PREFERRED ); + this.defer(() => changeStream1.close()); - // Should get preference from Change Stream options - var changeStream2 = collection.watch(pipeline, { + // should get preference from Change Stream options + const changeStream2 = collection.watch(pipeline, { readPreference: ReadPreference.NEAREST }); + this.defer(() => changeStream2.close()); assert.deepEqual(changeStream2.cursor.readPreference.preference, ReadPreference.NEAREST); - - return Promise.all([ - changeStream0.close(), - changeStream1.close(), - changeStream2.close() - ]).then(() => client.close()); }); } }); - it('Should support piping of Change Streams', { + it('should support piping of Change Streams', { metadata: { requires: { topology: 'replicaset', mongodb: '>=3.6' } }, test: function(done) { @@ -1411,44 +1414,40 @@ describe('Change Streams', function() { const stream = require('stream'); const client = configuration.newClient(); - client.connect(function(err, client) { - 
assert.ifError(err); + client.connect((err, client) => { + expect(err).to.not.exist; + this.defer(() => client.close()); - const theDatabase = client.db('integration_tests'); - const theCollection = theDatabase.collection('pipeTest'); - const thisChangeStream = theCollection.watch(pipeline); + const database = client.db('integration_tests'); + const collection = database.collection('pipeTest'); + const changeStream = collection.watch(pipeline); + this.defer(() => changeStream.close()); const outStream = new stream.PassThrough({ objectMode: true }); // Make a stream transforming to JSON and piping to the file - thisChangeStream.stream({ transform: JSON.stringify }).pipe(outStream); - - function close(_err) { - thisChangeStream.close(err => client.close(cErr => done(_err || err || cErr))); - } + changeStream.stream({ transform: JSON.stringify }).pipe(outStream); outStream .on('data', data => { try { const parsedEvent = JSON.parse(data); assert.equal(parsedEvent.fullDocument.a, 1); - close(); + done(); } catch (e) { - close(e); + done(e); } }) - .on('error', close); + .on('error', done); - waitForStarted(thisChangeStream, () => { - theCollection.insert({ a: 1 }, function(err) { - assert.ifError(err); - }); + waitForStarted(changeStream, () => { + this.defer(collection.insert({ a: 1 })); }); }); } }); - it.skip('Should resume piping of Change Streams when a resumable error is encountered', { + it.skip('should resume piping of Change Streams when a resumable error is encountered', { metadata: { requires: { generators: true, @@ -1457,13 +1456,13 @@ describe('Change Streams', function() { } }, test: function(done) { - var configuration = this.configuration; + const configuration = this.configuration; // Contain mock server - var primaryServer = null; + let primaryServer = null; // Default message fields - var defaultFields = { + const defaultFields = { setName: 'rs', setVersion: 1, electionId: new ObjectId(0), @@ -1480,9 +1479,9 @@ describe('Change Streams', function() { 
co(function*() { primaryServer = yield mock.createServer(); - var counter = 0; + let counter = 0; primaryServer.setMessageHandler(request => { - var doc = request.document; + const doc = request.document; // Create a server that responds to the initial aggregation to connect to the server, but not to subsequent getMore requests if (doc.ismaster) { @@ -1563,32 +1562,30 @@ describe('Change Streams', function() { validateOptions: true }); - client.connect(function(err, client) { - assert.ifError(err); + client.connect((err, client) => { + expect(err).to.not.exist; - var fs = require('fs'); - var theDatabase = client.db('integration_tests5'); - var theCollection = theDatabase.collection('MongoNetworkErrorTestPromises'); - var thisChangeStream = theCollection.watch(pipeline); + const database = client.db('integration_tests5'); + const collection = database.collection('MongoNetworkErrorTestPromises'); + const changeStream = collection.watch(pipeline); - var filename = '/tmp/_nodemongodbnative_resumepipe.txt'; - var outStream = fs.createWriteStream(filename); + const filename = '/tmp/_nodemongodbnative_resumepipe.txt'; + const outStream = fs.createWriteStream(filename); - thisChangeStream.stream({ transform: JSON.stringify }).pipe(outStream); + changeStream.stream({ transform: JSON.stringify }).pipe(outStream); // Listen for changes to the file - var watcher = fs.watch(filename, function(eventType) { + const watcher = fs.watch(filename, function(eventType) { assert.equal(eventType, 'change'); - var fileContents = fs.readFileSync(filename, 'utf8'); - - var parsedFileContents = JSON.parse(fileContents); + const fileContents = fs.readFileSync(filename, 'utf8'); + const parsedFileContents = JSON.parse(fileContents); assert.equal(parsedFileContents.fullDocument.a, 1); watcher.close(); - thisChangeStream.close(function(err) { - assert.ifError(err); + changeStream.close(err => { + expect(err).to.not.exist; mock.cleanup(() => done()); }); @@ -1598,62 +1595,59 @@ describe('Change 
Streams', function() { } }); - it('Should support piping of Change Streams through multiple pipes', { + it('should support piping of Change Streams through multiple pipes', { metadata: { requires: { topology: 'replicaset', mongodb: '>=3.6' } }, - test: function(done) { - var configuration = this.configuration; - var crypto = require('crypto'); + const configuration = this.configuration; const client = configuration.newClient(configuration.url(), { poolSize: 1, autoReconnect: false }); - client.connect(function(err, client) { - assert.ifError(err); + client.connect((err, client) => { + expect(err).to.not.exist; + this.defer(() => client.close()); - var cipher = crypto.createCipher('aes192', 'a password'); - var decipher = crypto.createDecipher('aes192', 'a password'); + const cipher = crypto.createCipher('aes192', 'a password'); + const decipher = crypto.createDecipher('aes192', 'a password'); - var theDatabase = client.db('integration_tests'); - var theCollection = theDatabase.collection('multiPipeTest'); - var thisChangeStream = theCollection.watch(pipeline); + const database = client.db('integration_tests'); + const collection = database.collection('multiPipeTest'); + const changeStream = collection.watch(pipeline); + this.defer(() => changeStream.close()); // Make a stream transforming to JSON and piping to the file - var basicStream = thisChangeStream.pipe( + const basicStream = changeStream.pipe( new Transform({ transform: (data, encoding, callback) => callback(null, JSON.stringify(data)), objectMode: true }) ); - var pipedStream = basicStream.pipe(cipher).pipe(decipher); + const pipedStream = basicStream.pipe(cipher).pipe(decipher); - var dataEmitted = ''; + let dataEmitted = ''; pipedStream.on('data', function(data) { dataEmitted += data.toString(); // Work around poor compatibility with crypto cipher - thisChangeStream.cursor.emit('end'); + changeStream.cursor.emit('end'); }); pipedStream.on('end', function() { - var parsedData = 
JSON.parse(dataEmitted.toString()); + const parsedData = JSON.parse(dataEmitted.toString()); assert.equal(parsedData.operationType, 'insert'); assert.equal(parsedData.fullDocument.a, 1407); basicStream.emit('close'); - - thisChangeStream.close(err => client.close(cErr => done(err || cErr))); + done(); }); - pipedStream.on('error', function(err) { + pipedStream.on('error', err => { done(err); }); - waitForStarted(thisChangeStream, () => { - theCollection.insert({ a: 1407 }, function(err) { - if (err) done(err); - }); + waitForStarted(changeStream, () => { + this.defer(collection.insert({ a: 1407 })); }); }); } @@ -1661,20 +1655,11 @@ describe('Change Streams', function() { it('should maintain change stream options on resume', { metadata: { requires: { topology: 'replicaset', mongodb: '>=3.6' } }, - test: function(done) { + test: function() { const configuration = this.configuration; const client = configuration.newClient(); const collectionName = 'resumeAfterKillCursor'; - - let db; - let coll; - let changeStream; - - function close(e) { - changeStream.close(() => client.close(() => done(e))); - } - const changeStreamOptions = { fullDocument: 'updateLookup', collation: { maxVariable: 'punct' }, @@ -1682,24 +1667,22 @@ describe('Change Streams', function() { batchSize: 200 }; - client - .connect() - .then(() => (db = client.db('integration_tests'))) - .then(() => (coll = db.collection(collectionName))) - .then(() => (changeStream = coll.watch([], changeStreamOptions))) - .then(() => { - expect(changeStream.cursor.resumeOptions).to.containSubset(changeStreamOptions); - }) - .then( - () => close(), - e => close(e) - ); + return client.connect().then(() => { + this.defer(() => client.close()); + + const db = client.db('integration_tests'); + const coll = db.collection(collectionName); + const changeStream = coll.watch([], changeStreamOptions); + this.defer(() => changeStream.close()); + + expect(changeStream.cursor.resumeOptions).to.containSubset(changeStreamOptions); + 
}); } }); // 9. $changeStream stage for ChangeStream against a server >=4.0 and <4.0.7 that has not received // any results yet MUST include a startAtOperationTime option when resuming a change stream. - it('Should include a startAtOperationTime field when resuming if no changes have been received', { + it('should include a startAtOperationTime field when resuming if no changes have been received', { metadata: { requires: { topology: 'replicaset', mongodb: '>=4.0 <4.0.7' } }, test: function(done) { const configuration = this.configuration; @@ -1856,105 +1839,6 @@ describe('Change Streams', function() { } }); - it('should not resume when error includes error label NonRetryableChangeStreamError', function() { - let server; - let client; - let changeStream; - - function teardown(e) { - return Promise.resolve() - .then(() => changeStream && changeStream.close()) - .catch(() => {}) - .then(() => client && client.close()) - .catch(() => {}) - .then(() => e && Promise.reject(e)); - } - - const db = 'foobar'; - const coll = 'foobar'; - const ns = `${db}.${coll}`; - - let aggregateCount = 0; - let getMoreCount = 0; - - function messageHandler(request) { - const doc = request.document; - - if (doc.ismaster) { - request.reply( - Object.assign({}, mock.DEFAULT_ISMASTER_36, { - ismaster: true, - secondary: false, - me: server.uri(), - primary: server.uri() - }) - ); - } else if (doc.aggregate) { - aggregateCount += 1; - request.reply({ - ok: 1, - cursor: { - firstBatch: [], - id: 1, - ns - } - }); - } else if (doc.getMore) { - if (getMoreCount === 0) { - getMoreCount += 1; - request.reply({ - ok: 0, - errorLabels: ['NonRetryableChangeStreamError'] - }); - } else { - getMoreCount += 1; - request.reply({ - ok: 1, - cursor: { - nextBatch: [ - { - _id: {}, - operationType: 'insert', - ns: { db, coll }, - fullDocument: { a: 1 } - } - ], - id: 1, - ns - } - }); - } - } else { - request.reply({ ok: 1 }); - } - } - - return mock - .createServer() - .then(_server => (server = _server)) 
- .then(() => server.setMessageHandler(messageHandler)) - .then(() => (client = this.configuration.newClient(`mongodb://${server.uri()}`))) - .then(() => client.connect()) - .then( - () => - (changeStream = client - .db(db) - .collection(coll) - .watch()) - ) - .then(() => changeStream.next()) - .then( - () => Promise.reject('Expected changeStream to not resume'), - err => { - expect(err).to.be.an.instanceOf(MongoError); - expect(err.hasErrorLabel('NonRetryableChangeStreamError')).to.be.true; - expect(aggregateCount).to.equal(1); - expect(getMoreCount).to.equal(1); - } - ) - .then(() => teardown(), teardown); - }); - it('should emit close event after error event', { metadata: { requires: { topology: 'replicaset', mongodb: '>=3.6' } }, test: function(done) { @@ -1962,8 +1846,9 @@ describe('Change Streams', function() { const client = configuration.newClient(); const closeSpy = sinon.spy(); - client.connect(function(err, client) { + client.connect((err, client) => { expect(err).to.not.exist; + this.defer(() => client.close()); const db = client.db('integration_tests'); const coll = db.collection('event_test'); @@ -1971,27 +1856,23 @@ describe('Change Streams', function() { // This will cause an error because the _id will be projected out, which causes the following error: // "A change stream document has been received that lacks a resume token (_id)." 
const changeStream = coll.watch([{ $project: { _id: false } }]); - + changeStream.on('close', closeSpy); changeStream.on('change', changeDoc => { expect(changeDoc).to.be.null; }); - changeStream.on('close', closeSpy); - changeStream.on('error', err => { expect(err).to.exist; + changeStream.close(() => { - expect(closeSpy.calledOnce).to.be.true; - client.close(done); + expect(closeSpy).property('calledOnce').to.be.true; + done(); }); }); // Trigger the first database event waitForStarted(changeStream, () => { - coll.insertOne({ a: 1 }, (err, result) => { - expect(err).to.not.exist; - expect(result.insertedCount).to.equal(1); - }); + this.defer(coll.insertOne({ a: 1 })); }); }); } @@ -2041,14 +1922,15 @@ describe('Change Streams', function() { it('when invoked with promises', { metadata: { requires: { topology: 'replicaset', mongodb: '>=3.6' } }, test: function() { + const test = this; + function read() { return Promise.resolve() .then(() => changeStream.next()) .then(() => changeStream.next()) .then(() => { - lastWrite(); + test.defer(lastWrite()); const nextP = changeStream.next(); - return changeStream.close().then(() => nextP); }); } @@ -2065,19 +1947,20 @@ describe('Change Streams', function() { test: function(done) { changeStream.next(() => { changeStream.next(() => { - lastWrite(); + this.defer(lastWrite()); + changeStream.next(err => { - let _err = null; try { expect(err) .property('message') .to.equal('ChangeStream is closed'); + done(); } catch (e) { - _err = e; - } finally { - done(_err); + done(e); } }); + + // explicitly close the change stream after the write has begun changeStream.close(); }); }); @@ -2105,7 +1988,7 @@ describe('Change Streams', function() { changeStream.close(); setTimeout(() => close()); } else if (counter >= 3) { - close(new Error('Should not have received more than 2 events')); + close(new Error('should not have received more than 2 events')); } }); changeStream.on('error', err => close(err)); @@ -2727,6 +2610,8 @@ describe('Change 
Streams', function() { metadata: { requires: { topology: 'replicaset', mongodb: '>=4.1.1' } }, test: function(done) { const changeStream = coll.watch([], { startAfter }); + this.defer(() => changeStream.close()); + coll.insertOne({ x: 2 }, { w: 'majority', j: true }, err => { expect(err).to.not.exist; changeStream.once('change', change => { @@ -2734,7 +2619,8 @@ describe('Change Streams', function() { operationType: 'insert', fullDocument: { x: 2 } }); - changeStream.close(done); + + done(); }); }); } @@ -2744,6 +2630,8 @@ describe('Change Streams', function() { metadata: { requires: { topology: 'replicaset', mongodb: '>=4.1.1' } }, test: function(done) { const changeStream = coll.watch([], { startAfter }); + this.defer(() => changeStream.close()); + coll.insertOne({ x: 2 }, { w: 'majority', j: true }, err => { expect(err).to.not.exist; exhaust(changeStream, (err, bag) => { @@ -2753,7 +2641,8 @@ describe('Change Streams', function() { operationType: 'insert', fullDocument: { x: 2 } }); - changeStream.close(done); + + done(); }); }); } @@ -2770,6 +2659,8 @@ describe('Change Streams', function() { const events = []; client.on('commandStarted', e => recordEvent(events, e)); const changeStream = coll.watch([], { startAfter }); + this.defer(() => changeStream.close()); + changeStream.once('change', change => { expect(change).to.containSubset({ operationType: 'insert', @@ -2781,14 +2672,12 @@ describe('Change Streams', function() { expect(events[0]).nested.property('$changeStream.startAfter').to.exist; expect(events[1]).to.equal('error'); expect(events[2]).nested.property('$changeStream.startAfter').to.exist; - changeStream.close(done); + done(); }); waitForStarted(changeStream, () => { triggerResumableError(changeStream, () => events.push('error')); - coll.insertOne({ x: 2 }, { w: 'majority', j: true }, err => { - expect(err).to.not.exist; - }); + this.defer(coll.insertOne({ x: 2 }, { w: 'majority', j: true })); }); } }); @@ -2804,6 +2693,7 @@ describe('Change Streams', 
function() { let events = []; client.on('commandStarted', e => recordEvent(events, e)); const changeStream = coll.watch([], { startAfter }); + this.defer(() => changeStream.close()); changeStream.on('change', change => { events.push({ change: { insert: { x: change.fullDocument.x } } }); @@ -2820,17 +2710,17 @@ describe('Change Streams', function() { expect(events[0]).to.equal('error'); expect(events[1]).nested.property('$changeStream.resumeAfter').to.exist; expect(events[2]).to.eql({ change: { insert: { x: 3 } } }); - changeStream.close(done); + done(); break; } }); + waitForStarted(changeStream, () => - coll.insertOne({ x: 2 }, { w: 'majority', j: true }, err => { - expect(err).to.not.exist; - coll.insertOne({ x: 3 }, { w: 'majority', j: true }, err => { - expect(err).to.not.exist; - }); - }) + this.defer( + coll + .insertOne({ x: 2 }, { w: 'majority', j: true }) + .then(() => coll.insertOne({ x: 3 }, { w: 'majority', j: true })) + ) ); } }); @@ -2857,6 +2747,7 @@ describe('Change Stream Resume Error Tests', function() { done(); } }); + waitForStarted(changeStream, () => { collection.insertOne({ a: 42 }, err => { expect(err).to.not.exist; @@ -2893,6 +2784,7 @@ describe('Change Stream Resume Error Tests', function() { }); }); }); + changeStream.hasNext((err, hasNext) => { expect(err).to.not.exist; expect(hasNext).to.be.true; diff --git a/test/functional/client_side_encryption/prose.test.js b/test/functional/client_side_encryption/prose.test.js index 0f28d29c06..f107c1ce42 100644 --- a/test/functional/client_side_encryption/prose.test.js +++ b/test/functional/client_side_encryption/prose.test.js @@ -23,7 +23,7 @@ describe('Client Side Encryption Prose Tests', function() { const shared = require('../shared'); const dropCollection = shared.dropCollection; - const EventCollector = shared.EventCollector; + const APMEventCollector = shared.APMEventCollector; const localKey = Buffer.from( 
'Mng0NCt4ZHVUYUJCa1kxNkVyNUR1QURhZ2h2UzR2d2RrZzh0cFBwM3R6NmdWMDFBMUN3YkQ5aXRRMkhGRGdQV09wOGVNYUMxT2k3NjZKelhaQmRCZGJkTXVyZG9uSjFk', @@ -40,7 +40,7 @@ describe('Client Side Encryption Prose Tests', function() { // #. Create a MongoClient without encryption enabled (referred to as ``client``). Enable command monitoring to listen for command_started events. this.client = this.configuration.newClient({}, { monitorCommands: true }); - this.commandStartedEvents = new EventCollector(this.client, 'commandStarted', { + this.commandStartedEvents = new APMEventCollector(this.client, 'commandStarted', { exclude: ['ismaster'] }); @@ -501,7 +501,7 @@ describe('Client Side Encryption Prose Tests', function() { ); return this.clientEncrypted.connect().then(() => { this.encryptedColl = this.clientEncrypted.db(dataDbName).collection(dataCollName); - this.commandStartedEvents = new EventCollector(this.clientEncrypted, 'commandStarted', { + this.commandStartedEvents = new APMEventCollector(this.clientEncrypted, 'commandStarted', { include: ['insert'] }); }); @@ -769,7 +769,7 @@ describe('Client Side Encryption Prose Tests', function() { { monitorCommands: true } ); - this.commandStartedEvents = new EventCollector( + this.commandStartedEvents = new APMEventCollector( this.externalClient, 'commandStarted', { diff --git a/test/functional/core/operations.test.js b/test/functional/core/operations.test.js index 8ef5c8f2f7..855725e70c 100644 --- a/test/functional/core/operations.test.js +++ b/test/functional/core/operations.test.js @@ -96,7 +96,7 @@ describe('Operation tests', function() { it('should correctly execute find', { metadata: { - requires: { topology: ['single', 'replicaset', 'sharded'] } + requires: { topology: ['single', 'replicaset'] } }, test: function(done) { @@ -156,7 +156,7 @@ describe('Operation tests', function() { it('should correctly execute find with limit and skip', { metadata: { - requires: { topology: ['single', 'replicaset', 'sharded'] } + requires: { topology: 
['single', 'replicaset'] } }, test: function(done) { @@ -218,7 +218,7 @@ describe('Operation tests', function() { it('should correctly execute find against document with result array field', { metadata: { - requires: { topology: ['single', 'replicaset', 'sharded'] } + requires: { topology: ['single', 'replicaset'] } }, test: function(done) { @@ -255,10 +255,16 @@ describe('Operation tests', function() { // Execute next cursor._next(function(cursorErr, cursorD) { - expect(cursorErr).to.be.null; - expect(cursorD.a).to.equal(1); - expect(cursorD.result[0].c).to.equal(1); - expect(cursorD.result[1].c).to.equal(2); + expect(cursorErr).to.not.exist; + expect(cursorD) + .property('a') + .to.equal(1); + expect(cursorD) + .nested.property('result[0].c') + .to.equal(1); + expect(cursorD) + .nested.property('result[1].c') + .to.equal(2); // Destroy the server connection _server.destroy(done); @@ -276,7 +282,7 @@ describe('Operation tests', function() { it('should correctly execute aggregation command', { metadata: { requires: { - topology: ['single', 'replicaset', 'sharded'], + topology: ['single', 'replicaset'], mongodb: '>=2.6.0' } }, @@ -297,7 +303,7 @@ describe('Operation tests', function() { ordered: true }, function(insertErr, insertResults) { - expect(insertErr).to.be.null; + expect(insertErr).to.not.exist; expect(insertResults) .nested.property('result.n') .to.equal(3); @@ -419,7 +425,7 @@ describe('Operation tests', function() { metadata: { requires: { mongodb: '>=2.6.0', - topology: ['single', 'replicaset', 'sharded'] + topology: ['single', 'replicaset'] } }, @@ -451,8 +457,11 @@ describe('Operation tests', function() { // Execute next cursor._next(function(cursorErr, cursorD) { - expect(cursorErr).to.be.null; - expect(cursorD.a).to.equal(1); + expect(cursorErr).to.not.exist; + expect(cursorD).to.exist; + expect(cursorD) + .property('a') + .to.equal(1); // Kill the cursor cursor.kill(function() { @@ -476,7 +485,7 @@ describe('Operation tests', function() { 
it('should correctly kill find command cursor', { metadata: { requires: { - topology: ['single', 'replicaset', 'sharded'] + topology: ['single', 'replicaset'] } }, @@ -497,7 +506,9 @@ describe('Operation tests', function() { }, function(insertErr, insertResults) { expect(insertErr).to.be.null; - expect(insertResults.result.n).to.equal(3); + expect(insertResults) + .nested.property('result.n') + .to.equal(3); // Execute find var cursor = _server.cursor(f('%s.inserts21', self.configuration.db), { diff --git a/test/functional/operation_promises_example.test.js b/test/functional/operation_promises_example.test.js index c7c6622ad8..3b489b0e01 100644 --- a/test/functional/operation_promises_example.test.js +++ b/test/functional/operation_promises_example.test.js @@ -5268,7 +5268,8 @@ describe('Operation (Promises)', function() { }); // END } - client + + return client .connect() .then(() => updateEmployeeInfo(client)) .then(() => client.close()); diff --git a/test/functional/shared.js b/test/functional/shared.js index 0f868e64bb..4bfc63524e 100644 --- a/test/functional/shared.js +++ b/test/functional/shared.js @@ -233,7 +233,7 @@ function withCursor(cursor, body, done) { * }); * }); */ -class EventCollector { +class APMEventCollector { constructor(client, eventName, options) { this._client = client; this._eventName = eventName; @@ -287,5 +287,5 @@ module.exports = { withClient, withMonitoredClient, withCursor, - EventCollector + APMEventCollector }; diff --git a/test/tools/runner/plugins/client_leak_checker.js b/test/tools/runner/plugins/client_leak_checker.js index 3e1519a950..1fb3b18393 100644 --- a/test/tools/runner/plugins/client_leak_checker.js +++ b/test/tools/runner/plugins/client_leak_checker.js @@ -47,4 +47,3 @@ after(function() { activeClients = []; }); - diff --git a/test/tools/runner/plugins/deferred.js b/test/tools/runner/plugins/deferred.js index 990b0e3030..14ae564881 100644 --- a/test/tools/runner/plugins/deferred.js +++ 
b/test/tools/runner/plugins/deferred.js @@ -7,26 +7,33 @@ const kDeferred = Symbol('deferred'); return () => { const deferredActions = test[kDeferred]; - return Promise.all( - Array.from(deferredActions).map(action => { - if (action.length > 0) { - // assume these are async methods with provided `done` - return new Promise((resolve, reject) => { - function done(err) { - if (err) return reject(err); - resolve(); - } - - action(done); - }); - } - - // otherwise assume a Promise is returned - return action(); - }) - ).then(() => { - test[kDeferred].clear(); - }); + // process actions LIFO + const promises = Array.from(deferredActions).reverse(); + const result = promises.reduce((p, action) => { + if (action.length > 0) { + // assume these are async methods with provided `done` + const actionPromise = () => new Promise((resolve, reject) => { + function done(err) { + if (err) return reject(err); + resolve(); + } + + action(done); + }); + + return p.then(actionPromise); + } + + return p.then(action); + }, Promise.resolve()); + + return result.then( + () => test[kDeferred].clear(), + err => { + test[kDeferred].clear(); + return Promise.reject(err); + } + ); }; }