Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(NODE-4783): handle orphaned operation descriptions #3463

Merged
merged 14 commits into from
Nov 16, 2022
32 changes: 30 additions & 2 deletions src/cmap/connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import {
MongoMissingDependencyError,
MongoNetworkError,
MongoNetworkTimeoutError,
MongoRuntimeError,
MongoServerError,
MongoWriteConcernError
} from '../error';
Expand Down Expand Up @@ -68,6 +69,8 @@ const kAutoEncrypter = Symbol('autoEncrypter');
/** @internal */
const kDelayedTimeoutId = Symbol('delayedTimeoutId');

const INVALID_QUEUE_SIZE = 'Connection internal queue contains more than 1 operation description';

/** @internal */
export interface CommandOptions extends BSONSerializeOptions {
command?: boolean;
Expand Down Expand Up @@ -369,7 +372,29 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {

// always emit the message, in case we are streaming
this.emit('message', message);
const operationDescription = this[kQueue].get(message.responseTo);
let operationDescription = this[kQueue].get(message.responseTo);

// Protect against multiplexing.
if (this[kQueue].size > 1) {
dariakp marked this conversation as resolved.
Show resolved Hide resolved
this.onError(new MongoRuntimeError(INVALID_QUEUE_SIZE));
return;
}

if (!operationDescription && this.isMonitoringConnection) {
jyemin marked this conversation as resolved.
Show resolved Hide resolved
// This is how we recover when the initial hello's requestId is not
// the responseTo when hello responses have been skipped:

// Get the first orphaned operation description.
const entry = this[kQueue].entries().next();
dariakp marked this conversation as resolved.
Show resolved Hide resolved
if (entry) {
const [requestId, orphaned]: [number, OperationDescription] = entry.value;
// If the orphaned operation description exists then set it.
operationDescription = orphaned;
// Remove the entry with the bad request id from the queue.
this[kQueue].delete(requestId);
}
}

if (!operationDescription) {
return;
durran marked this conversation as resolved.
Show resolved Hide resolved
}
Expand All @@ -381,7 +406,10 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
// making the `responseTo` change on each response
this[kQueue].delete(message.responseTo);
if ('moreToCome' in message && message.moreToCome) {
// requeue the callback for next synthetic request
// If the operation description check above does find an orphaned
// description and sets the operationDescription then this line will put one
// back in the queue with the correct requestId and will resolve not being able
// to find the next one via the responseTo of the next streaming hello.
this[kQueue].set(message.requestId, operationDescription);
} else if (operationDescription.socketTimeoutOverride) {
this[kStream].setTimeout(this.socketTimeoutMS);
Expand Down
13 changes: 13 additions & 0 deletions test/tools/utils.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { EJSON } from 'bson';
import * as BSON from 'bson';
import { expect } from 'chai';
import { Readable } from 'stream';
import { setTimeout } from 'timers';
import { inspect, promisify } from 'util';

Expand Down Expand Up @@ -354,6 +355,18 @@ export class TestBuilder {
}
}

export function bufferToStream(buffer) {
durran marked this conversation as resolved.
Show resolved Hide resolved
const stream = new Readable();
if (Array.isArray(buffer)) {
buffer.forEach(b => stream.push(b));
} else {
stream.push(buffer);
}

stream.push(null);
return stream;
}

export function generateOpMsgBuffer(document: Document): Buffer {
const header = Buffer.alloc(4 * 4 + 4);

Expand Down
Loading