Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
nbbeeken committed Feb 15, 2022
1 parent a7b18e0 commit 9d7894b
Show file tree
Hide file tree
Showing 42 changed files with 623 additions and 585 deletions.
6 changes: 6 additions & 0 deletions docs/errors.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
- [`MongoDriverError`](#MongoDriverError)
- [`MongoAPIError`](#MongoAPIError)
- [`MongoRuntimeError`](#MongoRuntimeError)
- [`MongoUnexpectedServerResponseError`](#MongoUnexpectedServerResponseError)
- [`MongoNetworkError`](#MongoNetworkError)
- [`MongoServerError`](#MongoServerError)
- [`MongoSystemError`](#MongoSystemError)
Expand Down Expand Up @@ -90,6 +91,11 @@ This class should **never** be directly instantiated.
| **MongoChangeStreamError** | Thrown when an error is encountered when operating on a ChangeStream. |
| **MongoGridFSStreamError** | Thrown when an unexpected state is reached when operating on a GridFS Stream. |
| **MongoGridFSChunkError** | Thrown when a malformed or invalid chunk is encountered when reading from a GridFS Stream. |
| **MongoUnexpectedServerResponseError** | Thrown when the driver receives a **parsable** response it did not expect from the server |

### MongoUnexpectedServerResponseError

xxx

### `MongoNetworkError`

Expand Down
51 changes: 31 additions & 20 deletions global.d.ts
Original file line number Diff line number Diff line change
@@ -1,32 +1,40 @@
import { OneOrMore } from './src/mongo_types';
import type { TestConfiguration } from './test/tools/runner/config';

type WithExclusion<T extends string> = `!${T}`
type WithExclusion<T extends string> = `!${T}`;
/** Defined in test/tools/runner/filters/mongodb_topology_filter.js (topologyTypeToString) */
type TopologyTypes = 'single' | 'replicaset' | 'sharded' | 'load-balanced';
type TopologyTypeRequirement = OneOrMore<TopologyTypes> | OneOrMore<WithExclusion<TopologyTypes>>
type TopologyTypeRequirement = OneOrMore<TopologyTypes> | OneOrMore<WithExclusion<TopologyTypes>>;

declare global {
interface MongoDBMetadataUI {
requires?: {
topology?: TopologyTypeRequirement;
mongodb?: string;
os?: NodeJS.Platform | `!${NodeJS.Platform}`;
apiVersion?: '1';
clientSideEncryption?: boolean;
serverless?: 'forbid' | 'allow' | 'require';
auth: 'enabled' | 'disabled'
};
interface MongoDBMetadataUI {
requires?: {
topology?: TopologyTypeRequirement;
mongodb?: string;
os?: NodeJS.Platform | `!${NodeJS.Platform}`;
apiVersion?: '1';
clientSideEncryption?: boolean;
serverless?: 'forbid' | 'allow' | 'require';
auth: 'enabled' | 'disabled';
};

sessions?: {
skipLeakTests?: boolean;
};
}
sessions?: {
skipLeakTests?: boolean;
};
}

interface MetadataAndTest<Fn> {
metadata: MongoDBMetadataUI;
test: Fn;
}

namespace Chai {
interface Assertion {
/** @deprecated Used only by the legacy spec runner, the unified runner implements the unified spec expectations */
matchMongoSpec: (anything: any) => Chai.Assertion;
}
}

interface MetadataAndTest<Fn> {
metadata: MongoDBMetadataUI;
test: Fn;
}
namespace Mocha {
interface TestFunction {
(title: string, metadata: MongoDBMetadataUI, fn: Mocha.Func): Mocha.Test;
Expand All @@ -42,6 +50,9 @@ interface MetadataAndTest<Fn> {

interface Test {
metadata: MongoDBMetadataUI;

/** @deprecated Attach spec to a test if you need access to it in a beforeEach hook */
spec: Record<string, any>;
}

interface Runnable {
Expand Down
8 changes: 6 additions & 2 deletions src/change_stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import Denque = require('denque');
import type { Readable } from 'stream';

import type { Document, Timestamp } from './bson';
import { MONGODB_WIRE_VERSION } from './cmap/wire_protocol/constants';
import { Collection } from './collection';
import {
AbstractCursor,
Expand Down Expand Up @@ -462,7 +463,10 @@ export class ChangeStreamCursor<TSchema extends Document = Document> extends Abs
const resumeKey =
this.options.startAfter && !this.hasReceived ? 'startAfter' : 'resumeAfter';
Reflect.set(result, resumeKey, this.resumeToken);
} else if (this.startAtOperationTime && maxWireVersion(this.server) >= 7) {
} else if (
this.startAtOperationTime &&
maxWireVersion(this.server) >= MONGODB_WIRE_VERSION.REPLICA_SET_TRANSACTIONS
) {
result.startAtOperationTime = this.startAtOperationTime;
}
}
Expand Down Expand Up @@ -513,7 +517,7 @@ export class ChangeStreamCursor<TSchema extends Document = Document> extends Abs
this.startAtOperationTime == null &&
this.resumeAfter == null &&
this.startAfter == null &&
maxWireVersion(server) >= 7
maxWireVersion(server) >= MONGODB_WIRE_VERSION.REPLICA_SET_TRANSACTIONS
) {
this.startAtOperationTime = response.operationTime;
}
Expand Down
3 changes: 2 additions & 1 deletion src/cmap/auth/mongo_credentials.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import type { Document } from '../../bson';
import { MongoAPIError, MongoMissingCredentialsError } from '../../error';
import { emitWarningOnce } from '../../utils';
import { MONGODB_WIRE_VERSION } from '../wire_protocol/constants';
import { AUTH_MECHS_AUTH_SRC_EXTERNAL, AuthMechanism } from './providers';

// https://github.com/mongodb/specifications/blob/master/source/auth/auth.rst
Expand All @@ -16,7 +17,7 @@ function getDefaultAuthMechanism(hello?: Document): AuthMechanism {
}

// Fallback to legacy selection method. If wire version >= 3, use scram-sha-1
if (hello.maxWireVersion >= 3) {
if (hello.maxWireVersion >= MONGODB_WIRE_VERSION.RELEASE_2_7_7) {
return AuthMechanism.MONGODB_SCRAM_SHA1;
}
}
Expand Down
3 changes: 2 additions & 1 deletion src/cmap/auth/mongodb_aws.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import {
MongoRuntimeError
} from '../../error';
import { Callback, maxWireVersion, ns } from '../../utils';
import { MONGODB_WIRE_VERSION } from '../wire_protocol/constants';
import { AuthContext, AuthProvider } from './auth_provider';
import { MongoCredentials } from './mongo_credentials';
import { AuthMechanism } from './providers';
Expand Down Expand Up @@ -44,7 +45,7 @@ export class MongoDBAWS extends AuthProvider {
}
const { sign } = aws4;

if (maxWireVersion(connection) < 9) {
if (maxWireVersion(connection) < MONGODB_WIRE_VERSION.RESUMABLE_INITIAL_SYNC) {
callback(
new MongoCompatibilityError(
'MONGODB-AWS authentication requires MongoDB version 4.4 or later'
Expand Down
90 changes: 59 additions & 31 deletions src/cmap/connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ import {
import type { Stream } from './connect';
import { MessageStream, OperationDescription } from './message_stream';
import { StreamDescription, StreamDescriptionOptions } from './stream_description';
import { MONGODB_WIRE_VERSION } from './wire_protocol/constants';
import { applyCommonQueryOptions, getReadPreference, isSharded } from './wire_protocol/shared';

/** @internal */
Expand Down Expand Up @@ -109,8 +110,10 @@ export interface CommandOptions extends BSONSerializeOptions {
noResponse?: boolean;
omitReadPreference?: boolean;

// FIXME: NODE-2802
willRetryWrite?: boolean;
// FIXME(NODE-2802): Currently the CommandOptions take a property willRetryWrite which is a hint from executeOperation that the txnNum should be applied to this command.
// Applying a session to a command should happen as part of command construction, most likely in the CommandOperation#executeCommand method,
// where we have access to the details we need to determine if a txnNum should also be applied.
willRetryWrite?: true;

// FIXME: NODE-2781
writeConcern?: WriteConcernOptions | WriteConcern | W;
Expand Down Expand Up @@ -248,9 +251,35 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
/* ignore errors, listen to `close` instead */
});

this[kMessageStream].on('error', error => this.handleIssue({ destroy: error }));
stream.on('close', () => this.handleIssue({ isClose: true }));
stream.on('timeout', () => this.handleIssue({ isTimeout: true, destroy: true }));
const eventHandlerFactory = (event: 'error' | 'close' | 'timeout') =>
[
event,
(error?: Error) => {
if (this.closed) {
return;
}
this.closed = true;

switch (event) {
case 'error':
this.onError(error);
break;
case 'close':
this.onClose();
break;
case 'timeout':
this.onTimeout();
break;
}

this[kQueue].clear();
this.emit(Connection.CLOSE);
}
] as const;

this[kMessageStream].on(...eventHandlerFactory('error'));
stream.on(...eventHandlerFactory('close'));
stream.on(...eventHandlerFactory('timeout'));

// hook the message stream up to the passed in stream
stream.pipe(this[kMessageStream]);
Expand Down Expand Up @@ -306,33 +335,29 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
this[kLastUseTime] = now();
}

handleIssue(issue: { isTimeout?: boolean; isClose?: boolean; destroy?: boolean | Error }): void {
if (this.closed) {
return;
}
onTimeout() {
this[kStream].destroy();
const beforeHandshake = {
beforeHandshake: this.hello == null
};
const msg = `connection ${this.id} to ${this.address} timed out`;

if (issue.destroy) {
this[kStream].destroy(typeof issue.destroy === 'boolean' ? undefined : issue.destroy);
for (const [, op] of this[kQueue]) {
op.cb(new MongoNetworkTimeoutError(msg, beforeHandshake));
}
}

this.closed = true;

onClose() {
for (const [, op] of this[kQueue]) {
if (issue.isTimeout) {
op.cb(
new MongoNetworkTimeoutError(`connection ${this.id} to ${this.address} timed out`, {
beforeHandshake: this.hello == null
})
);
} else if (issue.isClose) {
op.cb(new MongoNetworkError(`connection ${this.id} to ${this.address} closed`));
} else {
op.cb(typeof issue.destroy === 'boolean' ? undefined : issue.destroy);
}
op.cb(new MongoNetworkError(`connection ${this.id} to ${this.address} closed`));
}
}

this[kQueue].clear();
this.emit(Connection.CLOSE);
onError(error?: Error) {
this[kStream].destroy(error);
for (const [, op] of this[kQueue]) {
op.cb(error);
}
}

destroy(): void;
Expand Down Expand Up @@ -541,7 +566,7 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
return;
}

if (wireVersion < 4) {
if (wireVersion < MONGODB_WIRE_VERSION.FIND_COMMAND) {
const getMoreOp = new GetMore(ns.toString(), cursorId, { numberToReturn: options.batchSize });
const queryOptions = applyCommonQueryOptions(
{},
Expand Down Expand Up @@ -595,7 +620,7 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
throw new MongoRuntimeError(`Invalid list of cursor ids provided: ${cursorIds}`);
}

if (maxWireVersion(this) < 4) {
if (maxWireVersion(this) < MONGODB_WIRE_VERSION.FIND_COMMAND) {
try {
write(
this,
Expand Down Expand Up @@ -653,12 +678,12 @@ export class CryptoConnection extends Connection {
}

const serverWireVersion = maxWireVersion(this);
if (serverWireVersion === 0) {
if (serverWireVersion === MONGODB_WIRE_VERSION.UNKNOWN) {
// This means the initial handshake hasn't happened yet
return super.command(ns, cmd, options, callback);
}

if (serverWireVersion < 8) {
if (serverWireVersion < MONGODB_WIRE_VERSION.SHARDED_TRANSACTIONS) {
callback(
new MongoCompatibilityError('Auto-encryption requires a minimum MongoDB version of 4.2')
);
Expand Down Expand Up @@ -695,7 +720,10 @@ function supportsOpMsg(conn: Connection) {
return false;
}

return maxWireVersion(conn) >= 6 && !description.__nodejs_mock_server__;
return (
maxWireVersion(conn) >= MONGODB_WIRE_VERSION.SUPPORTS_OP_MSG &&
!description.__nodejs_mock_server__ // mock server does not support OP_MSG
);
}

function messageHandler(conn: Connection) {
Expand Down
3 changes: 3 additions & 0 deletions src/cmap/errors.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@ import type { ConnectionPool } from './connection_pool';

/**
* An error indicating a connection pool is closed
*
* @public
*
* @category Error
*/
export class PoolClosedError extends MongoDriverError {
Expand Down
51 changes: 49 additions & 2 deletions src/cmap/wire_protocol/constants.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,54 @@
/**
* All known MongoDB Wire versions, used to determine basic feature support
* - see the [server enum](https://github.com/mongodb/mongo/blob/fe4cf6134b16f102591053d6f4fe11e5cc0eb3ec/src/mongo/db/wire_version.h#L57)
* - (note: the is a link to a commit so you can find the file, you should then check the main branch)
*/
export const MONGODB_WIRE_VERSION = Object.freeze({
/** A helper wire version, 0 means no information about what is supported on the server is known */
UNKNOWN: 0,
/** Everything before we started tracking. */
RELEASE_2_4_AND_BEFORE: 0,
/** The aggregation command may now be requested to return cursors. */
AGG_RETURNS_CURSORS: 1,
/** insert, update, and delete batch command */
BATCH_COMMANDS: 2,
/** support SCRAM-SHA1, listIndexes, listCollections, new explain */
RELEASE_2_7_7: 3,
/** Support find and getMore commands, as well as OP_COMMAND in mongod (but not mongos). */
FIND_COMMAND: 4,
/** Supports all write commands take a write concern. */
COMMANDS_ACCEPT_WRITE_CONCERN: 5,
/** Supports the new OP_MSG wireprotocol (3.6+). */
SUPPORTS_OP_MSG: 6,
/** Supports replica set transactions (4.0+). */
REPLICA_SET_TRANSACTIONS: 7,
/** Supports sharded transactions (4.2+). */
SHARDED_TRANSACTIONS: 8,
/** Supports resumable initial sync (4.4+). */
RESUMABLE_INITIAL_SYNC: 9,
/** Supports features available from 4.7 and onwards. */
WIRE_VERSION_47: 10,
/** Supports features available from 4.8 and onwards. */
WIRE_VERSION_48: 11,
/**
* Supports features available from 4.9 and onwards.
* - EstimatedDocumentCountOperation can run as an aggregation
*/
WIRE_VERSION_49: 12,
/**
* Supports features available from 5.0 and onwards.
* - Writes to secondaries $out/$merge
* - Snapshot reads
*/
WIRE_VERSION_50: 13,
/** Supports features available from 5.1 and onwards. */
WIRE_VERSION_51: 14
} as const);

export const MIN_SUPPORTED_SERVER_VERSION = '3.6';
export const MAX_SUPPORTED_SERVER_VERSION = '5.1';
export const MIN_SUPPORTED_WIRE_VERSION = 6;
export const MAX_SUPPORTED_WIRE_VERSION = 14;
export const MIN_SUPPORTED_WIRE_VERSION = MONGODB_WIRE_VERSION.SUPPORTS_OP_MSG;
export const MAX_SUPPORTED_WIRE_VERSION = MONGODB_WIRE_VERSION.WIRE_VERSION_51;
export const OP_REPLY = 1;
export const OP_UPDATE = 2001;
export const OP_INSERT = 2002;
Expand Down
Loading

0 comments on commit 9d7894b

Please sign in to comment.