Skip to content

Commit

Permalink
perf(NODE-5928): consolidate signal use and abort promise wrap
Browse files Browse the repository at this point in the history
  • Loading branch information
nbbeeken committed Feb 15, 2024
1 parent 10a5c5a commit ef9e103
Show file tree
Hide file tree
Showing 3 changed files with 41 additions and 185 deletions.
64 changes: 41 additions & 23 deletions src/cmap/connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,14 +29,14 @@ import { type CancellationToken, TypedEventEmitter } from '../mongo_types';
import type { ReadPreferenceLike } from '../read_preference';
import { applySession, type ClientSession, updateSessionFromResponse } from '../sessions';
import {
abortable,
BufferPool,
calculateDurationInMs,
type Callback,
HostAddress,
maxWireVersion,
type MongoDBNamespace,
now,
promiseWithResolvers,
uuidV4
} from '../utils';
import type { WriteConcern } from '../write_concern';
Expand Down Expand Up @@ -161,15 +161,14 @@ function streamIdentifier(stream: Stream, options: ConnectionOptions): string {
export class Connection extends TypedEventEmitter<ConnectionEvents> {
public id: number | '<monitor>';
public address: string;
public lastHelloMS?: number;
public lastHelloMS = -1;
public serverApi?: ServerApi;
public helloOk?: boolean;
public helloOk = false;
public authContext?: AuthContext;
public delayedTimeoutId: NodeJS.Timeout | null = null;
public generation: number;
public readonly description: Readonly<StreamDescription>;
/**
* @public
* Represents if the connection has been established:
* - TCP handshake
* - TLS negotiated
Expand All @@ -180,15 +179,16 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
public established: boolean;

private lastUseTime: number;
private socketTimeoutMS: number;
private monitorCommands: boolean;
private socket: Stream;
private controller: AbortController;
private messageStream: Readable;
private socketWrite: (buffer: Uint8Array) => Promise<void>;
private clusterTime: Document | null = null;
/** @internal */
override mongoLogger: MongoLogger | undefined;

private readonly socketTimeoutMS: number;
private readonly monitorCommands: boolean;
private readonly socket: Stream;
private readonly controller: AbortController;
private readonly signal: AbortSignal;
private readonly messageStream: Readable;
private readonly socketWrite: (buffer: Uint8Array) => Promise<void>;
private readonly aborted: Promise<never>;

/** @event */
static readonly COMMAND_STARTED = COMMAND_STARTED;
Expand Down Expand Up @@ -221,7 +221,21 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
this.lastUseTime = now();

this.socket = stream;

// TODO: Remove signal from connection layer
this.controller = new AbortController();
const { signal } = this.controller;
this.signal = signal;
const { promise: aborted, reject } = promiseWithResolvers<never>();
aborted.then(undefined, () => null); // Prevent unhandled rejection
this.signal.addEventListener(
'abort',
function onAbort() {
reject(signal.reason);
},
{ once: true }
);
this.aborted = aborted;

this.messageStream = this.socket
.on('error', this.onError.bind(this))
Expand All @@ -232,13 +246,13 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {

const socketWrite = promisify(this.socket.write.bind(this.socket));
this.socketWrite = async buffer => {
return abortable(socketWrite(buffer), { signal: this.controller.signal });
return Promise.race([socketWrite(buffer), this.aborted]);
};
}

/** Indicates that the connection (including underlying TCP socket) has been closed. */
public get closed(): boolean {
return this.controller.signal.aborted;
return this.signal.aborted;
}

public get hello() {
Expand Down Expand Up @@ -407,7 +421,7 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
}

private async *sendWire(message: WriteProtocolMessageType, options: CommandOptions) {
this.controller.signal.throwIfAborted();
this.throwIfAborted();

if (typeof options.socketTimeoutMS === 'number') {
this.socket.setTimeout(options.socketTimeoutMS);
Expand All @@ -426,7 +440,7 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
return;
}

this.controller.signal.throwIfAborted();
this.throwIfAborted();

for await (const response of this.readMany()) {
this.socket.setTimeout(0);
Expand All @@ -447,7 +461,7 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
}

yield document;
this.controller.signal.throwIfAborted();
this.throwIfAborted();

if (typeof options.socketTimeoutMS === 'number') {
this.socket.setTimeout(options.socketTimeoutMS);
Expand Down Expand Up @@ -481,7 +495,7 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {

let document;
try {
this.controller.signal.throwIfAborted();
this.throwIfAborted();
for await (document of this.sendWire(message, options)) {
if (!Buffer.isBuffer(document) && document.writeConcernError) {
throw new MongoWriteConcernError(document.writeConcernError, document);
Expand Down Expand Up @@ -511,7 +525,7 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
}

yield document;
this.controller.signal.throwIfAborted();
this.throwIfAborted();
}
} catch (error) {
if (this.shouldEmitAndLogCommand) {
Expand Down Expand Up @@ -554,7 +568,7 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
command: Document,
options: CommandOptions = {}
): Promise<Document> {
this.controller.signal.throwIfAborted();
this.throwIfAborted();
for await (const document of this.sendCommand(ns, command, options)) {
return document;
}
Expand All @@ -568,16 +582,20 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
replyListener: Callback
) {
const exhaustLoop = async () => {
this.controller.signal.throwIfAborted();
this.throwIfAborted();
for await (const reply of this.sendCommand(ns, command, options)) {
replyListener(undefined, reply);
this.controller.signal.throwIfAborted();
this.throwIfAborted();
}
throw new MongoUnexpectedServerResponseError('Server ended moreToCome unexpectedly');
};
exhaustLoop().catch(replyListener);
}

private throwIfAborted() {
this.signal.throwIfAborted();
}

/**
* @internal
*
Expand Down Expand Up @@ -611,7 +629,7 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
* Note that `for-await` loops call `return` automatically when the loop is exited.
*/
private async *readMany(): AsyncGenerator<OpMsgResponse | OpQueryResponse> {
for await (const message of onData(this.messageStream, { signal: this.controller.signal })) {
for await (const message of onData(this.messageStream, { signal: this.signal })) {
const response = await decompressResponse(message);
yield response;

Expand Down
30 changes: 0 additions & 30 deletions src/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1283,36 +1283,6 @@ export function isHostMatch(match: RegExp, host?: string): boolean {
return host && match.test(host.toLowerCase()) ? true : false;
}

/**
* Takes a promise and races it with a promise wrapping the abort event of the optionally provided signal.
* The given promise is _always_ ordered before the signal's abort promise.
* When given an already rejected promise and an already aborted signal, the promise's rejection takes precedence.
*
* @see https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/Promise/race
*
* @param promise - A promise to discard if the signal aborts
* @param options - An options object carrying an optional signal
*/
export async function abortable<T>(
promise: Promise<T>,
{ signal }: { signal: AbortSignal }
): Promise<T> {
const { promise: aborted, reject } = promiseWithResolvers<never>();

function rejectOnAbort() {
reject(signal.reason);
}

if (signal.aborted) rejectOnAbort();
else signal.addEventListener('abort', rejectOnAbort, { once: true });

try {
return await Promise.race([promise, aborted]);
} finally {
signal.removeEventListener('abort', rejectOnAbort);
}
}

export function promiseWithResolvers<T>() {
let resolve!: Parameters<ConstructorParameters<typeof Promise<T>>[0]>[0];
let reject!: Parameters<ConstructorParameters<typeof Promise<T>>[0]>[1];
Expand Down
132 changes: 0 additions & 132 deletions test/unit/utils.test.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,7 @@
import { expect } from 'chai';
import * as sinon from 'sinon';
import { setTimeout } from 'timers';

import {
abortable,
BufferPool,
ByteUtils,
compareObjectId,
Expand All @@ -21,7 +19,6 @@ import {
shuffle,
TimeoutController
} from '../mongodb';
import { sleep } from '../tools/utils';
import { createTimerSandbox } from './timer_sandbox';

describe('driver utils', function () {
Expand Down Expand Up @@ -1077,133 +1074,4 @@ describe('driver utils', function () {
});
});
});

describe('abortable()', () => {
const goodError = new Error('good error');
const badError = new Error('unexpected bad error!');
const expectedValue = "don't panic";

context('always removes the abort listener it attaches', () => {
let controller;
let removeEventListenerSpy;
let addEventListenerSpy;

beforeEach(() => {
controller = new AbortController();
addEventListenerSpy = sinon.spy(controller.signal, 'addEventListener');
removeEventListenerSpy = sinon.spy(controller.signal, 'removeEventListener');
});

afterEach(() => sinon.restore());

const expectListenerCleanup = () => {
expect(addEventListenerSpy).to.have.been.calledOnce;
expect(removeEventListenerSpy).to.have.been.calledOnce;
};

it('when promise rejects', async () => {
await abortable(Promise.reject(goodError), { signal: controller.signal }).catch(e => e);
expectListenerCleanup();
});

it('when promise resolves', async () => {
await abortable(Promise.resolve(expectedValue), { signal: controller.signal });
expectListenerCleanup();
});

it('when signal aborts', async () => {
setTimeout(() => controller.abort(goodError));
await abortable(new Promise(() => null), { signal: controller.signal }).catch(e => e);
expectListenerCleanup();
});
});

context('when given already rejected promise with already aborted signal', () => {
it('returns promise rejection', async () => {
const controller = new AbortController();
const { signal } = controller;
controller.abort(badError);
const result = await abortable(Promise.reject(goodError), { signal }).catch(e => e);
expect(result).to.deep.equal(goodError);
});
});

context('when given already resolved promise with already aborted signal', () => {
it('returns promise resolution', async () => {
const controller = new AbortController();
const { signal } = controller;
controller.abort(badError);
const result = await abortable(Promise.resolve(expectedValue), { signal }).catch(e => e);
expect(result).to.deep.equal(expectedValue);
});
});

context('when given already rejected promise with not yet aborted signal', () => {
it('returns promise rejection', async () => {
const controller = new AbortController();
const { signal } = controller;
const result = await abortable(Promise.reject(goodError), { signal }).catch(e => e);
expect(result).to.deep.equal(goodError);
});
});

context('when given already resolved promise with not yet aborted signal', () => {
it('returns promise resolution', async () => {
const controller = new AbortController();
const { signal } = controller;
const result = await abortable(Promise.resolve(expectedValue), { signal }).catch(e => e);
expect(result).to.deep.equal(expectedValue);
});
});

context('when given unresolved promise with an already aborted signal', () => {
it('returns signal reason', async () => {
const controller = new AbortController();
const { signal } = controller;
controller.abort(goodError);
const result = await abortable(new Promise(() => null), { signal }).catch(e => e);
expect(result).to.deep.equal(goodError);
});
});

context('when given eventually rejecting promise with not yet aborted signal', () => {
const eventuallyReject = async () => {
await sleep(1);
throw goodError;
};

it('returns promise rejection', async () => {
const controller = new AbortController();
const { signal } = controller;
const result = await abortable(eventuallyReject(), { signal }).catch(e => e);
expect(result).to.deep.equal(goodError);
});
});

context('when given eventually resolving promise with not yet aborted signal', () => {
const eventuallyResolve = async () => {
await sleep(1);
return expectedValue;
};

it('returns promise resolution', async () => {
const controller = new AbortController();
const { signal } = controller;
const result = await abortable(eventuallyResolve(), { signal }).catch(e => e);
expect(result).to.deep.equal(expectedValue);
});
});

context('when given unresolved promise with eventually aborted signal', () => {
it('returns signal reason', async () => {
const controller = new AbortController();
const { signal } = controller;

setTimeout(() => controller.abort(goodError), 1);

const result = await abortable(new Promise(() => null), { signal }).catch(e => e);
expect(result).to.deep.equal(goodError);
});
});
});
});

0 comments on commit ef9e103

Please sign in to comment.