diff --git a/lib/cmap/connection.js b/lib/cmap/connection.js index e86b6f342f..fad9298069 100644 --- a/lib/cmap/connection.js +++ b/lib/cmap/connection.js @@ -3,6 +3,7 @@ const EventEmitter = require('events'); const MessageStream = require('./message_stream'); const MongoError = require('../core/error').MongoError; +const MongoNetworkError = require('../core/error').MongoNetworkError; const MongoWriteConcernError = require('../core/error').MongoWriteConcernError; const wp = require('../core/wireprotocol'); const apm = require('../core/connection/apm'); @@ -26,6 +27,7 @@ class Connection extends EventEmitter { this.socketTimeout = typeof options.socketTimeout === 'number' ? options.socketTimeout : 360000; this.monitorCommands = typeof options.monitorCommands === 'boolean' ? options.monitorCommands : false; + this.closed = false; this[kGeneration] = options.generation; this[kLastUseTime] = Date.now(); @@ -40,7 +42,10 @@ class Connection extends EventEmitter { }); stream.on('close', () => { - this[kQueue].forEach(op => op.cb(new MongoError('Connection closed'))); + this.closed = true; + this[kQueue].forEach(op => + op.cb(new MongoNetworkError(`connection ${this.id} to ${this.address} closed`)) + ); this[kQueue].clear(); this.emit('close'); diff --git a/lib/cmap/connection_pool.js b/lib/cmap/connection_pool.js index 77475b8a8c..1f363f5d63 100644 --- a/lib/cmap/connection_pool.js +++ b/lib/cmap/connection_pool.js @@ -165,13 +165,14 @@ class ConnectionPool extends EventEmitter { const connection = pool[kConnections].pop(); const isStale = connectionIsStale(pool, connection); const isIdle = connectionIsIdle(pool, connection); - if (!isStale && !isIdle) { + if (!isStale && !isIdle && !connection.closed) { pool.emit('connectionCheckedOut', new ConnectionCheckedOutEvent(pool, connection)); callback(null, connection); return; } - destroyConnection(pool, connection, isStale ? 'stale' : 'idle'); + const reason = connection.closed ? 'error' : isStale ? 'stale' : 'idle'; + destroyConnection(pool, connection, reason); } if (maxPoolSize <= 0 || pool.totalConnectionCount < maxPoolSize) { @@ -208,9 +209,9 @@ class ConnectionPool extends EventEmitter { * @param {Connection} connection The connection to check in */ checkIn(connection) { - const closed = this.closed; + const poolClosed = this.closed; const stale = connectionIsStale(this, connection); - const willDestroy = !!(closed || stale); + const willDestroy = !!(poolClosed || stale || connection.closed); // Properly adjust state of connection if (!willDestroy) { @@ -221,7 +222,8 @@ class ConnectionPool extends EventEmitter { this.emit('connectionCheckedIn', new ConnectionCheckedInEvent(this, connection)); if (willDestroy) { - destroyConnection(this, connection, closed ? 'poolClosed' : 'stale'); + const reason = connection.closed ? 'error' : poolClosed ? 'poolClosed' : 'stale'; + destroyConnection(this, connection, reason); } } diff --git a/test/unit/cmap/connection_pool.test.js b/test/unit/cmap/connection_pool.test.js index 27ca2a91ef..e824b4204c 100644 --- a/test/unit/cmap/connection_pool.test.js +++ b/test/unit/cmap/connection_pool.test.js @@ -44,6 +44,49 @@ describe('Connection Pool', function() { mock.createServer().then(s => (server = s)); }); + it('should destroy connections which have been closed', function(done) { + server.setMessageHandler(request => { + const doc = request.document; + if (doc.ismaster) { + request.reply(mock.DEFAULT_ISMASTER_36); + } else { + // destroy on any other command + request.connection.destroy(); + } + }); + + const pool = new ConnectionPool( + Object.assign({ bson: new BSON(), maxPoolSize: 1 }, server.address()) + ); + + const events = []; + pool.on('connectionClosed', event => events.push(event)); + + pool.checkOut((err, conn) => { + expect(err).to.not.exist; + + conn.command('admin.$cmd', { ping: 1 }, (err, result) => { + expect(err).to.exist; + expect(result).to.not.exist; + + pool.checkIn(conn); + + expect(events).to.have.length(1); + const closeEvent = events[0]; + expect(closeEvent) + .have.property('reason') + .equal('error'); + + pool.close(done); + }); + }); + + pool.withConnection((err, conn, cb) => { + expect(err).to.not.exist; + cb(); + }); + }); + describe('withConnection', function() { it('should manage a connection for a successful operation', function(done) { server.setMessageHandler(request => {