From 1b60b6c6ed23004f0f206f6d538790f31f8a09e5 Mon Sep 17 00:00:00 2001 From: Bailey Pearson Date: Mon, 24 Oct 2022 11:00:20 -0400 Subject: [PATCH 01/18] unskip tests and update cmap runner to use interruptInUseConnections --- .../connection_monitoring_and_pooling.spec.test.ts | 12 +----------- .../server_discovery_and_monitoring.spec.test.ts | 13 +------------ test/tools/cmap_spec_runner.ts | 4 +--- 3 files changed, 3 insertions(+), 26 deletions(-) diff --git a/test/integration/connection-monitoring-and-pooling/connection_monitoring_and_pooling.spec.test.ts b/test/integration/connection-monitoring-and-pooling/connection_monitoring_and_pooling.spec.test.ts index 66b0b40d28..d0eb20858d 100644 --- a/test/integration/connection-monitoring-and-pooling/connection_monitoring_and_pooling.spec.test.ts +++ b/test/integration/connection-monitoring-and-pooling/connection_monitoring_and_pooling.spec.test.ts @@ -19,16 +19,6 @@ const LB_SKIP_TESTS: SkipDescription[] = [ skipReason: 'cannot run against a load balanced environment' })); -const INTERRUPT_IN_USE_CONNECTIONS_TESTS: SkipDescription[] = [ - 'Connections MUST be interrupted as soon as possible (interruptInUseConnections=true)', - 'Pool clear SHOULD schedule the next background thread run immediately (interruptInUseConnections: false)', - 'clear with interruptInUseConnections = true closes pending connections' -].map(description => ({ - description, - skipIfCondition: 'always', - skipReason: 'TODO(NODE-4691): cancel inflight operations when heartbeat fails' -})); - describe('Connection Monitoring and Pooling Spec Tests (Integration)', function () { const tests: CmapTest[] = loadSpecTests('connection-monitoring-and-pooling'); @@ -40,6 +30,6 @@ describe('Connection Monitoring and Pooling Spec Tests (Integration)', function skipReason: 'not applicable: waitQueueTimeoutMS limits connection establishment time in our driver' } - ]).concat(INTERRUPT_IN_USE_CONNECTIONS_TESTS) + ]) }); }); diff --git 
a/test/integration/server-discovery-and-monitoring/server_discovery_and_monitoring.spec.test.ts b/test/integration/server-discovery-and-monitoring/server_discovery_and_monitoring.spec.test.ts index a990353aca..74fc67efc6 100644 --- a/test/integration/server-discovery-and-monitoring/server_discovery_and_monitoring.spec.test.ts +++ b/test/integration/server-discovery-and-monitoring/server_discovery_and_monitoring.spec.test.ts @@ -4,16 +4,5 @@ import { loadSpecTests } from '../../spec'; import { runUnifiedSuite } from '../../tools/unified-spec-runner/runner'; describe('SDAM Unified Tests', function () { - const sdamPoolClearedTests = [ - 'Connection pool clear uses interruptInUseConnections=true after monitor timeout', - 'Error returned from connection pool clear with interruptInUseConnections=true is retryable', - 'Error returned from connection pool clear with interruptInUseConnections=true is retryable for write' - ]; - runUnifiedSuite( - loadSpecTests(path.join('server-discovery-and-monitoring', 'unified')), - ({ description }) => - sdamPoolClearedTests.includes(description) - ? 
'TODO(NODE-4691): interrupt in-use operations on heartbeat failure' - : false - ); + runUnifiedSuite(loadSpecTests(path.join('server-discovery-and-monitoring', 'unified'))); }); diff --git a/test/tools/cmap_spec_runner.ts b/test/tools/cmap_spec_runner.ts index d5384fc287..ab3fe03bf8 100644 --- a/test/tools/cmap_spec_runner.ts +++ b/test/tools/cmap_spec_runner.ts @@ -197,10 +197,8 @@ const getTestOpDefinitions = (threadContext: ThreadContext) => ({ return threadContext.pool.checkIn(connection); }, - // eslint-disable-next-line @typescript-eslint/no-unused-vars clear: function (interruptInUseConnections: boolean) { - // TODO(NODE-4619): pass interruptInUseConnections into clear pool method - return threadContext.pool.clear(); + return threadContext.pool.clear({ interruptInUseConnections }); }, close: async function () { return await promisify(ConnectionPool.prototype.close).call(threadContext.pool); From 1e2cb076b6a7327f48f130cf1278d43587f87704 Mon Sep 17 00:00:00 2001 From: Bailey Pearson Date: Mon, 24 Oct 2022 11:00:41 -0400 Subject: [PATCH 02/18] add poolcleared network error --- src/cmap/errors.ts | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/src/cmap/errors.ts b/src/cmap/errors.ts index 4e0d25a27c..e6d8c9256a 100644 --- a/src/cmap/errors.ts +++ b/src/cmap/errors.ts @@ -39,6 +39,23 @@ export class PoolClearedError extends MongoNetworkError { } } +/** + * An error indicating that a connection pool has been cleared after the monitor for that server timed out. 
+ * @category Error + */ +export class PoolClearedOnNetworkError extends MongoNetworkError { + /** The address of the connection pool */ + address: string; + constructor(pool: ConnectionPool) { + super(`Connection to ${pool.address} interrupted due to server monitor timeout`); + this.address = pool.address; + } + + override get name(): string { + return 'PoolClearedOnNetworkError'; + } +} + /** * An error thrown when a request to check out a connection times out * @category Error From c1d3285601451905d31573f3de6136db320d3764 Mon Sep 17 00:00:00 2001 From: Bailey Pearson Date: Mon, 24 Oct 2022 11:32:12 -0400 Subject: [PATCH 03/18] refactor: make Pool.clear take options object --- src/cmap/connection_pool.ts | 3 ++- src/sdam/server.ts | 4 ++-- src/sessions.ts | 2 +- 3 files changed, 5 insertions(+), 4 deletions(-) diff --git a/src/cmap/connection_pool.ts b/src/cmap/connection_pool.ts index f621b4f9b4..d7f7e70406 100644 --- a/src/cmap/connection_pool.ts +++ b/src/cmap/connection_pool.ts @@ -408,7 +408,8 @@ export class ConnectionPool extends TypedEventEmitter { * Pool reset is handled by incrementing the pool's generation count. Any existing connection of a * previous generation will eventually be pruned during subsequent checkouts. 
*/ - clear(serviceId?: ObjectId): void { + clear(options: { serviceId?: ObjectId } = {}): void { + const { serviceId } = options; if (this.closed) { return; } diff --git a/src/sdam/server.ts b/src/sdam/server.ts index a693bf3bc8..a2930caf8b 100644 --- a/src/sdam/server.ts +++ b/src/sdam/server.ts @@ -400,14 +400,14 @@ export class Server extends TypedEventEmitter { error.addErrorLabel(MongoErrorLabel.ResetPool); markServerUnknown(this, error); } else if (connection) { - this.s.pool.clear(connection.serviceId); + this.s.pool.clear({ serviceId: connection.serviceId }); } } else { if (isSDAMUnrecoverableError(error)) { if (shouldHandleStateChangeError(this, error)) { const shouldClearPool = maxWireVersion(this) <= 7 || isNodeShuttingDownError(error); if (this.loadBalanced && connection && shouldClearPool) { - this.s.pool.clear(connection.serviceId); + this.s.pool.clear({ serviceId: connection.serviceId }); } if (!this.loadBalanced) { diff --git a/src/sessions.ts b/src/sessions.ts index 21468aff11..f2b7d5281e 100644 --- a/src/sessions.ts +++ b/src/sessions.ts @@ -537,7 +537,7 @@ export function maybeClearPinnedConnection( ); if (options?.forceClear) { - loadBalancer.s.pool.clear(conn.serviceId); + loadBalancer.s.pool.clear({ serviceId: conn.serviceId }); } } From 70c847db2f8543839cf0cd8212330005aceffe1c Mon Sep 17 00:00:00 2001 From: Bailey Pearson Date: Mon, 24 Oct 2022 14:57:10 -0400 Subject: [PATCH 04/18] feat: kill in-flight operations when monitor fails --- src/cmap/connection_pool.ts | 81 +++++++++++++++++-- src/cmap/errors.ts | 4 +- src/error.ts | 1 + src/sdam/monitor.ts | 5 +- src/sdam/topology.ts | 7 +- ...ection_monitoring_and_pooling.spec.test.ts | 27 +++++-- test/tools/cmap_spec_runner.ts | 2 +- 7 files changed, 107 insertions(+), 20 deletions(-) diff --git a/src/cmap/connection_pool.ts b/src/cmap/connection_pool.ts index d7f7e70406..6b6966e5a8 100644 --- a/src/cmap/connection_pool.ts +++ b/src/cmap/connection_pool.ts @@ -41,7 +41,12 @@ import { 
ConnectionPoolReadyEvent, ConnectionReadyEvent } from './connection_pool_events'; -import { PoolClearedError, PoolClosedError, WaitQueueTimeoutError } from './errors'; +import { + PoolClearedError, + PoolClearedOnNetworkError, + PoolClosedError, + WaitQueueTimeoutError +} from './errors'; import { ConnectionPoolMetrics } from './metrics'; /** @internal */ @@ -391,10 +396,10 @@ export class ConnectionPool extends TypedEventEmitter { this[kConnections].unshift(connection); } - this[kCheckedOut].delete(connection); + const wasConnectionDeleted = this[kCheckedOut].delete(connection); this.emit(ConnectionPool.CONNECTION_CHECKED_IN, new ConnectionCheckedInEvent(this, connection)); - if (willDestroy) { + if (wasConnectionDeleted && willDestroy) { const reason = connection.closed ? 'error' : poolClosed ? 'poolClosed' : 'stale'; this.destroyConnection(connection, reason); } @@ -408,8 +413,9 @@ export class ConnectionPool extends TypedEventEmitter { * Pool reset is handled by incrementing the pool's generation count. Any existing connection of a * previous generation will eventually be pruned during subsequent checkouts. */ - clear(options: { serviceId?: ObjectId } = {}): void { + clear(options: { serviceId?: ObjectId; interruptInUseConnections?: boolean } = {}): void { const { serviceId } = options; + const interruptInUseConnections = options.interruptInUseConnections ?? 
false; if (this.closed) { return; } @@ -433,6 +439,8 @@ export class ConnectionPool extends TypedEventEmitter { return; } + const oldGeneration = this[kGeneration]; + // handle non load-balanced case this[kGeneration] += 1; const alreadyPaused = this[kPoolState] === PoolState.paused; @@ -440,11 +448,63 @@ export class ConnectionPool extends TypedEventEmitter { this.clearMinPoolSizeTimer(); if (!alreadyPaused) { - this.emit(ConnectionPool.CONNECTION_POOL_CLEARED, new ConnectionPoolClearedEvent(this)); + this.emit( + ConnectionPool.CONNECTION_POOL_CLEARED, + new ConnectionPoolClearedEvent(this, { interruptInUseConnections }) + ); } + + process.nextTick(() => + this.pruneConnections({ minGeneration: oldGeneration, interruptInUseConnections }) + ); + this.processWaitQueue(); } + /** + * Closes all checked in perished connections in the pool with a resumable PoolClearedOnNetworkError. + * + * If interruptInUseConnections is `true`, this method attempts to kill checked out connections as well. + * Only connections where `connection.generation <= minGeneration` are killed. Connections are closed with a + * resumable PoolClearedOnNetworkTimeoutError. 
+ */ + private pruneConnections({ + interruptInUseConnections, + minGeneration + }: { + interruptInUseConnections: boolean; + minGeneration: number; + }) { + this[kConnections].prune(connection => { + if (connection.generation <= minGeneration) { + connection.onError(new PoolClearedOnNetworkError(this)); + this.emit( + ConnectionPool.CONNECTION_CLOSED, + new ConnectionClosedEvent(this, connection, 'stale') + ); + + return true; + } + return false; + }); + + if (interruptInUseConnections) { + for (const connection of this[kCheckedOut]) { + if (connection.generation <= minGeneration) { + this[kCheckedOut].delete(connection); + connection.onError(new PoolClearedOnNetworkError(this)); + this.emit( + ConnectionPool.CONNECTION_CLOSED, + new ConnectionClosedEvent(this, connection, 'stale') + ); + } + } + + // TODO(NODE-xxxx): track pending connections and cancel + // this[kCancellationToken].emit('cancel'); + } + } + /** Close the pool */ close(callback: Callback): void; close(options: CloseOptions, callback: Callback): void; @@ -573,7 +633,12 @@ export class ConnectionPool extends TypedEventEmitter { return !!(this.options.maxIdleTimeMS && connection.idleTime > this.options.maxIdleTimeMS); } - private connectionIsPerished(connection: Connection) { + /** + * Destroys a connection if the connection is perished. + * + * @returns `true` if the connection was destroyed, `false` otherwise. 
+ */ + private destroyConnectionIfPerished(connection: Connection) { const isStale = this.connectionIsStale(connection); const isIdle = this.connectionIsIdle(connection); if (!isStale && !isIdle && !connection.closed) { @@ -659,7 +724,7 @@ export class ConnectionPool extends TypedEventEmitter { return; } - this[kConnections].prune(connection => this.connectionIsPerished(connection)); + this[kConnections].prune(connection => this.destroyConnectionIfPerished(connection)); if ( this.totalConnectionCount < minPoolSize && @@ -735,7 +800,7 @@ export class ConnectionPool extends TypedEventEmitter { break; } - if (!this.connectionIsPerished(connection)) { + if (!this.destroyConnectionIfPerished(connection)) { this[kCheckedOut].add(connection); this.emit( ConnectionPool.CONNECTION_CHECKED_OUT, diff --git a/src/cmap/errors.ts b/src/cmap/errors.ts index e6d8c9256a..002333401e 100644 --- a/src/cmap/errors.ts +++ b/src/cmap/errors.ts @@ -1,4 +1,4 @@ -import { MongoDriverError, MongoNetworkError } from '../error'; +import { MongoDriverError, MongoErrorLabel, MongoNetworkError } from '../error'; import type { ConnectionPool } from './connection_pool'; /** @@ -49,6 +49,8 @@ export class PoolClearedOnNetworkError extends MongoNetworkError { constructor(pool: ConnectionPool) { super(`Connection to ${pool.address} interrupted due to server monitor timeout`); this.address = pool.address; + + this.addErrorLabel(MongoErrorLabel.RetryableWriteError); } override get name(): string { diff --git a/src/error.ts b/src/error.ts index 800be95c8b..1dd426cb4f 100644 --- a/src/error.ts +++ b/src/error.ts @@ -91,6 +91,7 @@ export const MongoErrorLabel = Object.freeze({ ResumableChangeStreamError: 'ResumableChangeStreamError', HandshakeError: 'HandshakeError', ResetPool: 'ResetPool', + InterruptInUseConnections: 'InterruptInUseConnections', NoWritesPerformed: 'NoWritesPerformed' } as const); diff --git a/src/sdam/monitor.ts b/src/sdam/monitor.ts index 3711dc59ed..b35093b435 100644 --- 
a/src/sdam/monitor.ts +++ b/src/sdam/monitor.ts @@ -4,7 +4,7 @@ import { Document, Long } from '../bson'; import { connect } from '../cmap/connect'; import { Connection, ConnectionOptions } from '../cmap/connection'; import { LEGACY_HELLO_COMMAND } from '../constants'; -import { MongoError, MongoErrorLabel } from '../error'; +import { MongoError, MongoErrorLabel, MongoNetworkTimeoutError } from '../error'; import { CancellationToken, TypedEventEmitter } from '../mongo_types'; import type { Callback } from '../utils'; import { calculateDurationInMs, EventEmitterWithState, makeStateMachine, now, ns } from '../utils'; @@ -221,6 +221,9 @@ function checkServer(monitor: Monitor, callback: Callback) { const error = !(err instanceof MongoError) ? new MongoError(err) : err; error.addErrorLabel(MongoErrorLabel.ResetPool); + if (error instanceof MongoNetworkTimeoutError) { + error.addErrorLabel(MongoErrorLabel.InterruptInUseConnections); + } monitor.emit('resetServer', error); callback(err); diff --git a/src/sdam/topology.ts b/src/sdam/topology.ts index d157ef63f6..cd2c48655d 100644 --- a/src/sdam/topology.ts +++ b/src/sdam/topology.ts @@ -6,6 +6,7 @@ import { deserialize, serialize } from '../bson'; import type { MongoCredentials } from '../cmap/auth/mongo_credentials'; import type { ConnectionEvents, DestroyOptions } from '../cmap/connection'; import type { CloseOptions, ConnectionPoolEvents } from '../cmap/connection_pool'; +import { PoolClearedOnNetworkError } from '../cmap/errors'; import { DEFAULT_OPTIONS, FEATURE_FLAGS } from '../connection_string'; import { CLOSE, @@ -839,7 +840,11 @@ function updateServers(topology: Topology, incomingServerDescription?: ServerDes incomingServerDescription.error instanceof MongoError && incomingServerDescription.error.hasErrorLabel(MongoErrorLabel.ResetPool) ) { - server.s.pool.clear(); + const interruptInUseConnections = incomingServerDescription.error.hasErrorLabel( + MongoErrorLabel.InterruptInUseConnections + ); + + 
server.s.pool.clear({ interruptInUseConnections }); } else if (incomingServerDescription.error == null) { const newTopologyType = topology.s.description.type; const shouldMarkPoolReady = diff --git a/test/integration/connection-monitoring-and-pooling/connection_monitoring_and_pooling.spec.test.ts b/test/integration/connection-monitoring-and-pooling/connection_monitoring_and_pooling.spec.test.ts index d0eb20858d..8cd12bf2c5 100644 --- a/test/integration/connection-monitoring-and-pooling/connection_monitoring_and_pooling.spec.test.ts +++ b/test/integration/connection-monitoring-and-pooling/connection_monitoring_and_pooling.spec.test.ts @@ -19,17 +19,28 @@ const LB_SKIP_TESTS: SkipDescription[] = [ skipReason: 'cannot run against a load balanced environment' })); +const INTERRUPT_IN_USE_SKIPPED_TESTS: SkipDescription[] = [ + { + description: 'clear with interruptInUseConnections = true closes pending connections', + skipIfCondition: 'always', + skipReason: 'TODO(NODE-xxxx): track and kill pending connections' + } +]; + describe('Connection Monitoring and Pooling Spec Tests (Integration)', function () { const tests: CmapTest[] = loadSpecTests('connection-monitoring-and-pooling'); runCmapTestSuite(tests, { - testsToSkip: LB_SKIP_TESTS.concat([ - { - description: 'waiting on maxConnecting is limited by WaitQueueTimeoutMS', - skipIfCondition: 'always', - skipReason: - 'not applicable: waitQueueTimeoutMS limits connection establishment time in our driver' - } - ]) + testsToSkip: LB_SKIP_TESTS.concat( + [ + { + description: 'waiting on maxConnecting is limited by WaitQueueTimeoutMS', + skipIfCondition: 'always', + skipReason: + 'not applicable: waitQueueTimeoutMS limits connection establishment time in our driver' + } + ], + INTERRUPT_IN_USE_SKIPPED_TESTS + ) }); }); diff --git a/test/tools/cmap_spec_runner.ts b/test/tools/cmap_spec_runner.ts index ab3fe03bf8..634e732f3e 100644 --- a/test/tools/cmap_spec_runner.ts +++ b/test/tools/cmap_spec_runner.ts @@ -197,7 +197,7 @@ 
const getTestOpDefinitions = (threadContext: ThreadContext) => ({ return threadContext.pool.checkIn(connection); }, - clear: function (interruptInUseConnections: boolean) { + clear: function ({ interruptInUseConnections }: { interruptInUseConnections: boolean }) { return threadContext.pool.clear({ interruptInUseConnections }); }, close: async function () { From 1135d6cc31bf24c47fdbdd801a83b3eb86832c74 Mon Sep 17 00:00:00 2001 From: Bailey Pearson Date: Mon, 31 Oct 2022 09:31:21 -0400 Subject: [PATCH 05/18] chore: fix lint --- src/sdam/topology.ts | 1 - 1 file changed, 1 deletion(-) diff --git a/src/sdam/topology.ts b/src/sdam/topology.ts index cd2c48655d..601ae2c382 100644 --- a/src/sdam/topology.ts +++ b/src/sdam/topology.ts @@ -6,7 +6,6 @@ import { deserialize, serialize } from '../bson'; import type { MongoCredentials } from '../cmap/auth/mongo_credentials'; import type { ConnectionEvents, DestroyOptions } from '../cmap/connection'; import type { CloseOptions, ConnectionPoolEvents } from '../cmap/connection_pool'; -import { PoolClearedOnNetworkError } from '../cmap/errors'; import { DEFAULT_OPTIONS, FEATURE_FLAGS } from '../connection_string'; import { CLOSE, From 7c7c607da153d7b815452e1678c8e5606b71e351 Mon Sep 17 00:00:00 2001 From: Bailey Pearson Date: Mon, 31 Oct 2022 09:38:59 -0400 Subject: [PATCH 06/18] chore: fix unit tests --- test/unit/sdam/server.test.ts | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/test/unit/sdam/server.test.ts b/test/unit/sdam/server.test.ts index cf810d8754..188c290ba4 100644 --- a/test/unit/sdam/server.test.ts +++ b/test/unit/sdam/server.test.ts @@ -105,7 +105,9 @@ describe('Server', () => { expect(newDescription).to.have.nested.property('[0].type', ServerType.Unknown); } else { expect(newDescription).to.be.undefined; - expect(server.s.pool.clear).to.have.been.calledOnceWith(connection!.serviceId); + expect(server.s.pool.clear).to.have.been.calledOnceWith({ + serviceId: connection!.serviceId + }); } }); From 
ab16a2f3b4195ab068521fb3a3e81fcc3d97299d Mon Sep 17 00:00:00 2001 From: Bailey Pearson Date: Mon, 31 Oct 2022 10:40:08 -0400 Subject: [PATCH 07/18] chore: add link to follow up ticket in TODO --- .../connection_monitoring_and_pooling.spec.test.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/integration/connection-monitoring-and-pooling/connection_monitoring_and_pooling.spec.test.ts b/test/integration/connection-monitoring-and-pooling/connection_monitoring_and_pooling.spec.test.ts index 8cd12bf2c5..9113b23aff 100644 --- a/test/integration/connection-monitoring-and-pooling/connection_monitoring_and_pooling.spec.test.ts +++ b/test/integration/connection-monitoring-and-pooling/connection_monitoring_and_pooling.spec.test.ts @@ -23,7 +23,7 @@ const INTERRUPT_IN_USE_SKIPPED_TESTS: SkipDescription[] = [ { description: 'clear with interruptInUseConnections = true closes pending connections', skipIfCondition: 'always', - skipReason: 'TODO(NODE-xxxx): track and kill pending connections' + skipReason: 'TODO(NODE-4784): track and kill pending connections' } ]; From c7cda84a7ec6d8ca50915b80d5b882d84a388716 Mon Sep 17 00:00:00 2001 From: Bailey Pearson Date: Mon, 31 Oct 2022 10:58:44 -0400 Subject: [PATCH 08/18] add todo to connection pool --- src/cmap/connection_pool.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/cmap/connection_pool.ts b/src/cmap/connection_pool.ts index 6b6966e5a8..de99c76918 100644 --- a/src/cmap/connection_pool.ts +++ b/src/cmap/connection_pool.ts @@ -500,7 +500,7 @@ export class ConnectionPool extends TypedEventEmitter { } } - // TODO(NODE-xxxx): track pending connections and cancel + // TODO(NODE-4784): track pending connections and cancel // this[kCancellationToken].emit('cancel'); } } From 97d72a7b1b96ab6d729c0704e94d278ddd46e5fb Mon Sep 17 00:00:00 2001 From: Bailey Pearson Date: Mon, 31 Oct 2022 11:08:36 -0400 Subject: [PATCH 09/18] refactor: make new error subclass PoolClearedError --- 
src/cmap/errors.ts | 16 +++++++--------- 1 file changed, 7 insertions(+), 9 deletions(-) diff --git a/src/cmap/errors.ts b/src/cmap/errors.ts index 002333401e..0a518d0904 100644 --- a/src/cmap/errors.ts +++ b/src/cmap/errors.ts @@ -27,10 +27,11 @@ export class PoolClearedError extends MongoNetworkError { /** The address of the connection pool */ address: string; - constructor(pool: ConnectionPool) { - super( - `Connection pool for ${pool.address} was cleared because another operation failed with: "${pool.serverError?.message}"` - ); + constructor(pool: ConnectionPool, message?: string) { + const errorMessage = message + ? message + : `Connection pool for ${pool.address} was cleared because another operation failed with: "${pool.serverError?.message}"`; + super(errorMessage); this.address = pool.address; } @@ -43,12 +44,9 @@ export class PoolClearedError extends MongoNetworkError { * An error indicating that a connection pool has been cleared after the monitor for that server timed out. * @category Error */ -export class PoolClearedOnNetworkError extends MongoNetworkError { - /** The address of the connection pool */ - address: string; +export class PoolClearedOnNetworkError extends PoolClearedError { constructor(pool: ConnectionPool) { - super(`Connection to ${pool.address} interrupted due to server monitor timeout`); - this.address = pool.address; + super(pool, `Connection to ${pool.address} interrupted due to server monitor timeout`); this.addErrorLabel(MongoErrorLabel.RetryableWriteError); } From c987ade351913b2e03b7967952608c1133d27c2c Mon Sep 17 00:00:00 2001 From: Bailey Pearson Date: Fri, 18 Nov 2022 14:41:21 -0500 Subject: [PATCH 10/18] address comments --- src/cmap/connection_pool.ts | 45 +++++-------------- ...ection_monitoring_and_pooling.spec.test.ts | 7 +++ 2 files changed, 18 insertions(+), 34 deletions(-) diff --git a/src/cmap/connection_pool.ts b/src/cmap/connection_pool.ts index de99c76918..97c6c06364 100644 --- a/src/cmap/connection_pool.ts +++ 
b/src/cmap/connection_pool.ts @@ -415,7 +415,6 @@ export class ConnectionPool extends TypedEventEmitter { */ clear(options: { serviceId?: ObjectId; interruptInUseConnections?: boolean } = {}): void { const { serviceId } = options; - const interruptInUseConnections = options.interruptInUseConnections ?? false; if (this.closed) { return; } @@ -438,10 +437,9 @@ export class ConnectionPool extends TypedEventEmitter { ); return; } - - const oldGeneration = this[kGeneration]; - // handle non load-balanced case + const interruptInUseConnections = options.interruptInUseConnections ?? false; + const oldGeneration = this[kGeneration]; this[kGeneration] += 1; const alreadyPaused = this[kPoolState] === PoolState.paused; this[kPoolState] = PoolState.paused; @@ -454,9 +452,9 @@ export class ConnectionPool extends TypedEventEmitter { ); } - process.nextTick(() => - this.pruneConnections({ minGeneration: oldGeneration, interruptInUseConnections }) - ); + if (interruptInUseConnections) { + process.nextTick(() => this.interruptInUseConnections(oldGeneration)); + } this.processWaitQueue(); } @@ -468,41 +466,20 @@ export class ConnectionPool extends TypedEventEmitter { * Only connections where `connection.generation <= minGeneration` are killed. Connections are closed with a * resumable PoolClearedOnNetworkTimeoutError. 
*/ - private pruneConnections({ - interruptInUseConnections, - minGeneration - }: { - interruptInUseConnections: boolean; - minGeneration: number; - }) { - this[kConnections].prune(connection => { + private interruptInUseConnections(minGeneration: number) { + for (const connection of this[kCheckedOut]) { if (connection.generation <= minGeneration) { + this[kCheckedOut].delete(connection); connection.onError(new PoolClearedOnNetworkError(this)); this.emit( ConnectionPool.CONNECTION_CLOSED, new ConnectionClosedEvent(this, connection, 'stale') ); - - return true; } - return false; - }); - - if (interruptInUseConnections) { - for (const connection of this[kCheckedOut]) { - if (connection.generation <= minGeneration) { - this[kCheckedOut].delete(connection); - connection.onError(new PoolClearedOnNetworkError(this)); - this.emit( - ConnectionPool.CONNECTION_CLOSED, - new ConnectionClosedEvent(this, connection, 'stale') - ); - } - } - - // TODO(NODE-4784): track pending connections and cancel - // this[kCancellationToken].emit('cancel'); } + + // TODO(NODE-4784): track pending connections and cancel + // this[kCancellationToken].emit('cancel'); } /** Close the pool */ diff --git a/test/integration/connection-monitoring-and-pooling/connection_monitoring_and_pooling.spec.test.ts b/test/integration/connection-monitoring-and-pooling/connection_monitoring_and_pooling.spec.test.ts index 9113b23aff..1d166eb614 100644 --- a/test/integration/connection-monitoring-and-pooling/connection_monitoring_and_pooling.spec.test.ts +++ b/test/integration/connection-monitoring-and-pooling/connection_monitoring_and_pooling.spec.test.ts @@ -24,6 +24,13 @@ const INTERRUPT_IN_USE_SKIPPED_TESTS: SkipDescription[] = [ description: 'clear with interruptInUseConnections = true closes pending connections', skipIfCondition: 'always', skipReason: 'TODO(NODE-4784): track and kill pending connections' + }, + { + description: + 'Pool clear SHOULD schedule the next background thread run immediately 
(interruptInUseConnections: false)', + skipIfCondition: 'always', + skipReason: + 'NodeJS does not have a background thread responsible for managing connections, and so already checked in connections are not pruned when in-use connections are interrupted.' } ]; From 8d181c01ce8ba6a51999e7ea5dbb82f5f52067d5 Mon Sep 17 00:00:00 2001 From: Bailey Pearson Date: Tue, 22 Nov 2022 10:07:38 -0500 Subject: [PATCH 11/18] consolidate lb logic in pool clear - moves all load balanced specific logic into the same if-statement - throws an error if pool.clear() is called in lb mode without a serviceId. this should never happen, and previously we would fall through to non-lb mode logic. With this change, we now get a meaningful error. --- src/cmap/connection_pool.ts | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/src/cmap/connection_pool.ts b/src/cmap/connection_pool.ts index 97c6c06364..bb2656f695 100644 --- a/src/cmap/connection_pool.ts +++ b/src/cmap/connection_pool.ts @@ -414,13 +414,18 @@ export class ConnectionPool extends TypedEventEmitter { * previous generation will eventually be pruned during subsequent checkouts. */ clear(options: { serviceId?: ObjectId; interruptInUseConnections?: boolean } = {}): void { - const { serviceId } = options; if (this.closed) { return; } // handle load balanced case - if (this.loadBalanced && serviceId) { + if (this.loadBalanced) { + const { serviceId } = options; + if (!serviceId) { + throw new MongoRuntimeError( + 'ConnectionPool.clear() called in load balanced mode with no serviceId.' 
+ ); + } const sid = serviceId.toHexString(); const generation = this.serviceGenerations.get(sid); // Only need to worry if the generation exists, since it should From 02f954af7bd9b3a2271f84880140190dbf6804ef Mon Sep 17 00:00:00 2001 From: Bailey Pearson Date: Tue, 22 Nov 2022 10:21:24 -0500 Subject: [PATCH 12/18] update comments for interruptInUseConnections --- src/cmap/connection_pool.ts | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/src/cmap/connection_pool.ts b/src/cmap/connection_pool.ts index bb2656f695..c17b795efe 100644 --- a/src/cmap/connection_pool.ts +++ b/src/cmap/connection_pool.ts @@ -465,11 +465,9 @@ export class ConnectionPool extends TypedEventEmitter { } /** - * Closes all checked in perished connections in the pool with a resumable PoolClearedOnNetworkError. + * Closes all stale in-use connections in the pool with a resumable PoolClearedOnNetworkError. * - * If interruptInUseConnections is `true`, this method attempts to kill checked out connections as well. - * Only connections where `connection.generation <= minGeneration` are killed. Connections are closed with a - * resumable PoolClearedOnNetworkTimeoutError. + * Only connections where `connection.generation <= minGeneration` are killed. 
*/ private interruptInUseConnections(minGeneration: number) { for (const connection of this[kCheckedOut]) { From 2ae7ae5fa4f3186094766080f7dd53078ce962d5 Mon Sep 17 00:00:00 2001 From: Bailey Pearson Date: Tue, 22 Nov 2022 15:07:53 -0500 Subject: [PATCH 13/18] attach retryable write error label in constructor for error --- src/cmap/errors.ts | 4 ++-- src/sdam/server.ts | 2 -- 2 files changed, 2 insertions(+), 4 deletions(-) diff --git a/src/cmap/errors.ts b/src/cmap/errors.ts index 0a518d0904..f6d2ed5888 100644 --- a/src/cmap/errors.ts +++ b/src/cmap/errors.ts @@ -33,6 +33,8 @@ export class PoolClearedError extends MongoNetworkError { : `Connection pool for ${pool.address} was cleared because another operation failed with: "${pool.serverError?.message}"`; super(errorMessage); this.address = pool.address; + + this.addErrorLabel(MongoErrorLabel.RetryableWriteError); } override get name(): string { @@ -47,8 +49,6 @@ export class PoolClearedError extends MongoNetworkError { export class PoolClearedOnNetworkError extends PoolClearedError { constructor(pool: ConnectionPool) { super(pool, `Connection to ${pool.address} interrupted due to server monitor timeout`); - - this.addErrorLabel(MongoErrorLabel.RetryableWriteError); } override get name(): string { diff --git a/src/sdam/server.ts b/src/sdam/server.ts index a2930caf8b..ae7a1fd5f6 100644 --- a/src/sdam/server.ts +++ b/src/sdam/server.ts @@ -354,8 +354,6 @@ export class Server extends TypedEventEmitter { } if (!(err instanceof PoolClearedError)) { this.handleError(err); - } else { - err.addErrorLabel(MongoErrorLabel.RetryableWriteError); } return cb(err); } From 408401809b12f5c898c85ad11cbae68463e2953b Mon Sep 17 00:00:00 2001 From: Bailey Pearson Date: Tue, 22 Nov 2022 15:15:24 -0500 Subject: [PATCH 14/18] use checkIn in favor of manually emitting events --- src/cmap/connection_pool.ts | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/src/cmap/connection_pool.ts b/src/cmap/connection_pool.ts 
index c17b795efe..be48864f2b 100644 --- a/src/cmap/connection_pool.ts +++ b/src/cmap/connection_pool.ts @@ -387,6 +387,9 @@ export class ConnectionPool extends TypedEventEmitter { * @param connection - The connection to check in */ checkIn(connection: Connection): void { + if (!this[kCheckedOut].has(connection)) { + return; + } const poolClosed = this.closed; const stale = this.connectionIsStale(connection); const willDestroy = !!(poolClosed || stale || connection.closed); @@ -472,12 +475,8 @@ export class ConnectionPool extends TypedEventEmitter { private interruptInUseConnections(minGeneration: number) { for (const connection of this[kCheckedOut]) { if (connection.generation <= minGeneration) { - this[kCheckedOut].delete(connection); + this.checkIn(connection); connection.onError(new PoolClearedOnNetworkError(this)); - this.emit( - ConnectionPool.CONNECTION_CLOSED, - new ConnectionClosedEvent(this, connection, 'stale') - ); } } From 2b6de4e3c6b0515fdb04971ef2ebfc8ca1a314b7 Mon Sep 17 00:00:00 2001 From: Bailey Pearson Date: Tue, 22 Nov 2022 15:45:53 -0500 Subject: [PATCH 15/18] Skip failing interrupt in use test on lb topologies --- .../connection_monitoring_and_pooling.spec.test.ts | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/test/integration/connection-monitoring-and-pooling/connection_monitoring_and_pooling.spec.test.ts b/test/integration/connection-monitoring-and-pooling/connection_monitoring_and_pooling.spec.test.ts index 1d166eb614..ac1066a168 100644 --- a/test/integration/connection-monitoring-and-pooling/connection_monitoring_and_pooling.spec.test.ts +++ b/test/integration/connection-monitoring-and-pooling/connection_monitoring_and_pooling.spec.test.ts @@ -12,7 +12,8 @@ const LB_SKIP_TESTS: SkipDescription[] = [ 'clearing a paused pool emits no events', 'after clear, cannot check out connections until pool ready', 'readying a ready pool emits no events', - 'error during minPoolSize population clears pool' + 'error during minPoolSize 
population clears pool', + 'Connections MUST be interrupted as soon as possible (interruptInUseConnections=true)' ].map(description => ({ description, skipIfCondition: 'loadBalanced', From 41c378150a5b41a2e86b45dec82b39e7e8551f9b Mon Sep 17 00:00:00 2001 From: Bailey Pearson Date: Tue, 22 Nov 2022 17:08:43 -0500 Subject: [PATCH 16/18] remove unnecessary check in checkIn --- src/cmap/connection_pool.ts | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/cmap/connection_pool.ts b/src/cmap/connection_pool.ts index be48864f2b..63053c4cb2 100644 --- a/src/cmap/connection_pool.ts +++ b/src/cmap/connection_pool.ts @@ -399,10 +399,10 @@ export class ConnectionPool extends TypedEventEmitter { this[kConnections].unshift(connection); } - const wasConnectionDeleted = this[kCheckedOut].delete(connection); + this[kCheckedOut].delete(connection); this.emit(ConnectionPool.CONNECTION_CHECKED_IN, new ConnectionCheckedInEvent(this, connection)); - if (wasConnectionDeleted && willDestroy) { + if (willDestroy) { const reason = connection.closed ? 'error' : poolClosed ? 'poolClosed' : 'stale'; this.destroyConnection(connection, reason); } From 809803cdbfc4856c14cb5c3323d80b68182a2267 Mon Sep 17 00:00:00 2001 From: Bailey Pearson Date: Mon, 28 Nov 2022 09:35:54 -0500 Subject: [PATCH 17/18] add type annotation to function --- src/cmap/connection_pool.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/cmap/connection_pool.ts b/src/cmap/connection_pool.ts index 63053c4cb2..276712ea54 100644 --- a/src/cmap/connection_pool.ts +++ b/src/cmap/connection_pool.ts @@ -617,7 +617,7 @@ export class ConnectionPool extends TypedEventEmitter { * * @returns `true` if the connection was destroyed, `false` otherwise.
*/ - private destroyConnectionIfPerished(connection: Connection) { + private destroyConnectionIfPerished(connection: Connection): boolean { const isStale = this.connectionIsStale(connection); const isIdle = this.connectionIsIdle(connection); if (!isStale && !isIdle && !connection.closed) { From 6fa8e9b5f76cf1f4f9ef05b8f1b963198004b8c4 Mon Sep 17 00:00:00 2001 From: Bailey Pearson Date: Mon, 28 Nov 2022 09:37:25 -0500 Subject: [PATCH 18/18] remove TODO comment in connection pool --- src/cmap/connection_pool.ts | 3 --- 1 file changed, 3 deletions(-) diff --git a/src/cmap/connection_pool.ts b/src/cmap/connection_pool.ts index 276712ea54..5c8cbc9765 100644 --- a/src/cmap/connection_pool.ts +++ b/src/cmap/connection_pool.ts @@ -479,9 +479,6 @@ export class ConnectionPool extends TypedEventEmitter { connection.onError(new PoolClearedOnNetworkError(this)); } } - - // TODO(NODE-4784): track pending connections and cancel - // this[kCancellationToken].emit('cancel'); } /** Close the pool */