Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(NODE-3470): retry selects another mongos #3963

Merged
merged 5 commits into from
Jan 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 7 additions & 3 deletions src/operations/execute_operation.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import {
import type { MongoClient } from '../mongo_client';
import { ReadPreference } from '../read_preference';
import type { Server } from '../sdam/server';
import type { ServerDescription } from '../sdam/server_description';
import {
sameServerSelector,
secondaryWritableServerSelector,
Expand Down Expand Up @@ -183,7 +184,8 @@ export async function executeOperation<
return await retryOperation(operation, operationError, {
session,
topology,
selector
selector,
previousServer: server.description
});
}
throw operationError;
Expand All @@ -199,6 +201,7 @@ type RetryOptions = {
session: ClientSession;
topology: Topology;
selector: ReadPreference | ServerSelector;
previousServer: ServerDescription;
};

async function retryOperation<
Expand All @@ -207,7 +210,7 @@ async function retryOperation<
>(
operation: T,
originalError: MongoError,
{ session, topology, selector }: RetryOptions
{ session, topology, selector, previousServer }: RetryOptions
): Promise<TResult> {
const isWriteOperation = operation.hasAspect(Aspect.WRITE_OPERATION);
const isReadOperation = operation.hasAspect(Aspect.READ_OPERATION);
Expand Down Expand Up @@ -243,7 +246,8 @@ async function retryOperation<
// select a new server, and attempt to retry the operation
const server = await topology.selectServerAsync(selector, {
session,
operationName: operation.commandName
operationName: operation.commandName,
previousServer
});

if (isWriteOperation && !supportsRetryableWrites(server)) {
Expand Down
19 changes: 13 additions & 6 deletions src/sdam/server_selection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@ export const MIN_SECONDARY_WRITE_WIRE_VERSION = 13;
/** @internal */
export type ServerSelector = (
topologyDescription: TopologyDescription,
servers: ServerDescription[]
servers: ServerDescription[],
deprioritized?: ServerDescription[]
) => ServerDescription[];

/**
Expand Down Expand Up @@ -266,7 +267,8 @@ export function readPreferenceServerSelector(readPreference: ReadPreference): Se

return (
topologyDescription: TopologyDescription,
servers: ServerDescription[]
servers: ServerDescription[],
deprioritized: ServerDescription[] = []
): ServerDescription[] => {
const commonWireVersion = topologyDescription.commonWireVersion;
if (
Expand All @@ -287,13 +289,18 @@ export function readPreferenceServerSelector(readPreference: ReadPreference): Se
return [];
}

if (
topologyDescription.type === TopologyType.Single ||
topologyDescription.type === TopologyType.Sharded
) {
if (topologyDescription.type === TopologyType.Single) {
return latencyWindowReducer(topologyDescription, servers.filter(knownFilter));
}

if (topologyDescription.type === TopologyType.Sharded) {
const filtered = servers.filter(server => {
return !deprioritized.includes(server);
});
const selectable = filtered.length > 0 ? filtered : deprioritized;
return latencyWindowReducer(topologyDescription, selectable.filter(knownFilter));
}

const mode = readPreference.mode;
if (mode === ReadPreference.PRIMARY) {
return servers.filter(primaryFilter);
Expand Down
12 changes: 10 additions & 2 deletions src/sdam/topology.ts
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ export interface ServerSelectionRequest {
timeoutController: TimeoutController;
operationName: string;
waitingLogged: boolean;
previousServer?: ServerDescription;
}

/** @internal */
Expand Down Expand Up @@ -175,6 +176,7 @@ export interface SelectServerOptions {
serverSelectionTimeoutMS?: number;
session?: ClientSession;
operationName: string;
previousServer?: ServerDescription;
baileympearson marked this conversation as resolved.
Show resolved Hide resolved
}

/** @public */
Expand Down Expand Up @@ -598,7 +600,8 @@ export class Topology extends TypedEventEmitter<TopologyEvents> {
timeoutController: new TimeoutController(options.serverSelectionTimeoutMS),
startTime: now(),
operationName: options.operationName,
waitingLogged: false
waitingLogged: false,
previousServer: options.previousServer
};

waitQueueMember.timeoutController.signal.addEventListener('abort', () => {
Expand Down Expand Up @@ -930,8 +933,13 @@ function processWaitQueue(topology: Topology) {
let selectedDescriptions;
try {
const serverSelector = waitQueueMember.serverSelector;
const previousServer = waitQueueMember.previousServer;
selectedDescriptions = serverSelector
? serverSelector(topology.description, serverDescriptions)
? serverSelector(
topology.description,
serverDescriptions,
previousServer ? [previousServer] : []
alenakhineika marked this conversation as resolved.
Show resolved Hide resolved
)
: serverDescriptions;
} catch (e) {
waitQueueMember.timeoutController.clear();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,161 @@
import { expect } from 'chai';

import type { CommandFailedEvent, CommandSucceededEvent } from '../../mongodb';

/**
 * Metadata passed as the second argument to each `it` in this file: the tests
 * require MongoDB `>=4.2.9` and a `sharded` topology.
 */
const TEST_METADATA = { requires: { mongodb: '>=4.2.9', topology: 'sharded' } };

/**
 * Builds the `data` payload used by both fail point commands below.
 * A fresh object is returned on every call so the two commands share no state.
 */
const failPointData = () => ({
  failCommands: ['find'],
  errorCode: 6,
  closeConnection: true
});

/** `configureFailPoint` command that arms `failCommand` for a single `find`. */
const FAIL_COMMAND = {
  configureFailPoint: 'failCommand',
  mode: { times: 1 },
  data: failPointData()
};

/** `configureFailPoint` command that switches the `failCommand` fail point off. */
const DISABLE_FAIL_COMMAND = {
  configureFailPoint: 'failCommand',
  mode: 'off',
  data: failPointData()
};

/**
 * Prose tests for retryable reads against a sharded cluster.
 *
 * Each context arms a `failCommand` fail point for `find`, runs a `find` through a
 * client created with `retryReads: true`, and then compares the addresses reported
 * by the command monitoring events to check which mongos served the retry.
 */
describe('Server Selection Sharded Retryable Reads Prose tests', function () {
  context('Retryable Reads Are Retried on a Different mongos if One is Available', function () {
    const commandFailedEvents: CommandFailedEvent[] = [];
    let client;
    let utilClientOne;
    let utilClientTwo;

    // This test MUST be executed against a sharded cluster that has at least two
    // mongos instances.
    // 1. Ensure that a test is run against a sharded cluster that has at least two
    //    mongoses. If there are more than two mongoses in the cluster, pick two to
    //    test against.
    beforeEach(async function () {
      // Drop events captured by an earlier run (e.g. a mocha retry) so they cannot
      // satisfy this run's assertions.
      commandFailedEvents.length = 0;

      const uri = this.configuration.url({
        monitorCommands: true,
        useMultipleMongoses: true
      });

      // 3. Create a client with ``retryReads=true`` that connects to the cluster,
      //    providing the two selected mongoses as seeds.
      client = this.configuration.newClient(uri, {
        monitorCommands: true,
        retryReads: true
      });
      client.on('commandFailed', event => {
        commandFailedEvents.push(event);
      });
      await client.connect();
      const seeds = client.topology.s.seedlist.map(address => address.toString());

      // 2. Create a client per mongos using the direct connection, and configure the
      //    following fail points on each mongos::
      //      {
      //        configureFailPoint: "failCommand",
      //        mode: { times: 1 },
      //        data: {
      //          failCommands: ["find"],
      //          errorCode: 6,
      //          closeConnection: true
      //        }
      //      }
      utilClientOne = this.configuration.newClient(`mongodb://${seeds[0]}`, {
        directConnection: true
      });
      utilClientTwo = this.configuration.newClient(`mongodb://${seeds[1]}`, {
        directConnection: true
      });
      await utilClientOne.db('admin').command(FAIL_COMMAND);
      await utilClientTwo.db('admin').command(FAIL_COMMAND);
    });

    afterEach(async function () {
      // Every client is guarded with `?.`: this hook can run when `beforeEach` did
      // not get as far as creating the clients, and an unguarded dereference here
      // would mask the original failure with a TypeError.
      await client?.close();
      await utilClientOne?.db('admin').command(DISABLE_FAIL_COMMAND);
      await utilClientTwo?.db('admin').command(DISABLE_FAIL_COMMAND);
      await utilClientOne?.close();
      await utilClientTwo?.close();
    });

    // 4. Enable command monitoring, and execute a ``find`` command that is
    //    supposed to fail on both mongoses.
    // 5. Assert that there were failed command events from each mongos.
    // 6. Disable the fail points.
    it('retries on a different mongos', TEST_METADATA, async function () {
      // The find is expected to reject (both mongoses have the fail point armed),
      // so the rejection is intentionally swallowed; only the events matter.
      await client
        .db('test')
        .collection('test')
        .find()
        .toArray()
        .catch(() => null);

      // Check the count first so a missing retry is reported as an assertion
      // failure rather than a TypeError when indexing the array.
      expect(commandFailedEvents).to.have.lengthOf.at.least(2);
      expect(commandFailedEvents[0].address).to.not.equal(commandFailedEvents[1].address);
    });
  });

  // 1. Ensure that a test is run against a sharded cluster. If there are multiple
  //    mongoses in the cluster, pick one to test against.
  context('Retryable Reads Are Retried on the Same mongos if No Others are Available', function () {
    const commandFailedEvents: CommandFailedEvent[] = [];
    const commandSucceededEvents: CommandSucceededEvent[] = [];
    let client;
    let utilClient;

    beforeEach(async function () {
      // Drop events captured by an earlier run (e.g. a mocha retry) so they cannot
      // satisfy this run's assertions.
      commandFailedEvents.length = 0;
      commandSucceededEvents.length = 0;

      const uri = this.configuration.url({
        monitorCommands: true
      });

      // 3. Create a client with ``retryReads=true`` that connects to the cluster,
      //    providing the selected mongos as the seed.
      client = this.configuration.newClient(uri, {
        monitorCommands: true,
        retryReads: true
      });
      client.on('commandFailed', event => {
        commandFailedEvents.push(event);
      });
      client.on('commandSucceeded', event => {
        commandSucceededEvents.push(event);
      });

      // 2. Create a client that connects to the mongos using the direct connection,
      //    and configure the following fail point on the mongos::
      //      {
      //        configureFailPoint: "failCommand",
      //        mode: { times: 1 },
      //        data: {
      //          failCommands: ["find"],
      //          errorCode: 6,
      //          closeConnection: true
      //        }
      //      }
      utilClient = this.configuration.newClient(uri, {
        directConnection: true
      });
      await utilClient.db('admin').command(FAIL_COMMAND);
    });

    afterEach(async function () {
      await client?.close();
      await utilClient?.db('admin').command(DISABLE_FAIL_COMMAND);
      await utilClient?.close();
    });

    // 4. Enable command monitoring, and execute a ``find`` command.
    // 5. Assert that there was a failed command and a successful command event.
    // 6. Disable the fail point.
    it('retries on the same mongos', TEST_METADATA, async function () {
      // Any rejection is swallowed here; the event assertions below decide the
      // outcome of the test.
      await client
        .db('test')
        .collection('test')
        .find()
        .toArray()
        .catch(() => null);

      // Check both counts first so a missing event is reported as an assertion
      // failure rather than a TypeError when indexing the arrays.
      expect(commandFailedEvents).to.have.lengthOf.at.least(1);
      expect(commandSucceededEvents).to.have.lengthOf.at.least(1);
      expect(commandFailedEvents[0].address).to.equal(commandSucceededEvents[0].address);
    });
  });
});
Loading