diff --git a/src/operations/execute_operation.ts b/src/operations/execute_operation.ts
index 4fc3b8acfb..21d8b7e11b 100644
--- a/src/operations/execute_operation.ts
+++ b/src/operations/execute_operation.ts
@@ -18,6 +18,7 @@ import {
 import type { MongoClient } from '../mongo_client';
 import { ReadPreference } from '../read_preference';
 import type { Server } from '../sdam/server';
+import type { ServerDescription } from '../sdam/server_description';
 import {
   sameServerSelector,
   secondaryWritableServerSelector,
@@ -183,7 +184,8 @@ export async function executeOperation<
       return await retryOperation(operation, operationError, {
         session,
         topology,
-        selector
+        selector,
+        previousServer: server.description
       });
     }
     throw operationError;
@@ -199,6 +201,7 @@ type RetryOptions = {
   session: ClientSession;
   topology: Topology;
   selector: ReadPreference | ServerSelector;
+  previousServer: ServerDescription;
 };

 async function retryOperation<
@@ -207,7 +210,7 @@
 >(
   operation: T,
   originalError: MongoError,
-  { session, topology, selector }: RetryOptions
+  { session, topology, selector, previousServer }: RetryOptions
 ): Promise<TResult> {
   const isWriteOperation = operation.hasAspect(Aspect.WRITE_OPERATION);
   const isReadOperation = operation.hasAspect(Aspect.READ_OPERATION);
@@ -243,7 +246,8 @@ async function retryOperation<
   // select a new server, and attempt to retry the operation
   const server = await topology.selectServerAsync(selector, {
     session,
-    operationName: operation.commandName
+    operationName: operation.commandName,
+    previousServer
   });

   if (isWriteOperation && !supportsRetryableWrites(server)) {
diff --git a/src/sdam/server_selection.ts b/src/sdam/server_selection.ts
index c7c2b563cd..2ffae44241 100644
--- a/src/sdam/server_selection.ts
+++ b/src/sdam/server_selection.ts
@@ -14,7 +14,8 @@ export const MIN_SECONDARY_WRITE_WIRE_VERSION = 13;
 /** @internal */
 export type ServerSelector = (
   topologyDescription: TopologyDescription,
-  servers: ServerDescription[]
+  servers: ServerDescription[],
+  deprioritized?: ServerDescription[]
 ) => ServerDescription[];

 /**
@@ -266,7 +267,8 @@ export function readPreferenceServerSelector(readPreference: ReadPreference): Se

   return (
     topologyDescription: TopologyDescription,
-    servers: ServerDescription[]
+    servers: ServerDescription[],
+    deprioritized: ServerDescription[] = []
   ): ServerDescription[] => {
     const commonWireVersion = topologyDescription.commonWireVersion;
     if (
@@ -287,13 +289,18 @@ export function readPreferenceServerSelector(readPreference: ReadPreference): Se
       return [];
     }

-    if (
-      topologyDescription.type === TopologyType.Single ||
-      topologyDescription.type === TopologyType.Sharded
-    ) {
+    if (topologyDescription.type === TopologyType.Single) {
       return latencyWindowReducer(topologyDescription, servers.filter(knownFilter));
     }

+    if (topologyDescription.type === TopologyType.Sharded) {
+      const filtered = servers.filter(server => {
+        return !deprioritized.includes(server);
+      });
+      const selectable = filtered.length > 0 ? filtered : deprioritized;
+      return latencyWindowReducer(topologyDescription, selectable.filter(knownFilter));
+    }
+
     const mode = readPreference.mode;
     if (mode === ReadPreference.PRIMARY) {
       return servers.filter(primaryFilter);
diff --git a/src/sdam/topology.ts b/src/sdam/topology.ts
index f8378cc95f..e5c318cb7b 100644
--- a/src/sdam/topology.ts
+++ b/src/sdam/topology.ts
@@ -110,6 +110,7 @@ export interface ServerSelectionRequest {
   timeoutController: TimeoutController;
   operationName: string;
   waitingLogged: boolean;
+  previousServer?: ServerDescription;
 }

 /** @internal */
@@ -175,6 +176,7 @@ export interface SelectServerOptions {
   serverSelectionTimeoutMS?: number;
   session?: ClientSession;
   operationName: string;
+  previousServer?: ServerDescription;
 }

 /** @public */
@@ -598,7 +600,8 @@ export class Topology extends TypedEventEmitter<TopologyEvents> {
       timeoutController: new TimeoutController(options.serverSelectionTimeoutMS),
       startTime: now(),
       operationName: options.operationName,
-      waitingLogged: false
+      waitingLogged: false,
+      previousServer: options.previousServer
     };

     waitQueueMember.timeoutController.signal.addEventListener('abort', () => {
@@ -930,8 +933,13 @@ function processWaitQueue(topology: Topology) {
     let selectedDescriptions;
     try {
       const serverSelector = waitQueueMember.serverSelector;
+      const previousServer = waitQueueMember.previousServer;
       selectedDescriptions = serverSelector
-        ? serverSelector(topology.description, serverDescriptions)
+        ? serverSelector(
+            topology.description,
+            serverDescriptions,
+            previousServer ? [previousServer] : []
+          )
         : serverDescriptions;
     } catch (e) {
       waitQueueMember.timeoutController.clear();
diff --git a/test/integration/server-selection/server_selection.prose.sharded_retryable_reads.test.ts b/test/integration/server-selection/server_selection.prose.sharded_retryable_reads.test.ts
new file mode 100644
index 0000000000..3b72fe93eb
--- /dev/null
+++ b/test/integration/server-selection/server_selection.prose.sharded_retryable_reads.test.ts
@@ -0,0 +1,161 @@
+import { expect } from 'chai';
+
+import type { CommandFailedEvent, CommandSucceededEvent } from '../../mongodb';
+
+const TEST_METADATA = { requires: { mongodb: '>=4.2.9', topology: 'sharded' } };
+const FAIL_COMMAND = {
+  configureFailPoint: 'failCommand',
+  mode: { times: 1 },
+  data: {
+    failCommands: ['find'],
+    errorCode: 6,
+    closeConnection: true
+  }
+};
+const DISABLE_FAIL_COMMAND = {
+  configureFailPoint: 'failCommand',
+  mode: 'off',
+  data: {
+    failCommands: ['find'],
+    errorCode: 6,
+    closeConnection: true
+  }
+};
+
+describe('Server Selection Sharded Retryable Reads Prose tests', function () {
+  context('Retryable Reads Are Retried on a Different mongos if One is Available', function () {
+    const commandFailedEvents: CommandFailedEvent[] = [];
+    let client;
+    let utilClientOne;
+    let utilClientTwo;
+
+    // This test MUST be executed against a sharded cluster that has at least two
+    // mongos instances.
+    // 1. Ensure that a test is run against a sharded cluster that has at least two
+    // mongoses. If there are more than two mongoses in the cluster, pick two to
+    // test against.
+    beforeEach(async function () {
+      const uri = this.configuration.url({
+        monitorCommands: true,
+        useMultipleMongoses: true
+      });
+
+      // 3. Create a client with ``retryReads=true`` that connects to the cluster,
+      // providing the two selected mongoses as seeds.
+      client = this.configuration.newClient(uri, {
+        monitorCommands: true,
+        retryReads: true
+      });
+      client.on('commandFailed', event => {
+        commandFailedEvents.push(event);
+      });
+      await client.connect();
+      const seeds = client.topology.s.seedlist.map(address => address.toString());
+
+      // 2. Create a client per mongos using the direct connection, and configure the
+      // following fail points on each mongos::
+      // {
+      //   configureFailPoint: "failCommand",
+      //   mode: { times: 1 },
+      //   data: {
+      //     failCommands: ["find"],
+      //     errorCode: 6,
+      //     closeConnection: true
+      //   }
+      // }
+      utilClientOne = this.configuration.newClient(`mongodb://${seeds[0]}`, {
+        directConnection: true
+      });
+      utilClientTwo = this.configuration.newClient(`mongodb://${seeds[1]}`, {
+        directConnection: true
+      });
+      await utilClientOne.db('admin').command(FAIL_COMMAND);
+      await utilClientTwo.db('admin').command(FAIL_COMMAND);
+    });
+
+    afterEach(async function () {
+      await client?.close();
+      await utilClientOne.db('admin').command(DISABLE_FAIL_COMMAND);
+      await utilClientTwo.db('admin').command(DISABLE_FAIL_COMMAND);
+      await utilClientOne?.close();
+      await utilClientTwo?.close();
+    });
+
+    // 4. Enable command monitoring, and execute a ``find`` command that is
+    // supposed to fail on both mongoses.
+    // 5. Asserts that there were failed command events from each mongos.
+    // 6. Disable the fail points.
+    it('retries on a different mongos', TEST_METADATA, async function () {
+      await client
+        .db('test')
+        .collection('test')
+        .find()
+        .toArray()
+        .catch(() => null);
+      expect(commandFailedEvents[0].address).to.not.equal(commandFailedEvents[1].address);
+    });
+  });
+
+  // 1. Ensure that a test is run against a sharded cluster. If there are multiple
+  // mongoses in the cluster, pick one to test against.
+  context('Retryable Reads Are Retried on the Same mongos if No Others are Available', function () {
+    const commandFailedEvents: CommandFailedEvent[] = [];
+    const commandSucceededEvents: CommandSucceededEvent[] = [];
+    let client;
+    let utilClient;
+
+    beforeEach(async function () {
+      const uri = this.configuration.url({
+        monitorCommands: true
+      });
+
+      // 3. Create a client with ``retryReads=true`` that connects to the cluster,
+      // providing the selected mongos as the seed.
+      client = this.configuration.newClient(uri, {
+        monitorCommands: true,
+        retryReads: true
+      });
+      client.on('commandFailed', event => {
+        commandFailedEvents.push(event);
+      });
+      client.on('commandSucceeded', event => {
+        commandSucceededEvents.push(event);
+      });
+
+      // 2. Create a client that connects to the mongos using the direct connection,
+      // and configure the following fail point on the mongos::
+      // {
+      //   configureFailPoint: "failCommand",
+      //   mode: { times: 1 },
+      //   data: {
+      //     failCommands: ["find"],
+      //     errorCode: 6,
+      //     closeConnection: true
+      //   }
+      // }
+      utilClient = this.configuration.newClient(uri, {
+        directConnection: true
+      });
+      await utilClient.db('admin').command(FAIL_COMMAND);
+    });
+
+    afterEach(async function () {
+      await client?.close();
+      await utilClient?.db('admin').command(DISABLE_FAIL_COMMAND);
+      await utilClient?.close();
+    });
+
+    // 4. Enable command monitoring, and execute a ``find`` command.
+    // 5. Asserts that there was a failed command and a successful command event.
+    // 6. Disable the fail point.
+    it('retries on the same mongos', TEST_METADATA, async function () {
+      await client
+        .db('test')
+        .collection('test')
+        .find()
+        .toArray()
+        .catch(() => null);
+      expect(commandFailedEvents[0].address).to.equal(commandSucceededEvents[0].address);
+    });
+  });
+});
diff --git a/test/integration/server-selection/server_selection.prose.sharded_retryable_writes.test.ts b/test/integration/server-selection/server_selection.prose.sharded_retryable_writes.test.ts
new file mode 100644
index 0000000000..8b1ea113df
--- /dev/null
+++ b/test/integration/server-selection/server_selection.prose.sharded_retryable_writes.test.ts
@@ -0,0 +1,159 @@
+import { expect } from 'chai';
+
+import type { CommandFailedEvent, CommandSucceededEvent } from '../../mongodb';
+
+const TEST_METADATA = { requires: { mongodb: '>=4.3.1', topology: 'sharded' } };
+const FAIL_COMMAND = {
+  configureFailPoint: 'failCommand',
+  mode: { times: 1 },
+  data: {
+    failCommands: ['insert'],
+    errorCode: 6,
+    errorLabels: ['RetryableWriteError'],
+    closeConnection: true
+  }
+};
+const DISABLE_FAIL_COMMAND = {
+  configureFailPoint: 'failCommand',
+  mode: 'off',
+  data: {
+    failCommands: ['insert'],
+    errorCode: 6,
+    errorLabels: ['RetryableWriteError'],
+    closeConnection: true
+  }
+};
+
+describe('Server Selection Sharded Retryable Writes Prose tests', function () {
+  context(
+    'Test that in a sharded cluster writes are retried on a different mongos if one available',
+    function () {
+      const commandFailedEvents: CommandFailedEvent[] = [];
+      let client;
+      let utilClientOne;
+      let utilClientTwo;
+      // This test MUST be executed against a sharded cluster that has at least two mongos instances.
+      // This test requires MongoDB 4.3.1+ for the errorLabels fail point option.
+      // Ensure that a test is run against a sharded cluster that has at least two mongoses. If there are more than two mongoses in the cluster, pick two to test against.
+      beforeEach(async function () {
+        const uri = this.configuration.url({
+          monitorCommands: true,
+          useMultipleMongoses: true
+        });
+
+        // Create a client with retryWrites=true that connects to the cluster, providing the two selected mongoses as seeds.
+        // Enable command monitoring, and execute a write command that is supposed to fail on both mongoses.
+        client = this.configuration.newClient(uri, {
+          monitorCommands: true,
+          retryWrites: true
+        });
+        client.on('commandFailed', event => {
+          commandFailedEvents.push(event);
+        });
+        await client.connect();
+        const seeds = client.topology.s.seedlist.map(address => address.toString());
+
+        // Create a client per mongos using the direct connection, and configure the following fail point on each mongos:
+        // {
+        //   configureFailPoint: "failCommand",
+        //   mode: { times: 1 },
+        //   data: {
+        //     failCommands: ["insert"],
+        //     errorCode: 6,
+        //     errorLabels: ["RetryableWriteError"],
+        //     closeConnection: true
+        //   }
+        // }
+        utilClientOne = this.configuration.newClient(`mongodb://${seeds[0]}`, {
+          directConnection: true
+        });
+        utilClientTwo = this.configuration.newClient(`mongodb://${seeds[1]}`, {
+          directConnection: true
+        });
+        await utilClientOne.db('admin').command(FAIL_COMMAND);
+        await utilClientTwo.db('admin').command(FAIL_COMMAND);
+      });
+
+      afterEach(async function () {
+        await client?.close();
+        await utilClientOne.db('admin').command(DISABLE_FAIL_COMMAND);
+        await utilClientTwo.db('admin').command(DISABLE_FAIL_COMMAND);
+        await utilClientOne?.close();
+        await utilClientTwo?.close();
+      });
+
+      // Asserts that there were failed command events from each mongos.
+      // Disable the fail points.
+      it('retries on a different mongos', TEST_METADATA, async function () {
+        await client
+          .db('test')
+          .collection('test')
+          .insertOne({ a: 1 })
+          .catch(() => null);
+        expect(commandFailedEvents[0].address).to.not.equal(commandFailedEvents[1].address);
+      });
+    }
+  );
+
+  context(
+    'Test that in a sharded cluster writes are retried on the same mongos if no other is available',
+    function () {
+      // This test MUST be executed against a sharded cluster and requires MongoDB 4.3.1+ for the errorLabels fail point option.
+      // Ensure that a test is run against a sharded cluster. If there are multiple mongoses in the cluster, pick one to test against.
+      const commandFailedEvents: CommandFailedEvent[] = [];
+      const commandSucceededEvents: CommandSucceededEvent[] = [];
+      let client;
+      let utilClient;
+
+      beforeEach(async function () {
+        const uri = this.configuration.url({
+          monitorCommands: true
+        });
+        // Create a client with retryWrites=true that connects to the cluster, providing the selected mongos as the seed.
+        // Enable command monitoring, and execute a write command that is supposed to fail.
+        client = this.configuration.newClient(uri, {
+          monitorCommands: true,
+          retryWrites: true
+        });
+        client.on('commandFailed', event => {
+          commandFailedEvents.push(event);
+        });
+        client.on('commandSucceeded', event => {
+          commandSucceededEvents.push(event);
+        });
+        // Create a client that connects to the mongos using the direct connection, and configure the following fail point on the mongos:
+        // {
+        //   configureFailPoint: "failCommand",
+        //   mode: { times: 1 },
+        //   data: {
+        //     failCommands: ["insert"],
+        //     errorCode: 6,
+        //     errorLabels: ["RetryableWriteError"],
+        //     closeConnection: true
+        //   }
+        // }
+        utilClient = this.configuration.newClient(uri, {
+          directConnection: true
+        });
+        await utilClient.db('admin').command(FAIL_COMMAND);
+      });
+
+      afterEach(async function () {
+        await client?.close();
+        await utilClient?.db('admin').command(DISABLE_FAIL_COMMAND);
+        await utilClient?.close();
+      });
+
+      // Asserts that there was a failed command and a successful command event.
+      // Disable the fail point.
+      it('retries on the same mongos', TEST_METADATA, async function () {
+        await client
+          .db('test')
+          .collection('test')
+          .insertOne({ a: 1 })
+          .catch(() => null);
+        expect(commandFailedEvents[0].address).to.equal(commandSucceededEvents[0].address);
+      });
+    }
+  );
+});
diff --git a/test/unit/sdam/server_selection.test.js b/test/unit/sdam/server_selection.test.ts
similarity index 84%
rename from test/unit/sdam/server_selection.test.js
rename to test/unit/sdam/server_selection.test.ts
index 324142cf35..925d40eef0 100644
--- a/test/unit/sdam/server_selection.test.js
+++ b/test/unit/sdam/server_selection.test.ts
@@ -1,17 +1,17 @@
-'use strict';
-
-const { expect } = require('chai');
-const sinon = require('sinon');
-const { ObjectId } = require('../../mongodb');
-const { ReadPreference } = require('../../mongodb');
-const {
+import { expect } from 'chai';
+import * as sinon from 'sinon';
+
+import {
+  MIN_SECONDARY_WRITE_WIRE_VERSION,
+  ObjectId,
+  ReadPreference,
+  readPreferenceServerSelector,
   sameServerSelector,
   secondaryWritableServerSelector,
-  MIN_SECONDARY_WRITE_WIRE_VERSION
-} = require('../../mongodb');
-const { ServerDescription } = require('../../mongodb');
-const { TopologyDescription } = require('../../mongodb');
-const { TopologyType } = require('../../mongodb');
+  ServerDescription,
+  TopologyDescription,
+  TopologyType
+} from '../../mongodb';

 describe('server selection', function () {
   const primary = new ServerDescription('127.0.0.1:27017', {
@@ -24,10 +24,19 @@
     secondary: true,
     ok: 1
   });
+  const secondaryTwo = new ServerDescription('127.0.0.1:27024', {
+    setName: 'test',
+    secondary: true,
+    ok: 1
+  });
   const mongos = new ServerDescription('127.0.0.1:27019', {
     msg: 'isdbgrid',
     ok: 1
   });
+  const mongosTwo = new ServerDescription('127.0.0.1:27023', {
+    msg: 'isdbgrid',
+    ok: 1
+  });
   const loadBalancer = new ServerDescription('127.0.0.1:27020', { ok: 1 }, { loadBalanced: true });
   const single = new ServerDescription('127.0.0.1:27021', {
     isWritablePrimary: true,
@@ -37,6 +46,93 @@
     ok: 0
   });

+  describe('#readPreferenceServerSelector', function () {
+    let selector;
+    let servers;
+
+    context('when the topology is sharded', function () {
+      const topologyDescription = new TopologyDescription(
+        TopologyType.Sharded,
+        new Map(),
+        'test',
+        MIN_SECONDARY_WRITE_WIRE_VERSION,
+        new ObjectId(),
+        MIN_SECONDARY_WRITE_WIRE_VERSION
+      );
+
+      beforeEach(function () {
+        selector = readPreferenceServerSelector(ReadPreference.secondaryPreferred);
+      });
+
+      context('when there are deprioritized servers', function () {
+        context('when there are other servers', function () {
+          beforeEach(function () {
+            servers = selector(topologyDescription, [mongos], [mongosTwo]);
+          });
+
+          it('returns a server from the other servers', function () {
+            expect(servers).to.deep.equal([mongos]);
+          });
+        });
+
+        context('when there are no other servers', function () {
+          beforeEach(function () {
+            servers = selector(topologyDescription, [], [mongosTwo]);
+          });
+
+          it('returns a server from the deprioritized servers', function () {
+            expect(servers).to.deep.equal([mongosTwo]);
+          });
+        });
+      });
+
+      context('when there are no deprioritized servers', function () {
+        beforeEach(function () {
+          servers = selector(topologyDescription, [mongos]);
+        });
+
+        it('returns a server from the other servers', function () {
+          expect(servers).to.deep.equal([mongos]);
+        });
+      });
+    });
+
+    context('when the topology is not sharded', function () {
+      const topologyDescription = new TopologyDescription(
+        TopologyType.ReplicaSetWithPrimary,
+        new Map(),
+        'test',
+        MIN_SECONDARY_WRITE_WIRE_VERSION,
+        new ObjectId(),
+        MIN_SECONDARY_WRITE_WIRE_VERSION
+      );
+
+      beforeEach(function () {
+        selector = readPreferenceServerSelector(ReadPreference.secondary);
+      });
+
+      context('when there are deprioritized servers', function () {
+        beforeEach(function () {
+          servers = selector(topologyDescription, [secondaryTwo], [secondary]);
+        });
+
+        it('selects from all server lists', function () {
+          expect(servers).to.contain.oneOf([secondary, secondaryTwo]);
+        });
+      });
+
+      context('when there are no deprioritized servers', function () {
+        beforeEach(function () {
+          servers = selector(topologyDescription, [secondary], []);
+        });
+
+        it('selects from all non-deprioritized servers', function () {
+          expect(servers).to.deep.equal([secondary]);
+        });
+      });
+    });
+  });
+
   describe('#sameServerSelector', function () {
     const topologyDescription = sinon.stub();
     const serverDescriptions = new Map();