Skip to content

Commit

Permalink
fix: move session support check to operation layer (#2739)
Browse files Browse the repository at this point in the history
Monitor checks run on a timer and network errors cause a reset
of the topology that would be corrected in the next cycle of the
monitor, we were checking a property that gets updated by this async
work. By moving the check to the operations layer we will allow users
to obtain a session regardless of support and then emit an error if
there is not support when the session is used in an operation.

NODE-3100
  • Loading branch information
nbbeeken authored Mar 11, 2021
1 parent 2d76492 commit 8b370a7
Show file tree
Hide file tree
Showing 6 changed files with 78 additions and 96 deletions.
4 changes: 0 additions & 4 deletions lib/mongo_client.js
Original file line number Diff line number Diff line change
Expand Up @@ -457,10 +457,6 @@ MongoClient.prototype.startSession = function(options) {
throw new MongoError('Must connect to a server before calling this method');
}

if (!this.topology.hasSessionSupport()) {
throw new MongoError('Current topology does not support sessions');
}

return this.topology.startSession(options, this.s.options);
};

Expand Down
116 changes: 47 additions & 69 deletions lib/operations/execute_operation.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
'use strict';

const maybePromise = require('../utils').maybePromise;
const MongoError = require('../core/error').MongoError;
const Aspect = require('./operation').Aspect;
const OperationBase = require('./operation').OperationBase;
Expand All @@ -21,7 +22,7 @@ const isUnifiedTopology = require('../core/utils').isUnifiedTopology;
* @param {Operation} operation The operation to execute
* @param {function} callback The command result callback
*/
function executeOperation(topology, operation, callback) {
function executeOperation(topology, operation, cb) {
if (topology == null) {
throw new TypeError('This method requires a valid topology instance');
}
Expand All @@ -30,64 +31,57 @@ function executeOperation(topology, operation, callback) {
throw new TypeError('This method requires a valid operation instance');
}

if (isUnifiedTopology(topology) && topology.shouldCheckForSessionSupport()) {
return selectServerForSessionSupport(topology, operation, callback);
}

const Promise = topology.s.promiseLibrary;

// The driver sessions spec mandates that we implicitly create sessions for operations
// that are not explicitly provided with a session.
let session, owner;
if (topology.hasSessionSupport()) {
if (operation.session == null) {
owner = Symbol();
session = topology.startSession({ owner });
operation.session = session;
} else if (operation.session.hasEnded) {
throw new MongoError('Use of expired sessions is not permitted');
return maybePromise(topology, cb, callback => {
if (isUnifiedTopology(topology) && topology.shouldCheckForSessionSupport()) {
// Recursive call to executeOperation after a server selection
return selectServerForSessionSupport(topology, operation, callback);
}
}

let result;
if (typeof callback !== 'function') {
result = new Promise((resolve, reject) => {
callback = (err, res) => {
if (err) return reject(err);
resolve(res);
};
});
}

function executeCallback(err, result) {
if (session && session.owner === owner) {
session.endSession();
if (operation.session === session) {
operation.clearSession();
// The driver sessions spec mandates that we implicitly create sessions for operations
// that are not explicitly provided with a session.
let session, owner;
if (topology.hasSessionSupport()) {
if (operation.session == null) {
owner = Symbol();
session = topology.startSession({ owner });
operation.session = session;
} else if (operation.session.hasEnded) {
return callback(new MongoError('Use of expired sessions is not permitted'));
}
} else if (operation.session) {
// If the user passed an explicit session and we are still, after server selection,
// trying to run against a topology that doesn't support sessions we error out.
return callback(new MongoError('Current topology does not support sessions'));
}

callback(err, result);
}

try {
if (operation.hasAspect(Aspect.EXECUTE_WITH_SELECTION)) {
executeWithServerSelection(topology, operation, executeCallback);
} else {
operation.execute(executeCallback);
}
} catch (e) {
if (session && session.owner === owner) {
session.endSession();
if (operation.session === session) {
operation.clearSession();
function executeCallback(err, result) {
if (session && session.owner === owner) {
session.endSession();
if (operation.session === session) {
operation.clearSession();
}
}

callback(err, result);
}

throw e;
}
try {
if (operation.hasAspect(Aspect.EXECUTE_WITH_SELECTION)) {
executeWithServerSelection(topology, operation, executeCallback);
} else {
operation.execute(executeCallback);
}
} catch (error) {
if (session && session.owner === owner) {
session.endSession();
if (operation.session === session) {
operation.clearSession();
}
}

return result;
callback(error);
}
});
}

function supportsRetryableReads(server) {
Expand Down Expand Up @@ -139,7 +133,6 @@ function executeWithServerSelection(topology, operation, callback) {
callback(err, null);
return;
}

const shouldRetryReads =
topology.s.options.retryReads !== false &&
operation.session &&
Expand All @@ -156,31 +149,16 @@ function executeWithServerSelection(topology, operation, callback) {
});
}

// TODO: This is only supported for unified topology, it should go away once
// we remove support for legacy topology types.
// The Unified Topology runs serverSelection before executing every operation
// Session support is determined by the result of a monitoring check triggered by this selection
function selectServerForSessionSupport(topology, operation, callback) {
const Promise = topology.s.promiseLibrary;

let result;
if (typeof callback !== 'function') {
result = new Promise((resolve, reject) => {
callback = (err, result) => {
if (err) return reject(err);
resolve(result);
};
});
}

topology.selectServer(ReadPreference.primaryPreferred, err => {
if (err) {
callback(err);
return;
return callback(err);
}

executeOperation(topology, operation, callback);
});

return result;
}

module.exports = executeOperation;
4 changes: 2 additions & 2 deletions test/functional/find.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -1067,7 +1067,7 @@ describe('Find', function() {
{ $set: { name: 'test2' } },
{},
function(err, updated_doc) {
test.equal(null, updated_doc);
expect(updated_doc).to.not.exist;
test.ok(err != null);
client.close(done);
}
Expand Down Expand Up @@ -1305,7 +1305,7 @@ describe('Find', function() {
{ a: 10, b: 10, failIndex: 2 },
{ w: 1, upsert: true },
function(err, result) {
test.equal(null, result);
expect(result).to.not.exist;
test.ok(err.errmsg.match('duplicate key'));
client.close(done);
}
Expand Down
10 changes: 5 additions & 5 deletions test/functional/insert.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -1097,8 +1097,8 @@ describe('Insert', function() {
var db = client.db(configuration.db);
var collection = db.collection('Should_fail_on_insert_due_to_key_starting_with');
collection.insert(doc, configuration.writeConcernMax(), function(err, result) {
test.ok(err != null);
test.equal(null, result);
expect(err).to.exist;
expect(result).to.not.exist;

client.close(done);
});
Expand Down Expand Up @@ -1348,7 +1348,7 @@ describe('Insert', function() {

// Update two fields
collection.insert({ _id: 1 }, configuration.writeConcernMax(), function(err, r) {
test.equal(r, null);
expect(r).to.not.exist;
test.ok(err != null);
test.ok(err.result);

Expand Down Expand Up @@ -2560,7 +2560,7 @@ describe('Insert', function() {
[{ a: 1 }, { a: 2 }, { a: 1 }, { a: 3 }, { a: 1 }],
{ ordered: true },
function(err, r) {
test.equal(r, null);
expect(r).to.not.exist;
test.ok(err != null);
test.ok(err.result);

Expand Down Expand Up @@ -2601,7 +2601,7 @@ describe('Insert', function() {
[{ a: 1 }, { a: 2 }, { a: 1 }, { a: 3 }, { a: 1 }],
{ ordered: true },
function(err, r) {
test.equal(r, null);
expect(r).to.not.exist;
test.ok(err != null);
test.ok(err.result);

Expand Down
4 changes: 2 additions & 2 deletions test/functional/operation_example.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -2657,7 +2657,7 @@ describe('Operation Examples', function() {
],
{ w: 1, keepGoing: true },
function(err, result) {
test.equal(result, null);
expect(result).to.not.exist;
test.ok(err);
test.ok(err.result);

Expand Down Expand Up @@ -3115,7 +3115,7 @@ describe('Operation Examples', function() {

// Attemp to rename the first collection to the second one, this will fail
collection1.rename('test_rename_collection2', function(err, collection) {
test.equal(null, collection);
expect(collection).to.not.exist;
test.ok(err instanceof Error);
test.ok(err.message.length > 0);

Expand Down
36 changes: 22 additions & 14 deletions test/unit/sessions/client.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,9 @@ describe('Sessions', function() {
});
});

it('should throw an exception if sessions are not supported', {
it('should not throw a synchronous exception if sessions are not supported', {
metadata: { requires: { topology: 'single' } },
test: function(done) {
test() {
test.server.setMessageHandler(request => {
var doc = request.document;
if (doc.ismaster) {
Expand All @@ -27,13 +27,11 @@ describe('Sessions', function() {
});

const client = this.configuration.newClient(`mongodb://${test.server.uri()}/test`);
client.connect(function(err, client) {
expect(err).to.not.exist;
expect(() => {
client.startSession();
}).to.throw(/Current topology does not support sessions/);

client.close(done);
return client.connect().then(() => {
expect(() => client.startSession()).to.not.throw(
'Current topology does not support sessions'
);
return client.close();
});
}
});
Expand All @@ -42,6 +40,7 @@ describe('Sessions', function() {
metadata: { requires: { topology: 'single' } },
test() {
const replicaSetMock = new ReplSetFixture();
let client;
return replicaSetMock
.setup({ doNotInitHandlers: true })
.then(() => {
Expand Down Expand Up @@ -92,14 +91,23 @@ describe('Sessions', function() {
return replicaSetMock.uri();
})
.then(uri => {
const client = this.configuration.newClient(uri);
client = this.configuration.newClient(uri);
return client.connect();
})
.then(client => {
expect(client.topology.s.description.logicalSessionTimeoutMinutes).to.not.exist;
expect(() => {
client.startSession();
}).to.throw(/Current topology does not support sessions/);
const session = client.startSession();
return client
.db()
.collection('t')
.insertOne({ a: 1 }, { session });
})
.then(() => {
expect.fail('Expected an error to be thrown about not supporting sessions');
})
.catch(error => {
expect(error.message).to.equal('Current topology does not support sessions');
})
.then(() => {
return client.close();
});
}
Expand Down

0 comments on commit 8b370a7

Please sign in to comment.