Skip to content

Commit

Permalink
feat(NODE-6090): Implement CSOT logic for connection checkout and ser…
Browse files Browse the repository at this point in the history
…ver selection
  • Loading branch information
W-A-James authored and dariakp committed Nov 6, 2024
1 parent dc3fe95 commit bd8a9f4
Show file tree
Hide file tree
Showing 20 changed files with 570 additions and 178 deletions.
3 changes: 2 additions & 1 deletion src/admin.ts
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,8 @@ export class Admin {
new RunAdminCommandOperation(command, {
...resolveBSONOptions(options),
session: options?.session,
readPreference: options?.readPreference
readPreference: options?.readPreference,
timeoutMS: options?.timeoutMS ?? this.s.db.timeoutMS
})
);
}
Expand Down
4 changes: 4 additions & 0 deletions src/cmap/connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import { type CancellationToken, TypedEventEmitter } from '../mongo_types';
import { ReadPreference, type ReadPreferenceLike } from '../read_preference';
import { ServerType } from '../sdam/common';
import { applySession, type ClientSession, updateSessionFromResponse } from '../sessions';
import { type Timeout } from '../timeout';
import {
BufferPool,
calculateDurationInMs,
Expand Down Expand Up @@ -99,6 +100,9 @@ export interface CommandOptions extends BSONSerializeOptions {
writeConcern?: WriteConcern;

directConnection?: boolean;

/** @internal */
timeout?: Timeout;
}

/** @public */
Expand Down
53 changes: 38 additions & 15 deletions src/cmap/connection_pool.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,14 @@ import {
MongoInvalidArgumentError,
MongoMissingCredentialsError,
MongoNetworkError,
MongoOperationTimeoutError,
MongoRuntimeError,
MongoServerError
} from '../error';
import { CancellationToken, TypedEventEmitter } from '../mongo_types';
import type { Server } from '../sdam/server';
import { Timeout, TimeoutError } from '../timeout';
import { type Callback, List, makeCounter, now, promiseWithResolvers } from '../utils';
import { type Callback, csotMin, List, makeCounter, promiseWithResolvers } from '../utils';
import { connect } from './connect';
import { Connection, type ConnectionEvents, type ConnectionOptions } from './connection';
import {
Expand Down Expand Up @@ -102,7 +103,6 @@ export interface ConnectionPoolOptions extends Omit<ConnectionOptions, 'id' | 'g
export interface WaitQueueMember {
resolve: (conn: Connection) => void;
reject: (err: AnyError) => void;
timeout: Timeout;
[kCancelled]?: boolean;
checkoutTime: number;
}
Expand Down Expand Up @@ -355,37 +355,57 @@ export class ConnectionPool extends TypedEventEmitter<ConnectionPoolEvents> {
* will be held by the pool. This means that if a connection is checked out it MUST be checked back in or
* explicitly destroyed by the new owner.
*/
async checkOut(): Promise<Connection> {
const checkoutTime = now();
async checkOut(options?: { timeout?: Timeout }): Promise<Connection> {
this.emitAndLog(
ConnectionPool.CONNECTION_CHECK_OUT_STARTED,
new ConnectionCheckOutStartedEvent(this)
);

const waitQueueTimeoutMS = this.options.waitQueueTimeoutMS;
const serverSelectionTimeoutMS = this[kServer].topology.s.serverSelectionTimeoutMS;

const { promise, resolve, reject } = promiseWithResolvers<Connection>();

const timeout = Timeout.expires(waitQueueTimeoutMS);
let timeout: Timeout | null = null;
if (options?.timeout) {
// CSOT enabled
// Determine if we're using the timeout passed in or a new timeout
if (options.timeout.duration > 0 || serverSelectionTimeoutMS > 0) {
// This check determines whether or not Topology.selectServer used the configured
// `timeoutMS` or `serverSelectionTimeoutMS` value for its timeout
if (
options.timeout.duration === serverSelectionTimeoutMS ||
csotMin(options.timeout.duration, serverSelectionTimeoutMS) < serverSelectionTimeoutMS
) {
// server selection used `timeoutMS`, so we should use the existing timeout as the timeout
// here
timeout = options.timeout;
} else {
// server selection used `serverSelectionTimeoutMS`, so we construct a new timeout with
// the time remaining to ensure that Topology.selectServer and ConnectionPool.checkOut
// cumulatively don't spend more than `serverSelectionTimeoutMS` blocking
timeout = Timeout.expires(serverSelectionTimeoutMS - options.timeout.timeElapsed);
}
}
} else {
timeout = Timeout.expires(waitQueueTimeoutMS);
}

const waitQueueMember: WaitQueueMember = {
resolve,
reject,
timeout,
checkoutTime
reject
};

this[kWaitQueue].push(waitQueueMember);
process.nextTick(() => this.processWaitQueue());

try {
return await Promise.race([promise, waitQueueMember.timeout]);
timeout?.throwIfExpired();
return await (timeout ? Promise.race([promise, timeout]) : promise);
} catch (error) {
if (TimeoutError.is(error)) {
waitQueueMember[kCancelled] = true;

waitQueueMember.timeout.clear();

this.emitAndLog(
ConnectionPool.CONNECTION_CHECK_OUT_FAILED,
new ConnectionCheckOutFailedEvent(this, 'timeout', waitQueueMember.checkoutTime)
Expand All @@ -396,9 +416,16 @@ export class ConnectionPool extends TypedEventEmitter<ConnectionPoolEvents> {
: 'Timed out while checking out a connection from connection pool',
this.address
);
if (options?.timeout) {
throw new MongoOperationTimeoutError('Timed out during connection checkout', {
cause: timeoutError
});
}
throw timeoutError;
}
throw error;
} finally {
if (timeout !== options?.timeout) timeout?.clear();
}
}

Expand Down Expand Up @@ -764,7 +791,6 @@ export class ConnectionPool extends TypedEventEmitter<ConnectionPoolEvents> {
ConnectionPool.CONNECTION_CHECK_OUT_FAILED,
new ConnectionCheckOutFailedEvent(this, reason, waitQueueMember.checkoutTime, error)
);
waitQueueMember.timeout.clear();
this[kWaitQueue].shift();
waitQueueMember.reject(error);
continue;
Expand All @@ -785,7 +811,6 @@ export class ConnectionPool extends TypedEventEmitter<ConnectionPoolEvents> {
ConnectionPool.CONNECTION_CHECKED_OUT,
new ConnectionCheckedOutEvent(this, connection, waitQueueMember.checkoutTime)
);
waitQueueMember.timeout.clear();

this[kWaitQueue].shift();
waitQueueMember.resolve(connection);
Expand Down Expand Up @@ -828,8 +853,6 @@ export class ConnectionPool extends TypedEventEmitter<ConnectionPoolEvents> {
);
waitQueueMember.resolve(connection);
}

waitQueueMember.timeout.clear();
}
process.nextTick(() => this.processWaitQueue());
});
Expand Down
5 changes: 5 additions & 0 deletions src/collection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -262,6 +262,11 @@ export class Collection<TSchema extends Document = Document> {
this.s.collectionHint = normalizeHintField(v);
}

/** @internal */
get timeoutMS(): number | undefined {
return this.s.options.timeoutMS;
}

/**
* Inserts a single document into MongoDB. If documents passed in do not contain the **_id** field,
* one will be added to each of the documents missing it by the driver, mutating the document. This behavior
Expand Down
6 changes: 6 additions & 0 deletions src/db.ts
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,11 @@ export class Db {
return this.s.namespace.toString();
}

/** @internal */
get timeoutMS(): number | undefined {
return this.s.options?.timeoutMS;
}

/**
* Create a new collection on a server with the specified options. Use this to create capped collections.
* More information about command options available at https://www.mongodb.com/docs/manual/reference/command/create/
Expand Down Expand Up @@ -272,6 +277,7 @@ export class Db {
this.client,
new RunCommandOperation(this, command, {
...resolveBSONOptions(options),
timeoutMS: options?.timeoutMS,
session: options?.session,
readPreference: options?.readPreference
})
Expand Down
9 changes: 9 additions & 0 deletions src/error.ts
Original file line number Diff line number Diff line change
Expand Up @@ -857,6 +857,15 @@ export class MongoUnexpectedServerResponseError extends MongoRuntimeError {
}
}

/**
* @internal
*/
export class MongoOperationTimeoutError extends MongoRuntimeError {
override get name(): string {
return 'MongoOperationTimeoutError';
}
}

/**
* An error thrown when the user attempts to add options to a cursor that has already been
* initialized
Expand Down
1 change: 1 addition & 0 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ export {
MongoNetworkTimeoutError,
MongoNotConnectedError,
MongoOIDCError,
MongoOperationTimeoutError,
MongoParseError,
MongoRuntimeError,
MongoServerClosedError,
Expand Down
2 changes: 2 additions & 0 deletions src/operations/command.ts
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ export interface OperationParent {
writeConcern?: WriteConcern;
readPreference?: ReadPreference;
bsonOptions?: BSONSerializeOptions;
timeoutMS?: number;
}

/** @internal */
Expand Down Expand Up @@ -131,6 +132,7 @@ export abstract class CommandOperation<T> extends AbstractOperation<T> {
const options = {
...this.options,
...this.bsonOptions,
timeout: this.timeout,
readPreference: this.readPreference,
session
};
Expand Down
3 changes: 2 additions & 1 deletion src/operations/find.ts
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,8 @@ export class FindOperation extends CommandOperation<CursorResponse> {
...this.options,
...this.bsonOptions,
documentsReturnedIn: 'firstBatch',
session
session,
timeout: this.timeout
},
this.explain ? ExplainedCursorResponse : CursorResponse
);
Expand Down
8 changes: 8 additions & 0 deletions src/operations/operation.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import { type BSONSerializeOptions, type Document, resolveBSONOptions } from '..
import { ReadPreference, type ReadPreferenceLike } from '../read_preference';
import type { Server } from '../sdam/server';
import type { ClientSession } from '../sessions';
import { type Timeout } from '../timeout';
import type { MongoDBNamespace } from '../utils';

export const Aspect = {
Expand Down Expand Up @@ -57,6 +58,11 @@ export abstract class AbstractOperation<TResult = any> {

options: OperationOptions;

/** @internal */
timeout?: Timeout;
/** @internal */
timeoutMS?: number;

[kSession]: ClientSession | undefined;

static aspects?: Set<symbol>;
Expand All @@ -74,6 +80,8 @@ export abstract class AbstractOperation<TResult = any> {
this.options = options;
this.bypassPinningCheck = !!options.bypassPinningCheck;
this.trySecondaryWrite = false;

this.timeoutMS = options.timeoutMS;
}

/** Must match the first key of the command object sent to the server.
Expand Down
9 changes: 7 additions & 2 deletions src/operations/run_command.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ export type RunCommandOptions = {
session?: ClientSession;
/** The read preference */
readPreference?: ReadPreferenceLike;
/** @internal */
timeoutMS?: number;
} & BSONSerializeOptions;

/** @internal */
Expand All @@ -39,10 +41,12 @@ export class RunCommandOperation<T = Document> extends AbstractOperation<T> {
{
...this.options,
readPreference: this.readPreference,
session
session,
timeout: this.timeout
},
this.options.responseType
);

return res;
}
}
Expand All @@ -68,7 +72,8 @@ export class RunAdminCommandOperation<T = Document> extends AbstractOperation<T>
const res: TODO_NODE_3286 = await server.command(this.ns, this.command, {
...this.options,
readPreference: this.readPreference,
session
session,
timeout: this.timeout
});
return res;
}
Expand Down
3 changes: 2 additions & 1 deletion src/sdam/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -311,7 +311,7 @@ export class Server extends TypedEventEmitter<ServerEvents> {
this.incrementOperationCount();
if (conn == null) {
try {
conn = await this.pool.checkOut();
conn = await this.pool.checkOut(options);
if (this.loadBalanced && isPinnableCommand(cmd, session)) {
session?.pin(conn);
}
Expand All @@ -336,6 +336,7 @@ export class Server extends TypedEventEmitter<ServerEvents> {
operationError.code === MONGODB_ERROR_CODES.Reauthenticate
) {
await this.pool.reauthenticate(conn);
// TODO(NODE-5682): Implement CSOT support for socket read/write at the connection layer
try {
const res = await conn.command(ns, cmd, finalOptions, responseType);
throwIfWriteConcernError(res);
Expand Down
Loading

0 comments on commit bd8a9f4

Please sign in to comment.