Skip to content

Commit

Permalink
fix: allow client re-connect after close (#2615)
Browse files Browse the repository at this point in the history
`MongoClient#close` now removes the topology assigned to the client;
when a client is re-connected, a new topology is created and added to
the client. Existing references to Dbs and Collections can be re-used
after client reconnect.

NODE-2544
  • Loading branch information
HanaPearlman authored Nov 16, 2020
1 parent 4472308 commit 9a176ef
Show file tree
Hide file tree
Showing 6 changed files with 71 additions and 8 deletions.
2 changes: 1 addition & 1 deletion src/change_stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -588,7 +588,6 @@ function processNewChange(
}

function processError(changeStream: ChangeStream, error: AnyError, callback?: Callback) {
const topology = getTopology(changeStream.parent);
const cursor = changeStream.cursor;

// If the change stream has been closed explicitly, do not process error.
Expand Down Expand Up @@ -618,6 +617,7 @@ function processError(changeStream: ChangeStream, error: AnyError, callback?: Ca
// close internal cursor, ignore errors
cursor.close();

const topology = getTopology(changeStream.parent);
waitForTopologyConnected(topology, { readPreference: cursor.readPreference }, err => {
// if the topology can't reconnect, close the stream
if (err) return unresumableError(err);
Expand Down
3 changes: 3 additions & 0 deletions src/mongo_client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -362,7 +362,10 @@ export class MongoClient extends EventEmitter implements OperationParent {
return cb();
}

// clear out references to old topology
const topology = this.topology;
this.topology = undefined;

topology.close({ force }, err => {
const autoEncrypter = topology.s.options.autoEncrypter;
if (!autoEncrypter) {
Expand Down
5 changes: 5 additions & 0 deletions src/operations/connect.ts
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,11 @@ export function connect(
throw new Error('no callback function provided');
}

// If a connection already been established, we can terminate early
if (mongoClient.topology && mongoClient.topology.isConnected()) {
return callback(undefined, mongoClient);
}

let didRequestAuthentication = false;
const logger = new Logger('MongoClient', options);

Expand Down
60 changes: 57 additions & 3 deletions test/functional/connection.test.js
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
'use strict';
const test = require('./shared').assert,
setupDatabase = require('./shared').setupDatabase,
expect = require('chai').expect;
const { withClient, setupDatabase } = require('./shared');
const test = require('./shared').assert;
const { expect } = require('chai');

describe('Connection', function () {
before(function () {
Expand Down Expand Up @@ -273,4 +273,58 @@ describe('Connection', function () {
done();
}
});

it(
'should be able to connect again after close',
withClient(function (client, done) {
expect(client.isConnected()).to.be.true;

const collection = client.db('shouldConnectAfterClose').collection('test');
collection.insertOne({ a: 1, b: 2 }, (err, result) => {
expect(err).to.not.exist;
expect(result).to.exist;

client.close(err => {
expect(err).to.not.exist;
expect(client.isConnected()).to.be.false;

client.connect(err => {
expect(err).to.not.exist;
expect(client.isConnected()).to.be.true;

collection.findOne({ a: 1 }, (err, result) => {
expect(err).to.not.exist;
expect(result).to.exist;
expect(result).to.have.property('a', 1);
expect(result).to.have.property('b', 2);
expect(client.topology.isDestroyed()).to.be.false;
done();
});
});
});
});
})
);

it(
'should correctly fail on retry when client has been closed',
withClient(function (client, done) {
expect(client.isConnected()).to.be.true;
const collection = client.db('shouldCorrectlyFailOnRetry').collection('test');
collection.insertOne({ a: 1 }, (err, result) => {
expect(err).to.not.exist;
expect(result).to.exist;

client.close(true, function (err) {
expect(err).to.not.exist;
expect(client.isConnected()).to.be.false;

expect(() => {
collection.insertOne({ a: 2 });
}).to.throw(/must be connected/);
done();
});
});
})
);
});
5 changes: 3 additions & 2 deletions test/functional/find.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -2637,15 +2637,16 @@ describe('Find', function () {
expect(err).to.not.exist;

let selectedServer;
const selectServerStub = sinon.stub(client.topology, 'selectServer').callsFake(function () {
const topology = client.topology;
const selectServerStub = sinon.stub(topology, 'selectServer').callsFake(function () {
const args = Array.prototype.slice.call(arguments);
const originalCallback = args.pop();
args.push((err, server) => {
selectedServer = server;
originalCallback(err, server);
});

return client.topology.selectServer.wrappedMethod.apply(this, args);
return topology.selectServer.wrappedMethod.apply(this, args);
});

const collection = client.db().collection('test_read_preference');
Expand Down
4 changes: 2 additions & 2 deletions test/functional/sessions.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ describe('Sessions', function () {
// verify that the `endSessions` command was sent
const lastCommand = test.commands.started[test.commands.started.length - 1];
expect(lastCommand.commandName).to.equal('endSessions');
expect(client.topology.s.sessionPool.sessions).to.have.length(0);
expect(client.topology).to.not.exist;
});
});
});
Expand All @@ -143,7 +143,7 @@ describe('Sessions', function () {
// verify that the `endSessions` command was sent
const lastCommand = test.commands.started[test.commands.started.length - 1];
expect(lastCommand.commandName).to.equal('endSessions');
expect(client.topology.s.sessionPool.sessions).to.have.length(0);
expect(client.topology).to.not.exist;
});
});
}
Expand Down

0 comments on commit 9a176ef

Please sign in to comment.