From 9d7894b003bd808660c694787ab1586b1872f250 Mon Sep 17 00:00:00 2001 From: Neal Beeken Date: Tue, 15 Feb 2022 17:54:32 -0500 Subject: [PATCH] wip --- docs/errors.md | 6 + global.d.ts | 51 +++-- src/change_stream.ts | 8 +- src/cmap/auth/mongo_credentials.ts | 3 +- src/cmap/auth/mongodb_aws.ts | 3 +- src/cmap/connection.ts | 90 +++++--- src/cmap/errors.ts | 3 + src/cmap/wire_protocol/constants.ts | 51 ++++- src/error.ts | 123 ++++++++--- src/index.ts | 3 + src/operations/add_user.ts | 4 +- src/operations/aggregate.ts | 6 +- src/operations/command.ts | 15 +- src/operations/delete.ts | 8 +- src/operations/distinct.ts | 3 +- src/operations/estimated_document_count.ts | 3 +- src/operations/execute_operation.ts | 172 +++++++-------- src/operations/find.ts | 12 +- src/operations/find_and_modify.ts | 8 +- src/operations/indexes.ts | 8 +- src/operations/list_collections.ts | 5 +- src/operations/map_reduce.ts | 3 +- src/operations/update.ts | 13 +- src/read_preference.ts | 3 +- src/sdam/common.ts | 6 - src/sdam/server.ts | 28 ++- src/sdam/server_description.ts | 5 +- src/sdam/server_selection.ts | 6 +- src/sdam/topology.ts | 67 ++---- src/sdam/topology_description.ts | 16 +- src/sessions.ts | 60 +++--- src/utils.ts | 37 +++- .../load_balancers.spec.test.js | 1 - .../retryable_writes.spec.test.js | 199 ------------------ .../retryable_writes.spec.test.ts | 33 ++- test/readme.md | 40 +++- test/tools/spec-runner/index.js | 32 ++- test/tools/unified-spec-runner/entities.ts | 2 +- test/tools/unified-spec-runner/runner.ts | 2 +- test/types/community/client.test-d.ts | 1 + test/unit/error.test.js | 63 +++--- test/unit/sdam/server_selection.test.js | 6 +- 42 files changed, 623 insertions(+), 585 deletions(-) delete mode 100644 test/integration/retryable-writes/retryable_writes.spec.test.js diff --git a/docs/errors.md b/docs/errors.md index e841b4b14b..3e36a8e132 100644 --- a/docs/errors.md +++ b/docs/errors.md @@ -14,6 +14,7 @@ - [`MongoDriverError`](#MongoDriverError) - [`MongoAPIError`](#MongoAPIError) - [`MongoRuntimeError`](#MongoRuntimeError) + - [`MongoUnexpectedServerResponseError`](#MongoUnexpectedServerResponseError) - [`MongoNetworkError`](#MongoNetworkError) - [`MongoServerError`](#MongoServerError) - [`MongoSystemError`](#MongoSystemError) @@ -90,6 +91,11 @@ This class should **never** be directly instantiated. | **MongoChangeStreamError** | Thrown when an error is encountered when operating on a ChangeStream. | | **MongoGridFSStreamError** | Thrown when an unexpected state is reached when operating on a GridFS Stream. | | **MongoGridFSChunkError** | Thrown when a malformed or invalid chunk is encountered when reading from a GridFS Stream. | +| **MongoUnexpectedServerResponseError** | Thrown when the driver receives a **parsable** response it did not expect from the server | + +### MongoUnexpectedServerResponseError + +xxx ### `MongoNetworkError` diff --git a/global.d.ts b/global.d.ts index 3f7f03eb62..e589ac331f 100644 --- a/global.d.ts +++ b/global.d.ts @@ -1,32 +1,40 @@ import { OneOrMore } from './src/mongo_types'; import type { TestConfiguration } from './test/tools/runner/config'; -type WithExclusion = `!${T}` +type WithExclusion = `!${T}`; /** Defined in test/tools/runner/filters/mongodb_topology_filter.js (topologyTypeToString) */ type TopologyTypes = 'single' | 'replicaset' | 'sharded' | 'load-balanced'; -type TopologyTypeRequirement = OneOrMore | OneOrMore> +type TopologyTypeRequirement = OneOrMore | OneOrMore>; declare global { -interface MongoDBMetadataUI { - requires?: { - topology?: TopologyTypeRequirement; - mongodb?: string; - os?: NodeJS.Platform | `!${NodeJS.Platform}`; - apiVersion?: '1'; - clientSideEncryption?: boolean; - serverless?: 'forbid' | 'allow' | 'require'; - auth: 'enabled' | 'disabled' - }; + interface MongoDBMetadataUI { + requires?: { + topology?: TopologyTypeRequirement; + mongodb?: string; + os?: NodeJS.Platform | `!${NodeJS.Platform}`; + apiVersion?: '1'; + clientSideEncryption?: boolean; + serverless?: 'forbid' | 'allow' | 'require'; + auth: 'enabled' | 'disabled'; + }; - sessions?: { - skipLeakTests?: boolean; - }; -} + sessions?: { + skipLeakTests?: boolean; + }; + } + + interface MetadataAndTest { + metadata: MongoDBMetadataUI; + test: Fn; + } + + namespace Chai { + interface Assertion { + /** @deprecated Used only by the legacy spec runner, the unified runner implements the unified spec expectations */ + matchMongoSpec: (anything: any) => Chai.Assertion; + } + } -interface MetadataAndTest { - metadata: MongoDBMetadataUI; - test: Fn; -} namespace Mocha { interface TestFunction { (title: string, metadata: MongoDBMetadataUI, fn: Mocha.Func): Mocha.Test; @@ -42,6 +50,9 @@ interface MetadataAndTest { interface Test { metadata: MongoDBMetadataUI; + + /** @deprecated Attach spec to a test if you need access to it in a beforeEach hook */ + spec: Record; } interface Runnable { diff --git a/src/change_stream.ts b/src/change_stream.ts index ee56af2bd9..76bc27f89a 100644 --- a/src/change_stream.ts +++ b/src/change_stream.ts @@ -2,6 +2,7 @@ import Denque = require('denque'); import type { Readable } from 'stream'; import type { Document, Timestamp } from './bson'; +import { MONGODB_WIRE_VERSION } from './cmap/wire_protocol/constants'; import { Collection } from './collection'; import { AbstractCursor, @@ -462,7 +463,10 @@ export class ChangeStreamCursor extends Abs const resumeKey = this.options.startAfter && !this.hasReceived ? 'startAfter' : 'resumeAfter'; Reflect.set(result, resumeKey, this.resumeToken); - } else if (this.startAtOperationTime && maxWireVersion(this.server) >= 7) { + } else if ( + this.startAtOperationTime && + maxWireVersion(this.server) >= MONGODB_WIRE_VERSION.REPLICA_SET_TRANSACTIONS + ) { result.startAtOperationTime = this.startAtOperationTime; } } @@ -513,7 +517,7 @@ export class ChangeStreamCursor extends Abs this.startAtOperationTime == null && this.resumeAfter == null && this.startAfter == null && - maxWireVersion(server) >= 7 + maxWireVersion(server) >= MONGODB_WIRE_VERSION.REPLICA_SET_TRANSACTIONS ) { this.startAtOperationTime = response.operationTime; } diff --git a/src/cmap/auth/mongo_credentials.ts b/src/cmap/auth/mongo_credentials.ts index 326f98350e..a9a48d03a8 100644 --- a/src/cmap/auth/mongo_credentials.ts +++ b/src/cmap/auth/mongo_credentials.ts @@ -2,6 +2,7 @@ import type { Document } from '../../bson'; import { MongoAPIError, MongoMissingCredentialsError } from '../../error'; import { emitWarningOnce } from '../../utils'; +import { MONGODB_WIRE_VERSION } from '../wire_protocol/constants'; import { AUTH_MECHS_AUTH_SRC_EXTERNAL, AuthMechanism } from './providers'; // https://github.com/mongodb/specifications/blob/master/source/auth/auth.rst @@ -16,7 +17,7 @@ function getDefaultAuthMechanism(hello?: Document): AuthMechanism { } // Fallback to legacy selection method. If wire version >= 3, use scram-sha-1 - if (hello.maxWireVersion >= 3) { + if (hello.maxWireVersion >= MONGODB_WIRE_VERSION.RELEASE_2_7_7) { return AuthMechanism.MONGODB_SCRAM_SHA1; } } diff --git a/src/cmap/auth/mongodb_aws.ts b/src/cmap/auth/mongodb_aws.ts index 0f274f1c22..b51daf821e 100644 --- a/src/cmap/auth/mongodb_aws.ts +++ b/src/cmap/auth/mongodb_aws.ts @@ -11,6 +11,7 @@ import { MongoRuntimeError } from '../../error'; import { Callback, maxWireVersion, ns } from '../../utils'; +import { MONGODB_WIRE_VERSION } from '../wire_protocol/constants'; import { AuthContext, AuthProvider } from './auth_provider'; import { MongoCredentials } from './mongo_credentials'; import { AuthMechanism } from './providers'; @@ -44,7 +45,7 @@ export class MongoDBAWS extends AuthProvider { } const { sign } = aws4; - if (maxWireVersion(connection) < 9) { + if (maxWireVersion(connection) < MONGODB_WIRE_VERSION.RESUMABLE_INITIAL_SYNC) { callback( new MongoCompatibilityError( 'MONGODB-AWS authentication requires MongoDB version 4.4 or later' diff --git a/src/cmap/connection.ts b/src/cmap/connection.ts index c8feb30898..0ff46e8ae5 100644 --- a/src/cmap/connection.ts +++ b/src/cmap/connection.ts @@ -53,6 +53,7 @@ import { import type { Stream } from './connect'; import { MessageStream, OperationDescription } from './message_stream'; import { StreamDescription, StreamDescriptionOptions } from './stream_description'; +import { MONGODB_WIRE_VERSION } from './wire_protocol/constants'; import { applyCommonQueryOptions, getReadPreference, isSharded } from './wire_protocol/shared'; /** @internal */ @@ -109,8 +110,10 @@ export interface CommandOptions extends BSONSerializeOptions { noResponse?: boolean; omitReadPreference?: boolean; - // FIXME: NODE-2802 - willRetryWrite?: boolean; + // FIXME(NODE-2802): Currently the CommandOptions take a property willRetryWrite which is a hint from executeOperation that the txnNum should be applied to this command. + // Applying a session to a command should happen as part of command construction, most likely in the CommandOperation#executeCommand method, + // where we have access to the details we need to determine if a txnNum should also be applied. + willRetryWrite?: true; // FIXME: NODE-2781 writeConcern?: WriteConcernOptions | WriteConcern | W; @@ -248,9 +251,35 @@ export class Connection extends TypedEventEmitter { /* ignore errors, listen to `close` instead */ }); - this[kMessageStream].on('error', error => this.handleIssue({ destroy: error })); - stream.on('close', () => this.handleIssue({ isClose: true })); - stream.on('timeout', () => this.handleIssue({ isTimeout: true, destroy: true })); + const eventHandlerFactory = (event: 'error' | 'close' | 'timeout') => + [ + event, + (error?: Error) => { + if (this.closed) { + return; + } + this.closed = true; + + switch (event) { + case 'error': + this.onError(error); + break; + case 'close': + this.onClose(); + break; + case 'timeout': + this.onTimeout(); + break; + } + + this[kQueue].clear(); + this.emit(Connection.CLOSE); + } + ] as const; + + this[kMessageStream].on(...eventHandlerFactory('error')); + stream.on(...eventHandlerFactory('close')); + stream.on(...eventHandlerFactory('timeout')); // hook the message stream up to the passed in stream stream.pipe(this[kMessageStream]); @@ -306,33 +335,29 @@ export class Connection extends TypedEventEmitter { this[kLastUseTime] = now(); } - handleIssue(issue: { isTimeout?: boolean; isClose?: boolean; destroy?: boolean | Error }): void { - if (this.closed) { - return; - } + onTimeout() { + this[kStream].destroy(); + const beforeHandshake = { + beforeHandshake: this.hello == null + }; + const msg = `connection ${this.id} to ${this.address} timed out`; - if (issue.destroy) { - this[kStream].destroy(typeof issue.destroy === 'boolean' ? undefined : issue.destroy); + for (const [, op] of this[kQueue]) { + op.cb(new MongoNetworkTimeoutError(msg, beforeHandshake)); } + } - this.closed = true; - + onClose() { for (const [, op] of this[kQueue]) { - if (issue.isTimeout) { - op.cb( - new MongoNetworkTimeoutError(`connection ${this.id} to ${this.address} timed out`, { - beforeHandshake: this.hello == null - }) - ); - } else if (issue.isClose) { - op.cb(new MongoNetworkError(`connection ${this.id} to ${this.address} closed`)); - } else { - op.cb(typeof issue.destroy === 'boolean' ? undefined : issue.destroy); - } + op.cb(new MongoNetworkError(`connection ${this.id} to ${this.address} closed`)); } + } - this[kQueue].clear(); - this.emit(Connection.CLOSE); + onError(error?: Error) { + this[kStream].destroy(error); + for (const [, op] of this[kQueue]) { + op.cb(error); + } } destroy(): void; @@ -541,7 +566,7 @@ export class Connection extends TypedEventEmitter { return; } - if (wireVersion < 4) { + if (wireVersion < MONGODB_WIRE_VERSION.FIND_COMMAND) { const getMoreOp = new GetMore(ns.toString(), cursorId, { numberToReturn: options.batchSize }); const queryOptions = applyCommonQueryOptions( {}, @@ -595,7 +620,7 @@ export class Connection extends TypedEventEmitter { throw new MongoRuntimeError(`Invalid list of cursor ids provided: ${cursorIds}`); } - if (maxWireVersion(this) < 4) { + if (maxWireVersion(this) < MONGODB_WIRE_VERSION.FIND_COMMAND) { try { write( this, @@ -653,12 +678,12 @@ export class CryptoConnection extends Connection { } const serverWireVersion = maxWireVersion(this); - if (serverWireVersion === 0) { + if (serverWireVersion === MONGODB_WIRE_VERSION.UNKNOWN) { // This means the initial handshake hasn't happened yet return super.command(ns, cmd, options, callback); } - if (serverWireVersion < 8) { + if (serverWireVersion < MONGODB_WIRE_VERSION.SHARDED_TRANSACTIONS) { callback( new MongoCompatibilityError('Auto-encryption requires a minimum MongoDB version of 4.2') ); @@ -695,7 +720,10 @@ function supportsOpMsg(conn: Connection) { return false; } - return maxWireVersion(conn) >= 6 && !description.__nodejs_mock_server__; + return ( + maxWireVersion(conn) >= MONGODB_WIRE_VERSION.SUPPORTS_OP_MSG && + !description.__nodejs_mock_server__ // mock server does not support OP_MSG + ); } function messageHandler(conn: Connection) { diff --git a/src/cmap/errors.ts b/src/cmap/errors.ts index 7d0319829f..c9c5b19e2d 100644 --- a/src/cmap/errors.ts +++ b/src/cmap/errors.ts @@ -3,6 +3,9 @@ import type { ConnectionPool } from './connection_pool'; /** * An error indicating a connection pool is closed + * + * @public + * * @category Error */ export class PoolClosedError extends MongoDriverError { diff --git a/src/cmap/wire_protocol/constants.ts b/src/cmap/wire_protocol/constants.ts index 4555bc11f6..8b27aabe6d 100644 --- a/src/cmap/wire_protocol/constants.ts +++ b/src/cmap/wire_protocol/constants.ts @@ -1,7 +1,54 @@ +/** + * All known MongoDB Wire versions, used to determine basic feature support + * - see the [server enum](https://github.com/mongodb/mongo/blob/fe4cf6134b16f102591053d6f4fe11e5cc0eb3ec/src/mongo/db/wire_version.h#L57) + * - (note: the is a link to a commit so you can find the file, you should then check the main branch) + */ +export const MONGODB_WIRE_VERSION = Object.freeze({ + /** A helper wire version, 0 means no information about what is supported on the server is known */ + UNKNOWN: 0, + /** Everything before we started tracking. */ + RELEASE_2_4_AND_BEFORE: 0, + /** The aggregation command may now be requested to return cursors. */ + AGG_RETURNS_CURSORS: 1, + /** insert, update, and delete batch command */ + BATCH_COMMANDS: 2, + /** support SCRAM-SHA1, listIndexes, listCollections, new explain */ + RELEASE_2_7_7: 3, + /** Support find and getMore commands, as well as OP_COMMAND in mongod (but not mongos). */ + FIND_COMMAND: 4, + /** Supports all write commands take a write concern. */ + COMMANDS_ACCEPT_WRITE_CONCERN: 5, + /** Supports the new OP_MSG wireprotocol (3.6+). */ + SUPPORTS_OP_MSG: 6, + /** Supports replica set transactions (4.0+). */ + REPLICA_SET_TRANSACTIONS: 7, + /** Supports sharded transactions (4.2+). */ + SHARDED_TRANSACTIONS: 8, + /** Supports resumable initial sync (4.4+). */ + RESUMABLE_INITIAL_SYNC: 9, + /** Supports features available from 4.7 and onwards. */ + WIRE_VERSION_47: 10, + /** Supports features available from 4.8 and onwards. */ + WIRE_VERSION_48: 11, + /** + * Supports features available from 4.9 and onwards. + * - EstimatedDocumentCountOperation can run as an aggregation + */ + WIRE_VERSION_49: 12, + /** + * Supports features available from 5.0 and onwards. + * - Writes to secondaries $out/$merge + * - Snapshot reads + */ + WIRE_VERSION_50: 13, + /** Supports features available from 5.1 and onwards. */ + WIRE_VERSION_51: 14 +} as const); + export const MIN_SUPPORTED_SERVER_VERSION = '3.6'; export const MAX_SUPPORTED_SERVER_VERSION = '5.1'; -export const MIN_SUPPORTED_WIRE_VERSION = 6; -export const MAX_SUPPORTED_WIRE_VERSION = 14; +export const MIN_SUPPORTED_WIRE_VERSION = MONGODB_WIRE_VERSION.SUPPORTS_OP_MSG; +export const MAX_SUPPORTED_WIRE_VERSION = MONGODB_WIRE_VERSION.WIRE_VERSION_51; export const OP_REPLY = 1; export const OP_UPDATE = 2001; export const OP_INSERT = 2002; diff --git a/src/error.ts b/src/error.ts index cfd70f32aa..cb91fa4f7b 100644 --- a/src/error.ts +++ b/src/error.ts @@ -1,4 +1,5 @@ import type { Document } from './bson'; +import { MONGODB_WIRE_VERSION } from './cmap/wire_protocol/constants'; import type { TopologyVersion } from './sdam/server_description'; import type { TopologyDescription } from './sdam/topology_description'; @@ -80,6 +81,17 @@ export const GET_MORE_RESUMABLE_CODES = new Set([ MONGODB_ERROR_CODES.CursorNotFound ]); +/** @public */ +export const MONGODB_ERROR_LABELS = Object.freeze({ + RetryableWriteError: 'RetryableWriteError', + TransientTransactionError: 'TransientTransactionError', + UnknownTransactionCommitResult: 'UnknownTransactionCommitResult', + ResumableChangeStreamError: 'ResumableChangeStreamError' +} as const); + +/** @public */ +export type MONGODB_ERROR_LABELS = typeof MONGODB_ERROR_LABELS[keyof typeof MONGODB_ERROR_LABELS]; + /** @public */ export interface ErrorDescription extends Document { message?: string; @@ -89,6 +101,8 @@ export interface ErrorDescription extends Document { errInfo?: Document; } +const kCreated = Symbol('created'); + /** * @public * @category Error @@ -99,6 +113,7 @@ export interface ErrorDescription extends Document { export class MongoError extends Error { /** @internal */ [kErrorLabels]: Set; + [kCreated]: Date; /** * This is a number in MongoServerError and a string in MongoDriverError * @privateRemarks @@ -113,6 +128,7 @@ export class MongoError extends Error { } else { super(message); } + this[kCreated] = new Date(); } get name(): string { @@ -130,7 +146,7 @@ export class MongoError extends Error { * @param label - The error label to check for * @returns returns true if the error has the provided error label */ - hasErrorLabel(label: string): boolean { + hasErrorLabel(label: MONGODB_ERROR_LABELS): boolean { if (this[kErrorLabels] == null) { return false; } @@ -238,7 +254,7 @@ export class MongoRuntimeError extends MongoDriverError { } /** - * An error generated when a batch command is reexecuted after one of the commands in the batch + * An error generated when a batch command is re-executed after one of the commands in the batch * has failed * * @public @@ -403,6 +419,32 @@ export class MongoGridFSChunkError extends MongoRuntimeError { } } +/** + * An error generated when a **parsable** unexpected response comes from the server. + * This is generally an error where the driver in a state expecting a certain behavior to occur in + * the next message from MongoDB but it receives something else. + * This error **does not** represent an issue with wire message formatting. + * + * #### Example + * When an operation fails, it is the driver's job to retry it. It must perform serverSelection + * again to make sure that it attempts the operation against a server in a good state. If server + * selection returns a server that does not support retryable operations, this error is used. + * This scenario is unlikely as retryable support would also have been determined on the first attempt + * but it is possible the state change could report a selectable server that does not support retries. + * + * @public + * @category Error + */ +export class MongoUnexpectedServerResponseError extends MongoRuntimeError { + constructor(message: string) { + super(message); + } + + get name(): string { + return 'MongoUnexpectedServerResponseError'; + } +} + /** * An error thrown when the user attempts to add options to a cursor that has already been * initialized @@ -688,8 +730,8 @@ export class MongoWriteConcernError extends MongoServerError { } } -// see: https://github.com/mongodb/specifications/blob/master/source/retryable-writes/retryable-writes.rst#terms -const RETRYABLE_ERROR_CODES = new Set([ +// https://github.com/mongodb/specifications/blob/master/source/retryable-reads/retryable-reads.rst#retryable-error +const RETRYABLE_READS_ERROR_CODES = new Set([ MONGODB_ERROR_CODES.HostUnreachable, MONGODB_ERROR_CODES.HostNotFound, MONGODB_ERROR_CODES.NetworkTimeout, @@ -703,26 +745,21 @@ const RETRYABLE_ERROR_CODES = new Set([ MONGODB_ERROR_CODES.NotPrimaryOrSecondary ]); +// see: https://github.com/mongodb/specifications/blob/master/source/retryable-writes/retryable-writes.rst#terms const RETRYABLE_WRITE_ERROR_CODES = new Set([ - MONGODB_ERROR_CODES.InterruptedAtShutdown, - MONGODB_ERROR_CODES.InterruptedDueToReplStateChange, - MONGODB_ERROR_CODES.NotWritablePrimary, - MONGODB_ERROR_CODES.NotPrimaryNoSecondaryOk, - MONGODB_ERROR_CODES.NotPrimaryOrSecondary, - MONGODB_ERROR_CODES.PrimarySteppedDown, - MONGODB_ERROR_CODES.ShutdownInProgress, - MONGODB_ERROR_CODES.HostNotFound, - MONGODB_ERROR_CODES.HostUnreachable, - MONGODB_ERROR_CODES.NetworkTimeout, - MONGODB_ERROR_CODES.SocketException, + ...RETRYABLE_READS_ERROR_CODES, MONGODB_ERROR_CODES.ExceededTimeLimit ]); -export function isRetryableEndTransactionError(error: MongoError): boolean { - return error.hasErrorLabel('RetryableWriteError'); -} - -export function isRetryableWriteError(error: MongoError): boolean { +export function isRetryableWriteError(error: MongoError, maxWireVersion: number): boolean { + if (maxWireVersion >= MONGODB_WIRE_VERSION.RESUMABLE_INITIAL_SYNC) { + // After 4.4 the error label is the only source of truth for retry writes + return error.hasErrorLabel(MONGODB_ERROR_LABELS.RetryableWriteError); + } else if (error.hasErrorLabel(MONGODB_ERROR_LABELS.RetryableWriteError)) { + // Before 4.4 the error label can be one way of identifying retry + // so we can return true if we have the label, but fall back to code checking below + return true; + } if (error instanceof MongoWriteConcernError) { return RETRYABLE_WRITE_ERROR_CODES.has(error.result?.code ?? error.code ?? 0); } @@ -730,14 +767,31 @@ export function isRetryableWriteError(error: MongoError): boolean { } /** Determines whether an error is something the driver should attempt to retry */ -export function isRetryableError(error: MongoError): boolean { - return ( - // eslint-disable-next-line @typescript-eslint/no-non-null-assertion - (typeof error.code === 'number' && RETRYABLE_ERROR_CODES.has(error.code!)) || - error instanceof MongoNetworkError || - !!error.message.match(new RegExp(LEGACY_NOT_WRITABLE_PRIMARY_ERROR_MESSAGE)) || - !!error.message.match(new RegExp(NODE_IS_RECOVERING_ERROR_MESSAGE)) - ); +export function isRetryableReadError(error: MongoError): boolean { + const hasRetryableErrorCode = + typeof error.code === 'number' ? RETRYABLE_READS_ERROR_CODES.has(error.code) : false; + if (hasRetryableErrorCode) { + return true; + } + + const isNetworkError = error instanceof MongoNetworkError; + if (isNetworkError) { + return true; + } + + const legacyNotWritablePrimaryRegExp = new RegExp(LEGACY_NOT_WRITABLE_PRIMARY_ERROR_MESSAGE); + const isNotWritablePrimaryError = legacyNotWritablePrimaryRegExp.test(error.message); + if (isNotWritablePrimaryError) { + return true; + } + + const nodeIsRecoveringRegExp = new RegExp(NODE_IS_RECOVERING_ERROR_MESSAGE); + const isNodeIsRecoveringError = nodeIsRecoveringRegExp.test(error.message); + if (isNodeIsRecoveringError) { + return true; + } + + return false; } const SDAM_RECOVERING_CODES = new Set([ @@ -748,7 +802,7 @@ const SDAM_RECOVERING_CODES = new Set([ MONGODB_ERROR_CODES.NotPrimaryOrSecondary ]); -const SDAM_NOTPRIMARY_CODES = new Set([ +const SDAM_NOT_PRIMARY_CODES = new Set([ MONGODB_ERROR_CODES.NotWritablePrimary, MONGODB_ERROR_CODES.NotPrimaryNoSecondaryOk, MONGODB_ERROR_CODES.LegacyNotPrimary @@ -774,7 +828,7 @@ function isRecoveringError(err: MongoError) { function isNotWritablePrimaryError(err: MongoError) { if (typeof err.code === 'number') { // If any error code exists, we ignore the error.message - return SDAM_NOTPRIMARY_CODES.has(err.code); + return SDAM_NOT_PRIMARY_CODES.has(err.code); } if (isRecoveringError(err)) { @@ -826,12 +880,15 @@ export function isResumableError(error?: MongoError, wireVersion?: number): bool return true; } - if (wireVersion != null && wireVersion >= 9) { + if (wireVersion != null && wireVersion >= MONGODB_WIRE_VERSION.RESUMABLE_INITIAL_SYNC) { // DRIVERS-1308: For 4.4 drivers running against 4.4 servers, drivers will add a special case to treat the CursorNotFound error code as resumable - if (error && error instanceof MongoError && error.code === 43) { + if (error && error instanceof MongoError && error.code === MONGODB_ERROR_CODES.CursorNotFound) { return true; } - return error instanceof MongoError && error.hasErrorLabel('ResumableChangeStreamError'); + return ( + error instanceof MongoError && + error.hasErrorLabel(MONGODB_ERROR_LABELS.ResumableChangeStreamError) + ); } if (error && typeof error.code === 'number') { diff --git a/src/index.ts b/src/index.ts index 4a82c1bcbd..36ad2bd7e0 100644 --- a/src/index.ts +++ b/src/index.ts @@ -43,6 +43,7 @@ export { MongoCompatibilityError, MongoCursorExhaustedError, MongoCursorInUseError, + MONGODB_ERROR_LABELS, MongoDecompressionError, MongoDriverError, MongoError, @@ -62,8 +63,10 @@ export { MongoServerError, MongoServerSelectionError, MongoSystemError, + MongoTailableCursorError, MongoTopologyClosedError, MongoTransactionError, + MongoUnexpectedServerResponseError, MongoWriteConcernError } from './error'; export { diff --git a/src/operations/add_user.ts b/src/operations/add_user.ts index c9b4f49fa3..afdf90b226 100644 --- a/src/operations/add_user.ts +++ b/src/operations/add_user.ts @@ -1,6 +1,7 @@ import * as crypto from 'crypto'; import type { Document } from '../bson'; +import { MONGODB_WIRE_VERSION } from '../cmap/wire_protocol/constants'; import type { Db } from '../db'; import { MongoInvalidArgumentError } from '../error'; import type { Server } from '../sdam/server'; @@ -75,7 +76,8 @@ export class AddUserOperation extends CommandOperation { roles = Array.isArray(options.roles) ? options.roles : [options.roles]; } - const digestPassword = getTopology(db).lastHello().maxWireVersion >= 7; + const digestPassword = + getTopology(db).lastHello().maxWireVersion >= MONGODB_WIRE_VERSION.REPLICA_SET_TRANSACTIONS; let userPassword = password; diff --git a/src/operations/aggregate.ts b/src/operations/aggregate.ts index 631926ce71..cffe73e93b 100644 --- a/src/operations/aggregate.ts +++ b/src/operations/aggregate.ts @@ -1,4 +1,5 @@ import type { Document } from '../bson'; +import { MONGODB_WIRE_VERSION } from '../cmap/wire_protocol/constants'; import { MongoInvalidArgumentError } from '../error'; import type { Server } from '../sdam/server'; import type { ClientSession } from '../sessions'; @@ -9,7 +10,6 @@ import { Aspect, defineAspects, Hint } from './operation'; /** @internal */ export const DB_AGGREGATE_COLLECTION = 1 as const; -const MIN_WIRE_VERSION_$OUT_READ_CONCERN_SUPPORT = 8 as const; /** @public */ export interface AggregateOptions extends CommandOperationOptions { @@ -91,11 +91,11 @@ export class AggregateOperation extends CommandOperation { const serverWireVersion = maxWireVersion(server); const command: Document = { aggregate: this.target, pipeline: this.pipeline }; - if (this.hasWriteStage && serverWireVersion < MIN_WIRE_VERSION_$OUT_READ_CONCERN_SUPPORT) { + if (this.hasWriteStage && serverWireVersion < MONGODB_WIRE_VERSION.SHARDED_TRANSACTIONS) { this.readConcern = undefined; } - if (serverWireVersion >= 5) { + if (serverWireVersion >= MONGODB_WIRE_VERSION.COMMANDS_ACCEPT_WRITE_CONCERN) { if (this.hasWriteStage && this.writeConcern) { Object.assign(command, { writeConcern: this.writeConcern }); } diff --git a/src/operations/command.ts b/src/operations/command.ts index 2abb500b5e..8a7ab5da8e 100644 --- a/src/operations/command.ts +++ b/src/operations/command.ts @@ -1,11 +1,11 @@ import type { BSONSerializeOptions, Document } from '../bson'; +import { MONGODB_WIRE_VERSION } from '../cmap/wire_protocol/constants'; import { MongoCompatibilityError, MongoInvalidArgumentError } from '../error'; import { Explain, ExplainOptions } from '../explain'; import type { Logger } from '../logger'; import { ReadConcern } from '../read_concern'; import type { ReadPreference } from '../read_preference'; import type { Server } from '../sdam/server'; -import { MIN_SECONDARY_WRITE_WIRE_VERSION } from '../sdam/server_selection'; import type { ClientSession } from '../sessions'; import { Callback, @@ -18,8 +18,6 @@ import { WriteConcern, WriteConcernOptions } from '../write_concern'; import type { ReadConcernLike } from './../read_concern'; import { AbstractOperation, Aspect, OperationOptions } from './operation'; -const SUPPORTS_WRITE_CONCERN_AND_COLLATION = 5; - /** @public */ export interface CollationOptions { locale: string; @@ -133,11 +131,14 @@ export abstract class CommandOperation extends AbstractOperation { Object.assign(cmd, { readConcern: this.readConcern }); } - if (this.trySecondaryWrite && serverWireVersion < MIN_SECONDARY_WRITE_WIRE_VERSION) { + if (this.trySecondaryWrite && serverWireVersion < MONGODB_WIRE_VERSION.WIRE_VERSION_50) { options.omitReadPreference = true; } - if (options.collation && serverWireVersion < SUPPORTS_WRITE_CONCERN_AND_COLLATION) { + if ( + options.collation && + serverWireVersion < MONGODB_WIRE_VERSION.COMMANDS_ACCEPT_WRITE_CONCERN + ) { callback( new MongoCompatibilityError( `Server ${server.name}, which reports wire version ${serverWireVersion}, does not support collation` @@ -150,7 +151,7 @@ export abstract class CommandOperation extends AbstractOperation { Object.assign(cmd, { writeConcern: this.writeConcern }); } - if (serverWireVersion >= SUPPORTS_WRITE_CONCERN_AND_COLLATION) { + if (serverWireVersion >= MONGODB_WIRE_VERSION.COMMANDS_ACCEPT_WRITE_CONCERN) { if ( options.collation && typeof options.collation === 'object' && @@ -169,7 +170,7 @@ export abstract class CommandOperation extends AbstractOperation { } if (this.hasAspect(Aspect.EXPLAINABLE) && this.explain) { - if (serverWireVersion < 6 && cmd.aggregate) { + if (serverWireVersion < MONGODB_WIRE_VERSION.SUPPORTS_OP_MSG && cmd.aggregate) { // Prior to 3.6, with aggregate, verbosity is ignored, and we must pass in "explain: true" cmd.explain = true; } else { diff --git a/src/operations/delete.ts b/src/operations/delete.ts index 20e7d61a49..8af8ba4363 100644 --- a/src/operations/delete.ts +++ b/src/operations/delete.ts @@ -1,4 +1,5 @@ import type { Document } from '../bson'; +import { MONGODB_WIRE_VERSION } from '../cmap/wire_protocol/constants'; import type { Collection } from '../collection'; import { MongoCompatibilityError, MongoServerError } from '../error'; import type { Server } from '../sdam/server'; @@ -80,7 +81,7 @@ export class DeleteOperation extends CommandOperation { command.let = options.let; } - if (options.explain != null && maxWireVersion(server) < 3) { + if (options.explain != null && maxWireVersion(server) < MONGODB_WIRE_VERSION.RELEASE_2_7_7) { return callback ? callback( new MongoCompatibilityError(`Server ${server.name} does not support explain on delete`) @@ -89,7 +90,10 @@ export class DeleteOperation extends CommandOperation { } const unacknowledgedWrite = this.writeConcern && this.writeConcern.w === 0; - if (unacknowledgedWrite || maxWireVersion(server) < 5) { + if ( + unacknowledgedWrite || + maxWireVersion(server) < MONGODB_WIRE_VERSION.COMMANDS_ACCEPT_WRITE_CONCERN + ) { if (this.statements.find((o: Document) => o.hint)) { callback(new MongoCompatibilityError(`Servers < 3.4 do not support hint on delete`)); return; diff --git a/src/operations/distinct.ts b/src/operations/distinct.ts index 29f6619c7c..a9540041f5 100644 --- a/src/operations/distinct.ts +++ b/src/operations/distinct.ts @@ -1,4 +1,5 @@ import type { Document } from '../bson'; +import { MONGODB_WIRE_VERSION } from '../cmap/wire_protocol/constants'; import type { Collection } from '../collection'; import { MongoCompatibilityError } from '../error'; import type { Server } from '../sdam/server'; @@ -67,7 +68,7 @@ export class DistinctOperation extends CommandOperation { return callback(err); } - if (this.explain && maxWireVersion(server) < 4) { + if (this.explain && maxWireVersion(server) < MONGODB_WIRE_VERSION.FIND_COMMAND) { callback( new MongoCompatibilityError(`Server ${server.name} does not support explain on distinct`) ); diff --git a/src/operations/estimated_document_count.ts b/src/operations/estimated_document_count.ts index ef9b76a0d5..3424ceb971 100644 --- a/src/operations/estimated_document_count.ts +++ b/src/operations/estimated_document_count.ts @@ -1,4 +1,5 @@ import type { Document } from '../bson'; +import { MONGODB_WIRE_VERSION } from '../cmap/wire_protocol/constants'; import type { Collection } from '../collection'; import type { MongoServerError } from '../error'; import type { Server } from '../sdam/server'; @@ -29,7 +30,7 @@ export class EstimatedDocumentCountOperation extends CommandOperation { } execute(server: Server, session: ClientSession, callback: Callback): void { - if (maxWireVersion(server) < 12) { + if (maxWireVersion(server) < MONGODB_WIRE_VERSION.WIRE_VERSION_49) { return this.executeLegacy(server, session, callback); } const pipeline = [{ $collStats: { count: {} } }, { $group: { _id: 1, n: { $sum: '$count' } } }]; diff --git a/src/operations/execute_operation.ts b/src/operations/execute_operation.ts index ea1eba4f2f..28f0ab75d7 100644 --- a/src/operations/execute_operation.ts +++ b/src/operations/execute_operation.ts @@ -1,6 +1,8 @@ import type { Document } from '../bson'; +import { MONGODB_WIRE_VERSION } from '../cmap/wire_protocol/constants'; import { - isRetryableError, + isRetryableReadError, + isRetryableWriteError, MongoCompatibilityError, MONGODB_ERROR_CODES, MongoError, @@ -8,7 +10,8 @@ import { MongoNetworkError, MongoRuntimeError, MongoServerError, - MongoTransactionError + MongoTransactionError, + MongoUnexpectedServerResponseError } from '../error'; import { ReadPreference } from '../read_preference'; import type { Server } from '../sdam/server'; @@ -72,16 +75,16 @@ export function executeOperation< TResult = ResultTypeFromOperation >(topology: Topology, operation: T, callback?: Callback): Promise | void { if (!(operation instanceof AbstractOperation)) { - // TODO(NODE-3483) + // TODO(NODE-3483): Extend MongoRuntimeError throw new MongoRuntimeError('This method requires a valid operation instance'); } - return maybePromise(callback, cb => { + return maybePromise(callback, callback => { if (topology.shouldCheckForSessionSupport()) { return topology.selectServer(ReadPreference.primaryPreferred, err => { - if (err) return cb(err); + if (err) return callback(err); - executeOperation(topology, operation, cb); + executeOperation(topology, operation, callback); }); } @@ -94,63 +97,56 @@ export function executeOperation< owner = Symbol(); session = topology.startSession({ owner, explicit: false }); } else if (session.hasEnded) { - return cb(new MongoExpiredSessionError('Use of expired sessions is not permitted')); + return callback(new MongoExpiredSessionError('Use of expired sessions is not permitted')); } else if (session.snapshotEnabled && !topology.capabilities.supportsSnapshotReads) { - return cb(new MongoCompatibilityError('Snapshot reads require MongoDB 5.0 or later')); + return callback(new MongoCompatibilityError('Snapshot reads require MongoDB 5.0 or later')); } } else if (session) { // If the user passed an explicit session and we are still, after server selection, // trying to run against a topology that doesn't support sessions we error out. - return cb(new MongoCompatibilityError('Current topology does not support sessions')); + return callback(new MongoCompatibilityError('Current topology does not support sessions')); } try { - executeWithServerSelection(topology, session, operation, (err, result) => { - if (session && session.owner && session.owner === owner) { - return session.endSession(err2 => cb(err2 || err, result)); + executeWithServerSelection(topology, session, operation, (error, result) => { + if (session && session.owner != null && session.owner === owner) { + return session.endSession(endSessionError => callback(endSessionError ?? error, result)); } - cb(err, result); + callback(error, result); }); - } catch (e) { - if (session && session.owner && session.owner === owner) { + } catch (error) { + // TODO: we shouldn't catch here. + // We catch and NOT finally, cus its only in case of an error that we want to end the session + if (session?.owner != null && session?.owner === owner) { session.endSession(); } - - throw e; } }); } -function supportsRetryableReads(server: Server) { - return maxWireVersion(server) >= 6; +function supportsRetryableReads(server?: Server) { + return maxWireVersion(server) >= MONGODB_WIRE_VERSION.SUPPORTS_OP_MSG; } -function executeWithServerSelection( +function executeWithServerSelection( topology: Topology, session: ClientSession, operation: AbstractOperation, - callback: Callback + callback: Callback ) { - const readPreference = operation.readPreference || ReadPreference.primary; - const inTransaction = session && session.inTransaction(); + const readPreference = operation.readPreference ?? ReadPreference.primary; + const inTransaction = !!session?.inTransaction(); if (inTransaction && !readPreference.equals(ReadPreference.primary)) { - callback( + return callback( new MongoTransactionError( `Read preference in a transaction must be primary, not: ${readPreference.mode}` ) ); - - return; } - if ( - session && - session.isPinned && - session.transaction.isCommitted && - !operation.bypassPinningCheck - ) { + if (session?.isPinned && session?.transaction.isCommitted && !operation.bypassPinningCheck) { session.unpin(); } @@ -170,60 +166,68 @@ function executeWithServerSelection( } const serverSelectionOptions = { session }; - function callbackWithRetry(err?: any, result?: any) { - if (err == null) { - return callback(undefined, result); - } + function retryOperation(originalError: MongoError, maxWireVersion: number) { + const isWriteOperation = operation.hasAspect(Aspect.WRITE_OPERATION); + const isReadOperation = operation.hasAspect(Aspect.READ_OPERATION); - const hasReadAspect = operation.hasAspect(Aspect.READ_OPERATION); - const hasWriteAspect = operation.hasAspect(Aspect.WRITE_OPERATION); - const itShouldRetryWrite = shouldRetryWrite(err); + if (isWriteOperation && !isRetryableWriteError(originalError, maxWireVersion)) { + return callback(originalError); + } - if ((hasReadAspect && !isRetryableError(err)) || (hasWriteAspect && !itShouldRetryWrite)) { - return callback(err); + if (isReadOperation && !isRetryableReadError(originalError)) { + return callback(originalError); } if ( - hasWriteAspect && - itShouldRetryWrite && - err.code === MMAPv1_RETRY_WRITES_ERROR_CODE && - err.errmsg.match(/Transaction numbers/) + isWriteOperation && + originalError.code === MMAPv1_RETRY_WRITES_ERROR_CODE && + /Transaction numbers/.test(originalError.errmsg) ) { - callback( + return callback( new MongoServerError({ message: MMAPv1_RETRY_WRITES_ERROR_MESSAGE, errmsg: MMAPv1_RETRY_WRITES_ERROR_MESSAGE, - originalError: err + originalError }) ); + } - return; + if ( + originalError instanceof MongoNetworkError && + session.isPinned && + !session.inTransaction() && + operation.hasAspect(Aspect.CURSOR_CREATING) + ) { + // If we have a cursor and the initial command fails with a network error, + // we can retry it on another connection. So we need to check it back in, clear the + // pool for the service id, and retry again. + session.unpin({ force: true, forceClear: true }); } // select a new server, and attempt to retry the operation - topology.selectServer(selector, serverSelectionOptions, (e?: any, server?: any) => { - if ( - e || - (operation.hasAspect(Aspect.READ_OPERATION) && !supportsRetryableReads(server)) || - (operation.hasAspect(Aspect.WRITE_OPERATION) && !supportsRetryableWrites(server)) - ) { - callback(e); - return; + topology.selectServer(selector, serverSelectionOptions, (error?: Error, server?: Server) => { + if (!error && operation.hasAspect(Aspect.READ_OPERATION) && !supportsRetryableReads(server)) { + return callback( + new MongoUnexpectedServerResponseError('Selected server does not support retryable reads') + ); } - // If we have a cursor and the initial command fails with a network error, - // we can retry it on another connection. So we need to check it back in, clear the - // pool for the service id, and retry again. if ( - err && - err instanceof MongoNetworkError && - server.loadBalanced && - session && - session.isPinned && - !session.inTransaction() && - operation.hasAspect(Aspect.CURSOR_CREATING) + !error && + operation.hasAspect(Aspect.WRITE_OPERATION) && + !supportsRetryableWrites(server) ) { - session.unpin({ force: true, forceClear: true }); + return callback( + new MongoUnexpectedServerResponseError( + 'Selected server does not support retryable writes' + ) + ); + } + + if (error || !server) { + return callback( + error ?? new MongoUnexpectedServerResponseError('Server selection failed without error') + ); } operation.execute(server, session, callback); @@ -233,8 +237,7 @@ function executeWithServerSelection( if ( readPreference && !readPreference.equals(ReadPreference.primary) && - session && - session.inTransaction() + session?.inTransaction() ) { callback( new MongoTransactionError( @@ -246,21 +249,20 @@ function executeWithServerSelection( } // select a server, and execute the operation against it - topology.selectServer(selector, serverSelectionOptions, (err?: any, server?: any) => { - if (err) { - callback(err); - return; + topology.selectServer(selector, serverSelectionOptions, (error, server) => { + if (error || !server) { + return callback(error); } if (session && operation.hasAspect(Aspect.RETRYABLE)) { const willRetryRead = - topology.s.options.retryReads !== false && + topology.s.options.retryReads !== false && // why is this not false !inTransaction && supportsRetryableReads(server) && operation.canRetryRead; const willRetryWrite = - topology.s.options.retryWrites === true && + topology.s.options.retryWrites === true && // and this is true !inTransaction && supportsRetryableWrites(server) && operation.canRetryWrite; @@ -274,15 +276,21 @@ function executeWithServerSelection( session.incrementTransactionNumber(); } - operation.execute(server, session, callbackWithRetry); - return; + // You have to save the max wire version before the + // Server might become marked Unknown by an error + const knownMaxWireVersion = maxWireVersion(server); + + return operation.execute(server, session, (error, result) => { + if (error instanceof MongoError) { + return retryOperation(error, knownMaxWireVersion); + } else if (error) { + return callback(error); + } + callback(undefined, result); + }); } } - operation.execute(server, session, callback); + return operation.execute(server, session, callback); }); } - -function shouldRetryWrite(err: any) { - return err instanceof MongoError && err.hasErrorLabel('RetryableWriteError'); -} diff --git a/src/operations/find.ts b/src/operations/find.ts index e223eab83e..76c664627e 100644 --- a/src/operations/find.ts +++ b/src/operations/find.ts @@ -1,4 +1,5 @@ import type { Document } from '../bson'; +import { MONGODB_WIRE_VERSION } from '../cmap/wire_protocol/constants'; import { isSharded } from '../cmap/wire_protocol/shared'; import type { Collection } from '../collection'; import { MongoCompatibilityError, MongoInvalidArgumentError } from '../error'; @@ -68,8 +69,6 @@ export interface FindOptions extends Comman let?: Document; } -const SUPPORTS_WRITE_CONCERN_AND_COLLATION = 5; - /** @internal */ export class FindOperation extends CommandOperation { options: FindOptions; @@ -109,14 +108,17 @@ export class FindOperation extends CommandOperation { const serverWireVersion = maxWireVersion(server); const options = this.options; - if (options.allowDiskUse != null && serverWireVersion < 4) { + if (options.allowDiskUse != null && serverWireVersion < MONGODB_WIRE_VERSION.FIND_COMMAND) { callback( new MongoCompatibilityError('Option "allowDiskUse" is not supported on MongoDB < 3.2') ); return; } - if (options.collation && serverWireVersion < SUPPORTS_WRITE_CONCERN_AND_COLLATION) { + if ( + options.collation && + serverWireVersion < MONGODB_WIRE_VERSION.COMMANDS_ACCEPT_WRITE_CONCERN + ) { callback( new MongoCompatibilityError( `Server ${server.name}, which reports wire version ${serverWireVersion}, does not support collation` @@ -126,7 +128,7 @@ export class FindOperation extends CommandOperation { return; } - if (serverWireVersion < 4) { + if (serverWireVersion < MONGODB_WIRE_VERSION.FIND_COMMAND) { if (this.readConcern && this.readConcern.level !== 'local') { callback( new MongoCompatibilityError( diff --git a/src/operations/find_and_modify.ts b/src/operations/find_and_modify.ts index 77314bf144..eba6b5c99d 100644 --- a/src/operations/find_and_modify.ts +++ b/src/operations/find_and_modify.ts @@ -1,4 +1,5 @@ import type { Document } from '../bson'; +import { MONGODB_WIRE_VERSION } from '../cmap/wire_protocol/constants'; import type { Collection } from '../collection'; import { MongoCompatibilityError, MongoInvalidArgumentError } from '../error'; import { ReadPreference } from '../read_preference'; @@ -170,7 +171,10 @@ class FindAndModifyOperation extends CommandOperation { // TODO: once this method becomes a CommandOperation we will have the server // in place to check. const unacknowledgedWrite = this.writeConcern?.w === 0; - if (unacknowledgedWrite || maxWireVersion(server) < 8) { + if ( + unacknowledgedWrite || + maxWireVersion(server) < MONGODB_WIRE_VERSION.SHARDED_TRANSACTIONS + ) { callback( new MongoCompatibilityError( 'The current topology does not support a hint on findAndModify commands' @@ -183,7 +187,7 @@ class FindAndModifyOperation extends CommandOperation { cmd.hint = options.hint; } - if (this.explain && maxWireVersion(server) < 4) { + if (this.explain && maxWireVersion(server) < MONGODB_WIRE_VERSION.FIND_COMMAND) { callback( new MongoCompatibilityError( `Server ${server.name} does not support explain on findAndModify` diff --git a/src/operations/indexes.ts b/src/operations/indexes.ts index d4a8cae1ba..6efa5bd08c 100644 --- a/src/operations/indexes.ts +++ b/src/operations/indexes.ts @@ -1,4 +1,5 @@ import type { Document } from '../bson'; +import { MONGODB_WIRE_VERSION } from '../cmap/wire_protocol/constants'; import type { Collection } from '../collection'; import { AbstractCursor } from '../cursor/abstract_cursor'; import type { Db } from '../db'; @@ -24,7 +25,6 @@ import { indexInformation, IndexInformationOptions } from './common_functions'; import { executeOperation, ExecutionResult } from './execute_operation'; import { AbstractOperation, Aspect, defineAspects } from './operation'; -const LIST_INDEXES_WIRE_VERSION = 3; const VALID_INDEX_OPTIONS = new Set([ 'background', 'unique', @@ -213,7 +213,7 @@ export class CreateIndexesOperation< // Ensure we generate the correct name if the parameter is not set for (let i = 0; i < indexes.length; i++) { // Did the user pass in a collation, check if our write server supports it - if (indexes[i].collation && serverWireVersion < 5) { + if (indexes[i].collation && serverWireVersion < MONGODB_WIRE_VERSION.RELEASE_2_7_7) { callback( new MongoCompatibilityError( `Server ${server.name}, which reports wire version ${serverWireVersion}, ` + @@ -238,7 +238,7 @@ export class CreateIndexesOperation< const cmd: Document = { createIndexes: this.collectionName, indexes }; if (options.commitQuorum != null) { - if (serverWireVersion < 9) { + if (serverWireVersion < MONGODB_WIRE_VERSION.RESUMABLE_INITIAL_SYNC) { callback( new MongoCompatibilityError( 'Option `commitQuorum` for `createIndexes` not supported on servers < 4.4' @@ -385,7 +385,7 @@ export class ListIndexesOperation extends CommandOperation { execute(server: Server, session: ClientSession, callback: Callback): void { const serverWireVersion = maxWireVersion(server); - if (serverWireVersion < LIST_INDEXES_WIRE_VERSION) { + if (serverWireVersion < MONGODB_WIRE_VERSION.RELEASE_2_7_7) { const systemIndexesNS = this.collectionNamespace.withCollection('system.indexes'); const collectionNS = this.collectionNamespace.toString(); diff --git a/src/operations/list_collections.ts b/src/operations/list_collections.ts index 16ed86d4d6..1a02f04992 100644 --- a/src/operations/list_collections.ts +++ b/src/operations/list_collections.ts @@ -1,4 +1,5 @@ import type { Binary, Document } from '../bson'; +import { MONGODB_WIRE_VERSION } from '../cmap/wire_protocol/constants'; import * as CONSTANTS from '../constants'; import { AbstractCursor } from '../cursor/abstract_cursor'; import type { Db } from '../db'; @@ -9,8 +10,6 @@ import { CommandOperation, CommandOperationOptions } from './command'; import { executeOperation, ExecutionResult } from './execute_operation'; import { Aspect, defineAspects } from './operation'; -const LIST_COLLECTIONS_WIRE_VERSION = 3; - /** @public */ export interface ListCollectionsOptions extends CommandOperationOptions { /** Since 4.0: If true, will only return the collection name in the response, and will omit additional info */ @@ -45,7 +44,7 @@ export class ListCollectionsOperation extends CommandOperation { } execute(server: Server, session: ClientSession, callback: Callback): void { - if (maxWireVersion(server) < LIST_COLLECTIONS_WIRE_VERSION) { + if (maxWireVersion(server) < MONGODB_WIRE_VERSION.RELEASE_2_7_7) { let filter = this.filter; const databaseName = this.db.s.namespace.db; diff --git a/src/operations/map_reduce.ts b/src/operations/map_reduce.ts index dcb52f6c01..212c258d2e 100644 --- a/src/operations/map_reduce.ts +++ b/src/operations/map_reduce.ts @@ -1,5 +1,6 @@ import type { ObjectId } from '../bson'; import { Code, Document } from '../bson'; +import { MONGODB_WIRE_VERSION } from '../cmap/wire_protocol/constants'; import type { Collection } from '../collection'; import { Db } from '../db'; import { MongoCompatibilityError, MongoServerError } from '../error'; @@ -165,7 +166,7 @@ export class MapReduceOperation extends CommandOperation return callback(err); } - if (this.explain && maxWireVersion(server) < 9) { + if (this.explain && maxWireVersion(server) < MONGODB_WIRE_VERSION.RESUMABLE_INITIAL_SYNC) { callback( new MongoCompatibilityError(`Server ${server.name} does not support explain on mapReduce`) ); diff --git a/src/operations/update.ts b/src/operations/update.ts index 0026245bfb..603da531d4 100644 --- a/src/operations/update.ts +++ b/src/operations/update.ts @@ -1,4 +1,5 @@ import type { Document, ObjectId } from '../bson'; +import { MONGODB_WIRE_VERSION } from '../cmap/wire_protocol/constants'; import type { Collection } from '../collection'; import { MongoCompatibilityError, MongoInvalidArgumentError, MongoServerError } from '../error'; import type { Server } from '../sdam/server'; @@ -113,21 +114,27 @@ export class UpdateOperation extends CommandOperation { } const unacknowledgedWrite = this.writeConcern && this.writeConcern.w === 0; - if (unacknowledgedWrite || maxWireVersion(server) < 5) { + if ( + unacknowledgedWrite || + maxWireVersion(server) < MONGODB_WIRE_VERSION.COMMANDS_ACCEPT_WRITE_CONCERN + ) { if (this.statements.find((o: Document) => o.hint)) { callback(new MongoCompatibilityError(`Servers < 3.4 do not support hint on update`)); return; } } - if (this.explain && maxWireVersion(server) < 3) { + if (this.explain && maxWireVersion(server) < MONGODB_WIRE_VERSION.RELEASE_2_7_7) { callback( new MongoCompatibilityError(`Server ${server.name} does not support explain on update`) ); return; } - if (this.statements.some(statement => !!statement.arrayFilters) && maxWireVersion(server) < 6) { + if ( + this.statements.some(statement => !!statement.arrayFilters) && + maxWireVersion(server) < MONGODB_WIRE_VERSION.SUPPORTS_OP_MSG + ) { callback( new MongoCompatibilityError('Option "arrayFilters" is only supported on MongoDB 3.6+') ); diff --git a/src/read_preference.ts b/src/read_preference.ts index 554bf23412..b14afb9833 100644 --- a/src/read_preference.ts +++ b/src/read_preference.ts @@ -1,4 +1,5 @@ import type { Document } from './bson'; +import { MONGODB_WIRE_VERSION } from './cmap/wire_protocol/constants'; import { MongoInvalidArgumentError } from './error'; import type { TagSet } from './sdam/server_description'; import type { ClientSession } from './sessions'; @@ -109,7 +110,7 @@ export class ReadPreference { // NOTE: The minimum required wire version is 5 for this read preference. If the existing // topology has a lower value then a MongoError will be thrown during server selection. - this.minWireVersion = 5; + this.minWireVersion = MONGODB_WIRE_VERSION.COMMANDS_ACCEPT_WRITE_CONCERN; } if (this.mode === ReadPreference.PRIMARY) { diff --git a/src/sdam/common.ts b/src/sdam/common.ts index 9a9a5450a7..6738ad9eb8 100644 --- a/src/sdam/common.ts +++ b/src/sdam/common.ts @@ -53,12 +53,6 @@ export function drainTimerQueue(queue: TimerQueue): void { queue.clear(); } -/** @internal */ -export function clearAndRemoveTimerFrom(timer: NodeJS.Timeout, timers: TimerQueue): boolean { - clearTimeout(timer); - return timers.delete(timer); -} - /** @public */ export interface ClusterTime { clusterTime: Timestamp; diff --git a/src/sdam/server.ts b/src/sdam/server.ts index 9e9e1c059a..99f6e04aaa 100644 --- a/src/sdam/server.ts +++ b/src/sdam/server.ts @@ -11,6 +11,7 @@ import { ConnectionPoolEvents, ConnectionPoolOptions } from '../cmap/connection_pool'; +import { MONGODB_WIRE_VERSION } from '../cmap/wire_protocol/constants'; import { APM_EVENTS, CLOSED, @@ -30,6 +31,7 @@ import { isRetryableWriteError, isSDAMUnrecoverableError, MongoCompatibilityError, + MONGODB_ERROR_LABELS, MongoError, MongoInvalidArgumentError, MongoNetworkError, @@ -556,8 +558,11 @@ function makeOperationHandler( } // inActiveTransaction check handles commit and abort. - if (inActiveTransaction(session, cmd) && !err.hasErrorLabel('TransientTransactionError')) { - err.addErrorLabel('TransientTransactionError'); + if ( + inActiveTransaction(session, cmd) && + !err.hasErrorLabel(MONGODB_ERROR_LABELS.TransientTransactionError) + ) { + err.addErrorLabel(MONGODB_ERROR_LABELS.TransientTransactionError); } if ( @@ -565,7 +570,7 @@ function makeOperationHandler( supportsRetryableWrites(server) && !inActiveTransaction(session, cmd) ) { - err.addErrorLabel('RetryableWriteError'); + err.addErrorLabel(MONGODB_ERROR_LABELS.RetryableWriteError); } if (!(err instanceof MongoNetworkTimeoutError) || isNetworkErrorBeforeHandshake(err)) { @@ -581,16 +586,19 @@ function makeOperationHandler( // if pre-4.4 server, then add error label if its a retryable write error if ( (isRetryableWritesEnabled(server.s.topology) || isTransactionCommand(cmd)) && - maxWireVersion(server) < 9 && - isRetryableWriteError(err) && + maxWireVersion(server) < MONGODB_WIRE_VERSION.RESUMABLE_INITIAL_SYNC && + isRetryableWriteError(err, maxWireVersion(server)) && !inActiveTransaction(session, cmd) ) { - err.addErrorLabel('RetryableWriteError'); + err.addErrorLabel(MONGODB_ERROR_LABELS.RetryableWriteError); } if (isSDAMUnrecoverableError(err)) { if (shouldHandleStateChangeError(server, err)) { - if (maxWireVersion(server) <= 7 || isNodeShuttingDownError(err)) { + if ( + maxWireVersion(server) <= MONGODB_WIRE_VERSION.REPLICA_SET_TRANSACTIONS || + isNodeShuttingDownError(err) + ) { server.s.pool.clear(connection.serviceId); } @@ -602,7 +610,11 @@ function makeOperationHandler( } } - if (session && session.isPinned && err.hasErrorLabel('TransientTransactionError')) { + if ( + session && + session.isPinned && + err.hasErrorLabel(MONGODB_ERROR_LABELS.TransientTransactionError) + ) { session.unpin({ force: true }); } } diff --git a/src/sdam/server_description.ts b/src/sdam/server_description.ts index 5082c7cb3f..c48167ee16 100644 --- a/src/sdam/server_description.ts +++ b/src/sdam/server_description.ts @@ -1,4 +1,5 @@ import { Document, Long, ObjectId } from '../bson'; +import { MONGODB_WIRE_VERSION } from '../cmap/wire_protocol/constants'; import type { MongoError } from '../error'; import { arrayStrictEqual, errorStrictEqual, HostAddress, now } from '../utils'; import type { ClusterTime } from './common'; @@ -96,8 +97,8 @@ export class ServerDescription { this.passives = hello?.passives?.map((host: string) => host.toLowerCase()) ?? []; this.arbiters = hello?.arbiters?.map((host: string) => host.toLowerCase()) ?? []; this.tags = hello?.tags ?? {}; - this.minWireVersion = hello?.minWireVersion ?? 0; - this.maxWireVersion = hello?.maxWireVersion ?? 0; + this.minWireVersion = hello?.minWireVersion ?? MONGODB_WIRE_VERSION.UNKNOWN; + this.maxWireVersion = hello?.maxWireVersion ?? MONGODB_WIRE_VERSION.UNKNOWN; this.roundTripTime = options?.roundTripTime ?? -1; this.lastUpdateTime = now(); this.lastWriteDate = hello?.lastWrite?.lastWriteDate ?? 0; diff --git a/src/sdam/server_selection.ts b/src/sdam/server_selection.ts index 74bbd89189..a4e65d5fce 100644 --- a/src/sdam/server_selection.ts +++ b/src/sdam/server_selection.ts @@ -1,3 +1,4 @@ +import { MONGODB_WIRE_VERSION } from '../cmap/wire_protocol/constants'; import { MongoCompatibilityError, MongoInvalidArgumentError } from '../error'; import { ReadPreference } from '../read_preference'; import { ServerType, TopologyType } from './common'; @@ -8,9 +9,6 @@ import type { TopologyDescription } from './topology_description'; const IDLE_WRITE_PERIOD = 10000; const SMALLEST_MAX_STALENESS_SECONDS = 90; -// Minimum version to try writes on secondaries. -export const MIN_SECONDARY_WRITE_WIRE_VERSION = 13; - /** @public */ export type ServerSelector = ( topologyDescription: TopologyDescription, @@ -64,7 +62,7 @@ export function secondaryWritableServerSelector( if ( !readPreference || !wireVersion || - (wireVersion && wireVersion < MIN_SECONDARY_WRITE_WIRE_VERSION) + (wireVersion && wireVersion < MONGODB_WIRE_VERSION.WIRE_VERSION_50) ) { return readPreferenceServerSelector(ReadPreference.primary); } diff --git a/src/sdam/topology.ts b/src/sdam/topology.ts index 4d9a067eaa..d2afa58468 100644 --- a/src/sdam/topology.ts +++ b/src/sdam/topology.ts @@ -4,6 +4,7 @@ import { deserialize, serialize } from '../bson'; import type { MongoCredentials } from '../cmap/auth/mongo_credentials'; import type { ConnectionEvents, DestroyOptions } from '../cmap/connection'; import type { CloseOptions, ConnectionPoolEvents } from '../cmap/connection_pool'; +import { MONGODB_WIRE_VERSION } from '../cmap/wire_protocol/constants'; import { DEFAULT_OPTIONS } from '../connection_string'; import { CLOSE, @@ -50,7 +51,6 @@ import { } from '../utils'; import { _advanceClusterTime, - clearAndRemoveTimerFrom, ClusterTime, drainTimerQueue, ServerType, @@ -435,7 +435,12 @@ export class Topology extends TypedEventEmitter { // connect all known servers, then attempt server selection to connect const serverDescriptions = Array.from(this.s.description.servers.values()); - connectServers(this, serverDescriptions); + this.s.servers = new Map( + serverDescriptions.map(serverDescription => [ + serverDescription.address, + createAndConnectServer(this, serverDescription) + ]) + ); // In load balancer mode we need to fake a server description getting // emitted from the monitor, since the monitor doesn't exist. @@ -877,11 +882,7 @@ function randomSelection(array: ServerDescription[]): ServerDescription { * @param serverDescription - The description for the server to initialize and connect to * @param connectDelay - Time to wait before attempting initial connection */ -function createAndConnectServer( - topology: Topology, - serverDescription: ServerDescription, - connectDelay?: number -) { +function createAndConnectServer(topology: Topology, serverDescription: ServerDescription) { topology.emit( Topology.SERVER_OPENING, new ServerOpeningEvent(topology.s.id, serverDescription.address) @@ -894,38 +895,10 @@ function createAndConnectServer( server.on(Server.DESCRIPTION_RECEIVED, description => topology.serverUpdateHandler(description)); - if (connectDelay) { - const connectTimer = setTimeout(() => { - clearAndRemoveTimerFrom(connectTimer, topology.s.connectionTimers); - server.connect(); - }, connectDelay); - - topology.s.connectionTimers.add(connectTimer); - return server; - } - server.connect(); return server; } -/** - * Create `Server` instances for all initially known servers, connect them, and assign - * them to the passed in `Topology`. - * - * @param topology - The topology responsible for the servers - * @param serverDescriptions - A list of server descriptions to connect - */ -function connectServers(topology: Topology, serverDescriptions: ServerDescription[]) { - topology.s.servers = serverDescriptions.reduce( - (servers: Map, serverDescription: ServerDescription) => { - const server = createAndConnectServer(topology, serverDescription); - servers.set(serverDescription.address, server); - return servers; - }, - new Map() - ); -} - /** * @param topology - Topology to update. * @param incomingServerDescription - New server description. @@ -1067,42 +1040,44 @@ export class ServerCapabilities { minWireVersion: number; constructor(hello: Document) { - this.minWireVersion = hello.minWireVersion || 0; - this.maxWireVersion = hello.maxWireVersion || 0; + this.minWireVersion = hello.minWireVersion ?? MONGODB_WIRE_VERSION.UNKNOWN; + this.maxWireVersion = hello.maxWireVersion ?? MONGODB_WIRE_VERSION.UNKNOWN; } get hasAggregationCursor(): boolean { - return this.maxWireVersion >= 1; + return this.maxWireVersion >= MONGODB_WIRE_VERSION.AGG_RETURNS_CURSORS; } get hasWriteCommands(): boolean { - return this.maxWireVersion >= 2; + return this.maxWireVersion >= MONGODB_WIRE_VERSION.BATCH_COMMANDS; } + get hasTextSearch(): boolean { - return this.minWireVersion >= 0; + // TODO: Is this correct? + return this.minWireVersion >= MONGODB_WIRE_VERSION.UNKNOWN; } get hasAuthCommands(): boolean { - return this.maxWireVersion >= 1; + return this.maxWireVersion >= MONGODB_WIRE_VERSION.AGG_RETURNS_CURSORS; } get hasListCollectionsCommand(): boolean { - return this.maxWireVersion >= 3; + return this.maxWireVersion >= MONGODB_WIRE_VERSION.RELEASE_2_7_7; } get hasListIndexesCommand(): boolean { - return this.maxWireVersion >= 3; + return this.maxWireVersion >= MONGODB_WIRE_VERSION.RELEASE_2_7_7; } get supportsSnapshotReads(): boolean { - return this.maxWireVersion >= 13; + return this.maxWireVersion >= MONGODB_WIRE_VERSION.WIRE_VERSION_50; } get commandsTakeWriteConcern(): boolean { - return this.maxWireVersion >= 5; + return this.maxWireVersion >= MONGODB_WIRE_VERSION.COMMANDS_ACCEPT_WRITE_CONCERN; } get commandsTakeCollation(): boolean { - return this.maxWireVersion >= 5; + return this.maxWireVersion >= MONGODB_WIRE_VERSION.COMMANDS_ACCEPT_WRITE_CONCERN; } } diff --git a/src/sdam/topology_description.ts b/src/sdam/topology_description.ts index a14df1a555..b91a2bb6ab 100644 --- a/src/sdam/topology_description.ts +++ b/src/sdam/topology_description.ts @@ -1,17 +1,17 @@ import type { Document, ObjectId } from '../bson'; -import * as WIRE_CONSTANTS from '../cmap/wire_protocol/constants'; +import { + MAX_SUPPORTED_SERVER_VERSION, + MAX_SUPPORTED_WIRE_VERSION, + MIN_SUPPORTED_SERVER_VERSION, + MIN_SUPPORTED_WIRE_VERSION, + MONGODB_WIRE_VERSION +} from '../cmap/wire_protocol/constants'; import { MongoError, MongoRuntimeError } from '../error'; import { shuffle } from '../utils'; import { ServerType, TopologyType } from './common'; import { ServerDescription } from './server_description'; import type { SrvPollingEvent } from './srv_polling'; -// constants related to compatibility checks -const MIN_SUPPORTED_SERVER_VERSION = WIRE_CONSTANTS.MIN_SUPPORTED_SERVER_VERSION; -const MAX_SUPPORTED_SERVER_VERSION = WIRE_CONSTANTS.MAX_SUPPORTED_SERVER_VERSION; -const MIN_SUPPORTED_WIRE_VERSION = WIRE_CONSTANTS.MIN_SUPPORTED_WIRE_VERSION; -const MAX_SUPPORTED_WIRE_VERSION = WIRE_CONSTANTS.MAX_SUPPORTED_WIRE_VERSION; - const MONGOS_OR_UNKNOWN = new Set([ServerType.Mongos, ServerType.Unknown]); const MONGOS_OR_STANDALONE = new Set([ServerType.Mongos, ServerType.Standalone]); const NON_PRIMARY_RS_MEMBERS = new Set([ @@ -211,7 +211,7 @@ export class TopologyDescription { const serverDescriptions = new Map(this.servers); // update common wire version - if (serverDescription.maxWireVersion !== 0) { + if (serverDescription.maxWireVersion !== MONGODB_WIRE_VERSION.UNKNOWN) { if (commonWireVersion == null) { commonWireVersion = serverDescription.maxWireVersion; } else { diff --git a/src/sessions.ts b/src/sessions.ts index 830aeea9cd..99d6473676 100644 --- a/src/sessions.ts +++ b/src/sessions.ts @@ -1,16 +1,17 @@ import { Binary, Document, Long, Timestamp } from './bson'; import type { CommandOptions, Connection } from './cmap/connection'; import { ConnectionPoolMetrics } from './cmap/metrics'; +import { MONGODB_WIRE_VERSION } from './cmap/wire_protocol/constants'; import { isSharded } from './cmap/wire_protocol/shared'; import { PINNED, UNPINNED } from './constants'; import type { AbstractCursor } from './cursor/abstract_cursor'; import { AnyError, - isRetryableEndTransactionError, - isRetryableError, + isRetryableReadError, MongoAPIError, MongoCompatibilityError, MONGODB_ERROR_CODES, + MONGODB_ERROR_LABELS, MongoDriverError, MongoError, MongoExpiredSessionError, @@ -43,8 +44,6 @@ import { } from './utils'; import type { WriteConcern } from './write_concern'; -const minWireVersionForShardedTransactions = 8; - function assertAlive(session: ClientSession, callback?: Callback): boolean { if (session.serverSession == null) { const error = new MongoExpiredSessionError(); @@ -392,7 +391,7 @@ export class ClientSession extends TypedEventEmitter { if ( isSharded(this.topology) && topologyMaxWireVersion != null && - topologyMaxWireVersion < minWireVersionForShardedTransactions + topologyMaxWireVersion < MONGODB_WIRE_VERSION.SHARDED_TRANSACTIONS ) { throw new MongoCompatibilityError( 'Transactions are not supported on sharded clusters in MongoDB < 4.2.' @@ -507,7 +506,7 @@ export function maybeClearPinnedConnection( session.inTransaction() && error && error instanceof MongoError && - error.hasErrorLabel('TransientTransactionError') + error.hasErrorLabel(MONGODB_ERROR_LABELS.TransientTransactionError) ) { return; } @@ -559,11 +558,11 @@ function attemptTransactionCommit( hasNotTimedOut(startTime, MAX_WITH_TRANSACTION_TIMEOUT) && !isMaxTimeMSExpiredError(err) ) { - if (err.hasErrorLabel('UnknownTransactionCommitResult')) { + if (err.hasErrorLabel(MONGODB_ERROR_LABELS.UnknownTransactionCommitResult)) { return attemptTransactionCommit(session, startTime, fn, options); } - if (err.hasErrorLabel('TransientTransactionError')) { + if (err.hasErrorLabel(MONGODB_ERROR_LABELS.TransientTransactionError)) { return attemptTransaction(session, startTime, fn, options); } } @@ -617,14 +616,14 @@ function attemptTransaction( function maybeRetryOrThrow(err: MongoError): Promise { if ( err instanceof MongoError && - err.hasErrorLabel('TransientTransactionError') && + err.hasErrorLabel(MONGODB_ERROR_LABELS.TransientTransactionError) && hasNotTimedOut(startTime, MAX_WITH_TRANSACTION_TIMEOUT) ) { return attemptTransaction(session, startTime, fn, options); } if (isMaxTimeMSExpiredError(err)) { - err.addErrorLabel('UnknownTransactionCommitResult'); + err.addErrorLabel(MONGODB_ERROR_LABELS.UnknownTransactionCommitResult); } throw err; @@ -639,7 +638,11 @@ function attemptTransaction( ); } -function endTransaction(session: ClientSession, commandName: string, callback: Callback) { +function endTransaction( + session: ClientSession, + commandName: 'abortTransaction' | 'commitTransaction', + callback: Callback +) { if (!assertAlive(session, callback)) { // checking result in case callback was called return; @@ -717,7 +720,7 @@ function endTransaction(session: ClientSession, commandName: string, callback: C Object.assign(command, { maxTimeMS: session.transaction.options.maxTimeMS }); } - function commandHandler(e?: MongoError, r?: Document) { + function commandHandler(error?: Error, result?: Document) { if (commandName !== 'commitTransaction') { session.transaction.transition(TxnState.TRANSACTION_ABORTED); if (session.loadBalanced) { @@ -729,25 +732,25 @@ function endTransaction(session: ClientSession, commandName: string, callback: C } session.transaction.transition(TxnState.TRANSACTION_COMMITTED); - if (e) { + if (error instanceof MongoError) { if ( - e instanceof MongoNetworkError || - e instanceof MongoWriteConcernError || - isRetryableError(e) || - isMaxTimeMSExpiredError(e) + error instanceof MongoNetworkError || + error instanceof MongoWriteConcernError || + isRetryableReadError(error) || // TODO is commit a write? probably, double check its retry rules + isMaxTimeMSExpiredError(error) ) { - if (isUnknownTransactionCommitResult(e)) { - e.addErrorLabel('UnknownTransactionCommitResult'); + if (isUnknownTransactionCommitResult(error)) { + error.addErrorLabel(MONGODB_ERROR_LABELS.UnknownTransactionCommitResult); // per txns spec, must unpin session in this case - session.unpin({ error: e }); + session.unpin({ error }); } - } else if (e.hasErrorLabel('TransientTransactionError')) { - session.unpin({ error: e }); + } else if (error.hasErrorLabel(MONGODB_ERROR_LABELS.TransientTransactionError)) { + session.unpin({ error }); } } - callback(e, r); + callback(error, result); } // Assumption here that commandName is "commitTransaction" or "abortTransaction" @@ -763,13 +766,16 @@ function endTransaction(session: ClientSession, commandName: string, callback: C readPreference: ReadPreference.primary, bypassPinningCheck: true }), - (err, reply) => { + (error, result) => { if (command.abortTransaction) { // always unpin on abort regardless of command outcome session.unpin(); } - if (err && isRetryableEndTransactionError(err as MongoError)) { + if ( + error instanceof MongoError && + error.hasErrorLabel(MONGODB_ERROR_LABELS.RetryableWriteError) + ) { // SPEC-1185: apply majority write concern when retrying commitTransaction if (command.commitTransaction) { // per txns spec, must unpin session in this case @@ -787,11 +793,11 @@ function endTransaction(session: ClientSession, commandName: string, callback: C readPreference: ReadPreference.primary, bypassPinningCheck: true }), - (_err, _reply) => commandHandler(_err as MongoError, _reply) + commandHandler ); } - commandHandler(err as MongoError, reply); + commandHandler(error, result); } ); } diff --git a/src/utils.ts b/src/utils.ts index ac787252f6..12e650173e 100644 --- a/src/utils.ts +++ b/src/utils.ts @@ -5,7 +5,7 @@ import { URL } from 'url'; import { Document, ObjectId, resolveBSONOptions } from './bson'; import type { Connection } from './cmap/connection'; -import { MAX_SUPPORTED_WIRE_VERSION } from './cmap/wire_protocol/constants'; +import { MAX_SUPPORTED_WIRE_VERSION, MONGODB_WIRE_VERSION } from './cmap/wire_protocol/constants'; import type { Collection } from './collection'; import { LEGACY_HELLO_COMMAND } from './constants'; import type { Db } from './db'; @@ -686,8 +686,10 @@ export function maxWireVersion(topologyOrServer?: Connection | Topology | Server * @param server - to check against * @param cmd - object where collation may be specified */ -export function collationNotSupported(server: Server, cmd: Document): boolean { - return cmd && cmd.collation && maxWireVersion(server) < 5; +export function collationNotSupported(server: Server, cmd?: Document): boolean { + return ( + cmd?.collation && maxWireVersion(server) < MONGODB_WIRE_VERSION.COMMANDS_ACCEPT_WRITE_CONCERN + ); } /** @@ -1419,13 +1421,28 @@ export function enumToString(en: Record): string { * * @internal */ -export function supportsRetryableWrites(server: Server): boolean { - return ( - !!server.loadBalanced || - (server.description.maxWireVersion >= 6 && - !!server.description.logicalSessionTimeoutMinutes && - server.description.type !== ServerType.Standalone) - ); +export function supportsRetryableWrites(server?: Server): boolean { + if (!server) { + return false; + } + + if (server.loadBalanced) { + // Loadbalanced topologies will always support retry writes + return true; + } + + if (server.description.maxWireVersion >= MONGODB_WIRE_VERSION.SUPPORTS_OP_MSG) { + // Talking to a 3.6+ server + if (server.description.logicalSessionTimeoutMinutes != null) { + // that supports sessions + if (server.description.type !== ServerType.Standalone) { + // and that is not a standalone + return true; + } + } + } + + return false; } export function parsePackageVersion({ version }: { version: string }): { diff --git a/test/integration/load-balancers/load_balancers.spec.test.js b/test/integration/load-balancers/load_balancers.spec.test.js index 1446a5e42c..ae8abc298d 100644 --- a/test/integration/load-balancers/load_balancers.spec.test.js +++ b/test/integration/load-balancers/load_balancers.spec.test.js @@ -61,6 +61,5 @@ const SKIP = [ ]; describe('Load Balancer Unified Tests', function () { - this.timeout(10000); runUnifiedSuite(loadSpecTests(path.join('load-balancers')), SKIP); }); diff --git a/test/integration/retryable-writes/retryable_writes.spec.test.js b/test/integration/retryable-writes/retryable_writes.spec.test.js deleted file mode 100644 index d85f0881f0..0000000000 --- a/test/integration/retryable-writes/retryable_writes.spec.test.js +++ /dev/null @@ -1,199 +0,0 @@ -'use strict'; - -const { expect } = require('chai'); -const { loadSpecTests } = require('../../spec'); -const { legacyRunOnToRunOnRequirement } = require('../../tools/spec-runner'); -const { isAnyRequirementSatisfied } = require('../../tools/unified-spec-runner/unified-utils'); - -describe('Legacy Retryable Writes specs', function () { - let ctx = {}; - const retryableWrites = loadSpecTests('retryable-writes/legacy'); - - for (const suite of retryableWrites) { - describe(suite.name, function () { - beforeEach(async function () { - let utilClient; - if (this.configuration.isLoadBalanced) { - // The util client can always point at the single mongos LB frontend. - utilClient = this.configuration.newClient(this.configuration.singleMongosLoadBalancerUri); - } else { - utilClient = this.configuration.newClient(); - } - - await utilClient.connect(); - - const allRequirements = suite.runOn.map(legacyRunOnToRunOnRequirement); - - const someRequirementMet = - !allRequirements.length || - (await isAnyRequirementSatisfied(this.currentTest.ctx, allRequirements, utilClient)); - - await utilClient.close(); - - if (!someRequirementMet) this.skip(); - }); - - afterEach(async function () { - // Step 3: Test Teardown. Turn off failpoints, and close client - if (!ctx.db || !ctx.client) { - return; - } - - if (ctx.failPointName) { - await turnOffFailPoint(ctx.client, ctx.failPointName); - } - await ctx.client.close(); - ctx = {}; // reset context - }); - - for (const test of suite.tests) { - it(test.description, async function () { - // Step 1: Test Setup. Includes a lot of boilerplate stuff - // like creating a client, dropping and refilling data collections, - // and enabling failpoints - await executeScenarioSetup(suite, test, this.configuration, ctx); - // Step 2: Run the test - await executeScenarioTest(test, ctx); - }); - } - }); - } -}); - -function executeScenarioSetup(scenario, test, config, ctx) { - const url = config.url(); - const options = Object.assign({}, test.clientOptions, { - heartbeatFrequencyMS: 100, - monitorCommands: true, - minPoolSize: 10 - }); - - ctx.failPointName = test.failPoint && test.failPoint.configureFailPoint; - - const client = config.newClient(url, options); - return client - .connect() - .then(client => (ctx.client = client)) - .then(() => (ctx.db = ctx.client.db(config.db))) - .then( - () => - (ctx.collection = ctx.db.collection( - `retryable_writes_test_${config.name}_${test.operation.name}` - )) - ) - .then(() => ctx.collection.drop()) - .catch(err => { - if (!err.message.match(/ns not found/)) { - throw err; - } - }) - .then(() => - Array.isArray(scenario.data) && scenario.data.length - ? ctx.collection.insertMany(scenario.data) - : {} - ) - .then(() => (test.failPoint ? ctx.client.db('admin').command(test.failPoint) : {})); -} - -function executeScenarioTest(test, ctx) { - return Promise.resolve() - .then(() => { - const args = generateArguments(test); - - let result = ctx.collection[test.operation.name].apply(ctx.collection, args); - const outcome = test.outcome && test.outcome.result; - const errorLabelsContain = outcome && outcome.errorLabelsContain; - const errorLabelsOmit = outcome && outcome.errorLabelsOmit; - const hasResult = outcome && !errorLabelsContain && !errorLabelsOmit; - if (test.outcome.error) { - result = result - .then(() => expect(false).to.be.true) - .catch(err => { - expect(err).to.exist; - expect(err.message, 'expected operations to fail, but they succeeded').to.not.match( - /expected false to be true/ - ); - if (hasResult) expect(err.result).to.matchMongoSpec(test.outcome.result); - if (errorLabelsContain) expect(err.errorLabels).to.include.members(errorLabelsContain); - if (errorLabelsOmit) { - errorLabelsOmit.forEach(label => { - expect(err.errorLabels).to.not.contain(label); - }); - } - }); - } else if (test.outcome.result) { - const expected = test.outcome.result; - result = result.then(transformToResultValue).then(r => expect(r).to.deep.include(expected)); - } - - return result; - }) - .then(() => { - if (test.outcome.collection) { - return ctx.collection - .find({}) - .toArray() - .then(collectionResults => { - expect(collectionResults).to.eql(test.outcome.collection.data); - }); - } - }); -} - -// Helper Functions - -/** - * Transforms the arguments from a test into actual arguments for our function calls - * - * @param {any} test - */ -function generateArguments(test) { - const args = []; - - if (test.operation.arguments) { - const options = {}; - Object.keys(test.operation.arguments).forEach(arg => { - if (arg === 'requests') { - args.push(test.operation.arguments[arg].map(convertBulkWriteOperation)); - } else if (arg === 'upsert') { - options.upsert = test.operation.arguments[arg]; - } else if (arg === 'returnDocument') { - options.returnDocument = test.operation.arguments[arg].toLowerCase(); - } else { - args.push(test.operation.arguments[arg]); - } - }); - - if (Object.keys(options).length > 0) { - args.push(options); - } - } - - return args; -} - -/** - * Transforms a request arg into a bulk write operation - * - * @param {any} op - */ -function convertBulkWriteOperation(op) { - return { [op.name]: op.arguments }; -} - -/** - * Transforms output of a bulk write to conform to the test format - * - * @param {any} result - */ -function transformToResultValue(result) { - return result && result.value ? result.value : result; -} - -/** Runs a command that turns off a fail point */ -function turnOffFailPoint(client, name) { - return client.db('admin').command({ - configureFailPoint: name, - mode: 'off' - }); -} diff --git a/test/integration/retryable-writes/retryable_writes.spec.test.ts b/test/integration/retryable-writes/retryable_writes.spec.test.ts index 14920cf872..faaadc7f94 100644 --- a/test/integration/retryable-writes/retryable_writes.spec.test.ts +++ b/test/integration/retryable-writes/retryable_writes.spec.test.ts @@ -14,6 +14,7 @@ interface RetryableWriteTestContext { describe('Legacy Retryable Writes Specs', function () { let ctx: RetryableWriteTestContext = {}; + const retryableWrites = loadSpecTests('retryable-writes', 'legacy'); for (const suite of retryableWrites) { @@ -44,10 +45,8 @@ describe('Legacy Retryable Writes Specs', function () { // Step 1: Test Setup. Includes a lot of boilerplate stuff // like creating a client, dropping and refilling data collections, // and enabling failpoints - // eslint-disable-next-line @typescript-eslint/ban-ts-comment - //@ts-expect-error - const { specTest } = this.currentTest; - await executeScenarioSetup(suite, specTest, this.configuration, ctx); + const { spec } = this.currentTest; + await executeScenarioSetup(suite, spec, this.configuration, ctx); }); afterEach(async function () { @@ -63,11 +62,15 @@ describe('Legacy Retryable Writes Specs', function () { ctx = {}; // reset context }); - for (const test of suite.tests) { - it(test.description, async function () { - // Step 2: Run the test - await executeScenarioTest(test, ctx); - }).specTest = test; + for (const spec of suite.tests) { + // Step 2: Run the test + const mochaTest = it(spec.description, async () => await executeScenarioTest(spec, ctx)); + + // A pattern we don't need to repeat for unified tests + // In order to give the beforeEach hook access to the + // spec test so it can be responsible for skipping it + // and executeScenarioSetup + mochaTest.spec = spec; } }); } @@ -111,16 +114,23 @@ async function executeScenarioSetup(scenario, test, config, ctx) { async function executeScenarioTest(test, ctx) { const args = generateArguments(test); + // In case the spec files or our API changes + expect(ctx.collection).to.have.property(test.operation.name).that.is.a('function'); + let thrownError; - let result = await ctx.collection[test.operation.name](...args).catch(error => { + let result; + try { + result = await ctx.collection[test.operation.name](...args); + } catch (error) { thrownError = error; - }); + } const outcome = test.outcome && test.outcome.result; const errorLabelsContain = outcome && outcome.errorLabelsContain; const errorLabelsOmit = outcome && outcome.errorLabelsOmit; const hasResult = outcome && !errorLabelsContain && !errorLabelsOmit; if (test.outcome.error) { + expect(thrownError, `${test.operation.name} was supposed to fail but did not!`).to.exist; expect(thrownError).to.have.property('message'); if (hasResult) { @@ -137,6 +147,7 @@ async function executeScenarioTest(test, ctx) { } } } else if (test.outcome.result) { + expect(thrownError, thrownError?.stack).to.not.exist; const expected = test.outcome.result; result = transformToResultValue(result); expect(result).to.deep.include(expected); diff --git a/test/readme.md b/test/readme.md index b5105234b9..c1f258447e 100644 --- a/test/readme.md +++ b/test/readme.md @@ -13,6 +13,43 @@ about the types of tests and how to run them. - [Writing Tests](#writing-tests) - [Testing with Special Environments](#testing-with-special-environments) +## Complex deployments + +Some of the topologies mentioned in this testing guide require a bit of "orchestration". +For that, we have [mongo-orchestration](https://github.com/10gen/mongo-orchestration). +We advise cloning this repository (instead of installing from pip). Inside the repo run: + +```sh +# Create a virtual environment so you can make sure m-orch has all of its dependencies versioned separate from your global environment +python -m .venv.com +# There are activates for a few different shells, use the one that matches your shell +source ./.venv.com/bin/activate +``` + +Your working directory must still be at the root of the mongo-orchestration repository. +With the virtual environment activated install mongo-orchestration with the following command. + +```sh +# "-e ." installs the package as an editable python module +# the benefit is that the source code in the repo is the same code that is run when you launch orchestration +# instead of a new copy being installed elsewhere +pip install -e . +``` + +I prefer launching mongo-orchestration in the foreground so it's easily stopped with ctrl-C but the pid file can be found in the current directory. + +```sh +mongo-orchestration start --no-fork +``` + +Posting a configuration can look something like this: + +```sh +curl --silent --show-error --max-time 60 --fail http://localhost:8889/v1/sharded_clusters --data @"$DRIVERS_TOOLS/.evergreen/orchestration/configs/sharded_clusters/load-balancer.json" +``` + +Where `$DRIVERS_TOOLS` is the path to drivers evergreen tools. + ## About the Tests All of our test automation is powered by the [Mocha test framework][mocha]. @@ -263,7 +300,8 @@ The following steps will walk you through how to create and test a MongoDB Serve The following steps will walk you through how to start and test a load balancer. -1. Start a sharded cluster. You can use the [cluster_setup.sh](tools/cluster_setup.sh) script to do so: `./test/tools/cluster_setup.sh sharded_cluster`. The tool should create a cluster with two mongos, so you have a URI similar to `MONGODB_URI=mongodb://host1,host2/`. +1. Start a sharded cluster with `--setParameter featureFlagLoadBalancer=true` and `--setParameter loadBalancerPort=27051` flags passed to the mongos processes. + 1. Follow the instructions here to setup [mongo-orchestration]() 1. Create an environment variable named `MONGODB_URI` that stores the URI of the sharded cluster you just created. For example: `export MONGODB_URI="mongodb://host1,host2/"` 1. Install the HAProxy load balancer. For those on macOS, you can install HAProxy with `brew install haproxy`. 1. Start the load balancer by using the [run-load-balancer script](https://github.com/mongodb-labs/drivers-evergreen-tools/blob/master/.evergreen/run-load-balancer.sh) provided in drivers-evergreen-tools. diff --git a/test/tools/spec-runner/index.js b/test/tools/spec-runner/index.js index 0cbac9ff69..f37c7d39a9 100644 --- a/test/tools/spec-runner/index.js +++ b/test/tools/spec-runner/index.js @@ -856,25 +856,23 @@ function convertCollectionOptions(options) { return result; } -function testOperations(testData, operationContext, options) { +async function testOperations(testData, operationContext, options) { options = options || { swallowOperationErrors: true }; - return testData.operations.reduce((combined, operation) => { - return combined.then(() => { - const object = operation.object || 'collection'; - if (object === 'collection') { - const db = operationContext.database; - const collectionName = operationContext.collectionName; - const collectionOptions = operation.collectionOptions || {}; - - operationContext[object] = db.collection( - collectionName, - convertCollectionOptions(collectionOptions) - ); - } - return testOperation(operation, operationContext[object], operationContext, options); - }); - }, Promise.resolve()); + for (const operation of testData.operations) { + const object = operation.object || 'collection'; + if (object === 'collection') { + const db = operationContext.database; + const collectionName = operationContext.collectionName; + const collectionOptions = operation.collectionOptions || {}; + + operationContext[object] = db.collection( + collectionName, + convertCollectionOptions(collectionOptions) + ); + } + await testOperation(operation, operationContext[object], operationContext, options); + } } module.exports = { diff --git a/test/tools/unified-spec-runner/entities.ts b/test/tools/unified-spec-runner/entities.ts index b26c0fccfc..0599e32114 100644 --- a/test/tools/unified-spec-runner/entities.ts +++ b/test/tools/unified-spec-runner/entities.ts @@ -242,7 +242,7 @@ export class FailPointMap extends Map { try { await client.connect(); } catch (error) { - console.error(`failed to connect disableFailPoints ${hostAddress}`); + console.error(`failed to connect disableFailPoints ${hostAddress} - ${error}`); throw error; } const admin = client.db('admin'); diff --git a/test/tools/unified-spec-runner/runner.ts b/test/tools/unified-spec-runner/runner.ts index 34964caf67..49c0b54a06 100644 --- a/test/tools/unified-spec-runner/runner.ts +++ b/test/tools/unified-spec-runner/runner.ts @@ -69,7 +69,7 @@ export async function runUnifiedTest( utilClient = ctx.configuration.newClient(); } - let entities; + let entities: EntitiesMap; try { trace('\n starting test:'); try { diff --git a/test/types/community/client.test-d.ts b/test/types/community/client.test-d.ts index 79000b8e35..0f968f5bfc 100644 --- a/test/types/community/client.test-d.ts +++ b/test/types/community/client.test-d.ts @@ -55,6 +55,7 @@ export async function testFunc(): Promise { MongoClient.connect(connectionString, err => { if (err instanceof MongoError) { + //@ts-expect-error // TODO? can we break? expectType(err.hasErrorLabel('label')); } }); diff --git a/test/unit/error.test.js b/test/unit/error.test.js index 134d309aa8..ed110a6af2 100644 --- a/test/unit/error.test.js +++ b/test/unit/error.test.js @@ -5,7 +5,7 @@ const expect = require('chai').expect; const mock = require('../tools/mongodb-mock/index'); const { getSymbolFrom } = require('../tools/utils'); const { ReplSetFixture } = require('../tools/common'); -const { ns, isHello } = require('../../src/utils'); +const { ns, isHello, setDifference } = require('../../src/utils'); const { Topology } = require('../../src/sdam/topology'); const { MongoNetworkError, @@ -18,7 +18,6 @@ const { LEGACY_NOT_WRITABLE_PRIMARY_ERROR_MESSAGE, LEGACY_NOT_PRIMARY_OR_SECONDARY_ERROR_MESSAGE, NODE_IS_RECOVERING_ERROR_MESSAGE, - isRetryableEndTransactionError, isSDAMUnrecoverableError } = require('../../src/error'); const { @@ -28,16 +27,38 @@ const { describe('MongoErrors', () => { // import errors as object - let errorClasses = Object.fromEntries( - Object.entries(require('../../src/index')).filter(([key]) => key.endsWith('Error')) + let errorClassesFromEntryPoint = Object.fromEntries( + Object.entries(require('../../src/index')).filter( + ([key, value]) => key.endsWith('Error') && value.toString().startsWith('class') + ) ); - errorClasses = { ...errorClasses, MongoPoolClosedError, MongoWaitQueueTimeoutError }; + errorClassesFromEntryPoint = { + ...errorClassesFromEntryPoint, + MongoPoolClosedError, + MongoWaitQueueTimeoutError + }; + + let errorClassesFromErrorSrc = Object.fromEntries( + Object.entries(require('../../src/error')).filter( + ([key, value]) => key.endsWith('Error') && value.toString().startsWith('class') + ) + ); + + it('all defined errors should be public', () => { + expect( + setDifference(Object.keys(errorClassesFromEntryPoint), Object.keys(errorClassesFromErrorSrc)) + ).to.have.property('size', 3); - for (const errorName in errorClasses) { + expect( + setDifference(Object.keys(errorClassesFromErrorSrc), Object.keys(errorClassesFromEntryPoint)) + ).to.have.property('size', 0); + }); + + for (const errorName in errorClassesFromEntryPoint) { describe(errorName, () => { it(`name should be read-only`, () => { // Dynamically create error class with message - let error = new errorClasses[errorName]('generated by test'); + let error = new errorClassesFromEntryPoint[errorName]('generated by test'); // expect name property to be class name expect(error).to.have.property('name', errorName); @@ -89,34 +110,6 @@ describe('MongoErrors', () => { }); }); - describe('#isRetryableEndTransactionError', function () { - context('when the error has a RetryableWriteError label', function () { - const error = new MongoNetworkError(''); - error.addErrorLabel('RetryableWriteError'); - - it('returns true', function () { - expect(isRetryableEndTransactionError(error)).to.be.true; - }); - }); - - context('when the error does not have a RetryableWriteError label', function () { - const error = new MongoNetworkError(''); - error.addErrorLabel('InvalidLabel'); - - it('returns false', function () { - expect(isRetryableEndTransactionError(error)).to.be.false; - }); - }); - - context('when the error does not have any label', function () { - const error = new MongoNetworkError(''); - - it('returns false', function () { - expect(isRetryableEndTransactionError(error)).to.be.false; - }); - }); - }); - describe('#isSDAMUnrecoverableError', function () { context('when the error is a MongoParseError', function () { it('returns true', function () { diff --git a/test/unit/sdam/server_selection.test.js b/test/unit/sdam/server_selection.test.js index 4f32651be2..c2be74e014 100644 --- a/test/unit/sdam/server_selection.test.js +++ b/test/unit/sdam/server_selection.test.js @@ -6,12 +6,14 @@ const { ObjectId } = require('../../../src/bson'); const { ReadPreference } = require('../../../src/read_preference'); const { sameServerSelector, - secondaryWritableServerSelector, - MIN_SECONDARY_WRITE_WIRE_VERSION + secondaryWritableServerSelector } = require('../../../src/sdam/server_selection'); const { ServerDescription } = require('../../../src/sdam/server_description'); const { TopologyDescription } = require('../../../src/sdam/topology_description'); const { TopologyType } = require('../../../src/sdam/common'); +const { MONGODB_WIRE_VERSION } = require('../../../src/cmap/wire_protocol/constants'); + +const MIN_SECONDARY_WRITE_WIRE_VERSION = MONGODB_WIRE_VERSION.WIRE_VERSION_50; describe('server selection', function () { const primary = new ServerDescription('127.0.0.1:27017', {