diff --git a/src/cmap/wire_protocol/responses.ts b/src/cmap/wire_protocol/responses.ts index 6c166afd61..18afde92e7 100644 --- a/src/cmap/wire_protocol/responses.ts +++ b/src/cmap/wire_protocol/responses.ts @@ -354,4 +354,8 @@ export class ClientBulkWriteCursorResponse extends CursorResponse { get deletedCount() { return this.get('nDeleted', BSONType.int, true); } + + get writeConcernError() { + return this.get('writeConcernError', BSONType.object, false); + } } diff --git a/src/cursor/client_bulk_write_cursor.ts b/src/cursor/client_bulk_write_cursor.ts index 06f34dfc52..3a4e7eb99a 100644 --- a/src/cursor/client_bulk_write_cursor.ts +++ b/src/cursor/client_bulk_write_cursor.ts @@ -1,7 +1,6 @@ import { type Document } from 'bson'; import { type ClientBulkWriteCursorResponse } from '../cmap/wire_protocol/responses'; -import { MongoClientBulkWriteCursorError } from '../error'; import type { MongoClient } from '../mongo_client'; import { ClientBulkWriteOperation } from '../operations/client_bulk_write/client_bulk_write'; import { type ClientBulkWriteCommandBuilder } from '../operations/client_bulk_write/command_builder'; @@ -48,16 +47,11 @@ export class ClientBulkWriteCursor extends AbstractCursor { * We need a way to get the top level cursor response fields for * generating the bulk write result, so we expose this here. */ - get response(): ClientBulkWriteCursorResponse { + get response(): ClientBulkWriteCursorResponse | null { if (this.cursorResponse) return this.cursorResponse; - throw new MongoClientBulkWriteCursorError( - 'No client bulk write cursor response returned from the server.' - ); + return null; } - /** - * Get the last set of operations the cursor executed. - */ get operations(): Document[] { return this.commandBuilder.lastOperations; } diff --git a/src/error.ts b/src/error.ts index 4aed6b9314..a917838948 100644 --- a/src/error.ts +++ b/src/error.ts @@ -1,4 +1,8 @@ import type { Document } from './bson'; +import { + type ClientBulkWriteError, + type ClientBulkWriteResult +} from './operations/client_bulk_write/common'; import type { ServerType } from './sdam/common'; import type { TopologyVersion } from './sdam/server_description'; import type { TopologyDescription } from './sdam/topology_description'; @@ -616,6 +620,44 @@ export class MongoGCPError extends MongoOIDCError { } } +/** + * An error indicating that an error occurred when executing the bulk write. + * + * @public + * @category Error + */ +export class MongoClientBulkWriteError extends MongoServerError { + /** + * Write concern errors that occurred while executing the bulk write. This list may have + * multiple items if more than one server command was required to execute the bulk write. + */ + writeConcernErrors: Document[]; + /** + * Errors that occurred during the execution of individual write operations. This map will + * contain at most one entry if the bulk write was ordered. + */ + writeErrors: Map; + /** + * The results of any successful operations that were performed before the error was + * encountered. + */ + partialResult?: ClientBulkWriteResult; + + /** + * Initialize the client bulk write error. + * @param message - The error message. + */ + constructor(message: ErrorDescription) { + super(message); + this.writeConcernErrors = []; + this.writeErrors = new Map(); + } + + override get name(): string { + return 'MongoClientBulkWriteError'; + } +} + /** * An error indicating that an error occurred when processing bulk write results. * @@ -1047,8 +1089,8 @@ export class MongoInvalidArgumentError extends MongoAPIError { * * @public **/ - constructor(message: string) { - super(message); + constructor(message: string, options?: { cause?: Error }) { + super(message, options); } override get name(): string { diff --git a/src/index.ts b/src/index.ts index 97f964ce54..9538ce1d5c 100644 --- a/src/index.ts +++ b/src/index.ts @@ -46,6 +46,7 @@ export { MongoBatchReExecutionError, MongoChangeStreamError, MongoClientBulkWriteCursorError, + MongoClientBulkWriteError, MongoClientBulkWriteExecutionError, MongoCompatibilityError, MongoCursorExhaustedError, @@ -477,6 +478,7 @@ export type { } from './operations/aggregate'; export type { AnyClientBulkWriteModel, + ClientBulkWriteError, ClientBulkWriteOptions, ClientBulkWriteResult, ClientDeleteManyModel, diff --git a/src/mongo_client.ts b/src/mongo_client.ts index 092e9418b3..4920191036 100644 --- a/src/mongo_client.ts +++ b/src/mongo_client.ts @@ -493,6 +493,11 @@ export class MongoClient extends TypedEventEmitter implements models: AnyClientBulkWriteModel[], options?: ClientBulkWriteOptions ): Promise { + if (this.autoEncrypter) { + throw new MongoInvalidArgumentError( + 'MongoClient bulkWrite does not currently support automatic encryption.' + ); + } return await new ClientBulkWriteExecutor(this, models, options).execute(); } diff --git a/src/operations/client_bulk_write/client_bulk_write.ts b/src/operations/client_bulk_write/client_bulk_write.ts index b04c978114..e901407cd7 100644 --- a/src/operations/client_bulk_write/client_bulk_write.ts +++ b/src/operations/client_bulk_write/client_bulk_write.ts @@ -27,6 +27,14 @@ export class ClientBulkWriteOperation extends CommandOperation(); + // In the case of retries we need to mark where we started this batch. + this.previousModelIndex = this.currentModelIndex; while (this.currentModelIndex < this.models.length) { const model = this.models[this.currentModelIndex]; const ns = model.namespace; const nsIndex = namespaces.get(ns); + // Multi updates are not retryable. + if (model.name === 'deleteMany' || model.name === 'updateMany') { + this.isBatchRetryable = false; + } + if (nsIndex != null) { // Build the operation and serialize it to get the bytes buffer. const operation = buildOperation(model, nsIndex, this.pkFactory); - const operationBuffer = BSON.serialize(operation); + let operationBuffer; + try { + operationBuffer = BSON.serialize(operation); + } catch (cause) { + throw new MongoInvalidArgumentError(`Could not serialize operation to BSON`, { cause }); + } + + validateBufferSize('ops', operationBuffer, maxBsonObjectSize); // Check if the operation buffer can fit in the command. If it can, // then add the operation to the document sequence and increment the @@ -119,9 +158,18 @@ export class ClientBulkWriteCommandBuilder { // construct our nsInfo and ops documents and buffers. namespaces.set(ns, currentNamespaceIndex); const nsInfo = { ns: ns }; - const nsInfoBuffer = BSON.serialize(nsInfo); const operation = buildOperation(model, currentNamespaceIndex, this.pkFactory); - const operationBuffer = BSON.serialize(operation); + let nsInfoBuffer; + let operationBuffer; + try { + nsInfoBuffer = BSON.serialize(nsInfo); + operationBuffer = BSON.serialize(operation); + } catch (cause) { + throw new MongoInvalidArgumentError(`Could not serialize ns info to BSON`, { cause }); + } + + validateBufferSize('nsInfo', nsInfoBuffer, maxBsonObjectSize); + validateBufferSize('ops', operationBuffer, maxBsonObjectSize); // Check if the operation and nsInfo buffers can fit in the command. If they // can, then add the operation and nsInfo to their respective document @@ -179,6 +227,14 @@ export class ClientBulkWriteCommandBuilder { } } +function validateBufferSize(name: string, buffer: Uint8Array, maxBsonObjectSize: number) { + if (buffer.length > maxBsonObjectSize) { + throw new MongoInvalidArgumentError( + `Client bulk write operation ${name} of length ${buffer.length} exceeds the max bson object size of ${maxBsonObjectSize}` + ); + } +} + /** @internal */ interface ClientInsertOperation { insert: number; @@ -293,6 +349,18 @@ export const buildUpdateManyOperation = ( return createUpdateOperation(model, index, true); }; +/** + * Validate the update document. + * @param update - The update document. + */ +function validateUpdate(update: Document) { + if (!hasAtomicOperators(update)) { + throw new MongoAPIError( + 'Client bulk write update models must only contain atomic modifiers (start with $) and must not be empty.' + ); + } +} + /** * Creates a delete operation based on the parameters. */ @@ -301,6 +369,11 @@ function createUpdateOperation( index: number, multi: boolean ): ClientUpdateOperation { + // Update documents provided in UpdateOne and UpdateMany write models are + // required only to contain atomic modifiers (i.e. keys that start with "$"). + // Drivers MUST throw an error if an update document is empty or if the + // document's first key does not start with "$". + validateUpdate(model.update); const document: ClientUpdateOperation = { update: index, multi: multi, @@ -343,6 +416,12 @@ export const buildReplaceOneOperation = ( model: ClientReplaceOneModel, index: number ): ClientReplaceOneOperation => { + if (hasAtomicOperators(model.replacement)) { + throw new MongoAPIError( + 'Client bulk write replace models must not contain atomic modifiers (start with $) and must not be empty.' + ); + } + const document: ClientReplaceOneOperation = { update: index, multi: false, diff --git a/src/operations/client_bulk_write/common.ts b/src/operations/client_bulk_write/common.ts index c41d971f02..11234cf4ea 100644 --- a/src/operations/client_bulk_write/common.ts +++ b/src/operations/client_bulk_write/common.ts @@ -181,6 +181,12 @@ export interface ClientBulkWriteResult { deleteResults?: Map; } +/** @public */ +export interface ClientBulkWriteError { + code: number; + message: string; +} + /** @public */ export interface ClientInsertOneResult { /** diff --git a/src/operations/client_bulk_write/executor.ts b/src/operations/client_bulk_write/executor.ts index 5baf1ed6b6..93acaac216 100644 --- a/src/operations/client_bulk_write/executor.ts +++ b/src/operations/client_bulk_write/executor.ts @@ -1,4 +1,9 @@ import { ClientBulkWriteCursor } from '../../cursor/client_bulk_write_cursor'; +import { + MongoClientBulkWriteError, + MongoClientBulkWriteExecutionError, + MongoServerError +} from '../../error'; import { type MongoClient } from '../../mongo_client'; import { WriteConcern } from '../../write_concern'; import { executeOperation } from '../execute_operation'; @@ -31,9 +36,18 @@ export class ClientBulkWriteExecutor { operations: AnyClientBulkWriteModel[], options?: ClientBulkWriteOptions ) { + if (operations.length === 0) { + throw new MongoClientBulkWriteExecutionError('No client bulk write models were provided.'); + } + this.client = client; this.operations = operations; - this.options = { ...options }; + this.options = { + ordered: true, + bypassDocumentValidation: false, + verboseResults: false, + ...options + }; // If no write concern was provided, we inherit one from the client. if (!this.options.writeConcern) { @@ -65,15 +79,42 @@ export class ClientBulkWriteExecutor { } else { const resultsMerger = new ClientBulkWriteResultsMerger(this.options); // For each command will will create and exhaust a cursor for the results. - let currentBatchOffset = 0; while (commandBuilder.hasNextBatch()) { const cursor = new ClientBulkWriteCursor(this.client, commandBuilder, this.options); - const docs = await cursor.toArray(); - const operations = cursor.operations; - resultsMerger.merge(currentBatchOffset, operations, cursor.response, docs); - // Set the new batch index so we can back back to the index in the original models. - currentBatchOffset += operations.length; + try { + await resultsMerger.merge(cursor); + } catch (error) { + // Write concern errors are recorded in the writeConcernErrors field on MongoClientBulkWriteError. + // When a write concern error is encountered, it should not terminate execution of the bulk write + // for either ordered or unordered bulk writes. However, drivers MUST throw an exception at the end + // of execution if any write concern errors were observed. + if (error instanceof MongoServerError && !(error instanceof MongoClientBulkWriteError)) { + // Server side errors need to be wrapped inside a MongoClientBulkWriteError, where the root + // cause is the error property and a partial result is to be included. + const bulkWriteError = new MongoClientBulkWriteError({ + message: 'Mongo client bulk write encountered an error during execution' + }); + bulkWriteError.cause = error; + bulkWriteError.partialResult = resultsMerger.result; + throw bulkWriteError; + } else { + // Client side errors are just thrown. + throw error; + } + } } + + // If we have write concern errors or unordered write errors at the end we throw. + if (resultsMerger.writeConcernErrors.length > 0 || resultsMerger.writeErrors.size > 0) { + const error = new MongoClientBulkWriteError({ + message: 'Mongo client bulk write encountered errors during execution.' + }); + error.writeConcernErrors = resultsMerger.writeConcernErrors; + error.writeErrors = resultsMerger.writeErrors; + error.partialResult = resultsMerger.result; + throw error; + } + return resultsMerger.result; } } diff --git a/src/operations/client_bulk_write/results_merger.ts b/src/operations/client_bulk_write/results_merger.ts index ca5f3f1604..8114523fde 100644 --- a/src/operations/client_bulk_write/results_merger.ts +++ b/src/operations/client_bulk_write/results_merger.ts @@ -1,6 +1,9 @@ +import { MongoWriteConcernError } from '../..'; import { type Document } from '../../bson'; -import { type ClientBulkWriteCursorResponse } from '../../cmap/wire_protocol/responses'; +import { type ClientBulkWriteCursor } from '../../cursor/client_bulk_write_cursor'; +import { MongoClientBulkWriteError } from '../../error'; import { + type ClientBulkWriteError, type ClientBulkWriteOptions, type ClientBulkWriteResult, type ClientDeleteResult, @@ -15,6 +18,9 @@ import { export class ClientBulkWriteResultsMerger { result: ClientBulkWriteResult; options: ClientBulkWriteOptions; + currentBatchOffset: number; + writeConcernErrors: Document[]; + writeErrors: Map; /** * Instantiate the merger. @@ -22,6 +28,9 @@ export class ClientBulkWriteResultsMerger { */ constructor(options: ClientBulkWriteOptions) { this.options = options; + this.currentBatchOffset = 0; + this.writeConcernErrors = []; + this.writeErrors = new Map(); this.result = { insertedCount: 0, upsertedCount: 0, @@ -47,55 +56,126 @@ export class ClientBulkWriteResultsMerger { * @param documents - The documents in the cursor. * @returns The current result. */ - merge( - currentBatchOffset: number, - operations: Document[], - response: ClientBulkWriteCursorResponse, - documents: Document[] - ): ClientBulkWriteResult { - // Update the counts from the cursor response. - this.result.insertedCount += response.insertedCount; - this.result.upsertedCount += response.upsertedCount; - this.result.matchedCount += response.matchedCount; - this.result.modifiedCount += response.modifiedCount; - this.result.deletedCount += response.deletedCount; - - if (this.options.verboseResults) { - // Iterate all the documents in the cursor and update the result. - for (const document of documents) { + async merge(cursor: ClientBulkWriteCursor): Promise { + let writeConcernErrorResult; + try { + for await (const document of cursor) { // Only add to maps if ok: 1 if (document.ok === 1) { - // Get the corresponding operation from the command. - const operation = operations[document.idx]; - // Handle insert results. - if ('insert' in operation) { - this.result.insertResults?.set(document.idx + currentBatchOffset, { - insertedId: operation.document._id + if (this.options.verboseResults) { + this.processDocument(cursor, document); + } + } else { + // If an individual write error is encountered during an ordered bulk write, drivers MUST + // record the error in writeErrors and immediately throw the exception. Otherwise, drivers + // MUST continue to iterate the results cursor and execute any further bulkWrite batches. + if (this.options.ordered) { + const error = new MongoClientBulkWriteError({ + message: 'Mongo client ordered bulk write encountered a write error.' + }); + error.writeErrors.set(document.idx + this.currentBatchOffset, { + code: document.code, + message: document.errmsg + }); + error.partialResult = this.result; + throw error; + } else { + this.writeErrors.set(document.idx + this.currentBatchOffset, { + code: document.code, + message: document.errmsg }); } - // Handle update results. - if ('update' in operation) { - const result: ClientUpdateResult = { - matchedCount: document.n, - modifiedCount: document.nModified ?? 0, - // Check if the bulk did actually upsert. - didUpsert: document.upserted != null - }; - if (document.upserted) { - result.upsertedId = document.upserted._id; + } + } + } catch (error) { + if (error instanceof MongoWriteConcernError) { + const result = error.result; + writeConcernErrorResult = { + insertedCount: result.nInserted, + upsertedCount: result.nUpserted, + matchedCount: result.nMatched, + modifiedCount: result.nModified, + deletedCount: result.nDeleted, + writeConcernError: result.writeConcernError + }; + if (this.options.verboseResults && result.cursor.firstBatch) { + for (const document of result.cursor.firstBatch) { + if (document.ok === 1) { + this.processDocument(cursor, document); } - this.result.updateResults?.set(document.idx + currentBatchOffset, result); - } - // Handle delete results. - if ('delete' in operation) { - this.result.deleteResults?.set(document.idx + currentBatchOffset, { - deletedCount: document.n - }); } } + } else { + throw error; + } + } finally { + // Update the counts from the cursor response. + if (cursor.response) { + const response = cursor.response; + this.incrementCounts(response); } + + // Increment the batch offset. + this.currentBatchOffset += cursor.operations.length; + } + + // If we have write concern errors ensure they are added. + if (writeConcernErrorResult) { + const writeConcernError = writeConcernErrorResult.writeConcernError as Document; + this.incrementCounts(writeConcernErrorResult); + this.writeConcernErrors.push({ + code: writeConcernError.code, + message: writeConcernError.errmsg + }); } return this.result; } + + /** + * Process an individual document in the results. + * @param cursor - The cursor. + * @param document - The document to process. + */ + private processDocument(cursor: ClientBulkWriteCursor, document: Document) { + // Get the corresponding operation from the command. + const operation = cursor.operations[document.idx]; + // Handle insert results. + if ('insert' in operation) { + this.result.insertResults?.set(document.idx + this.currentBatchOffset, { + insertedId: operation.document._id + }); + } + // Handle update results. + if ('update' in operation) { + const result: ClientUpdateResult = { + matchedCount: document.n, + modifiedCount: document.nModified ?? 0, + // Check if the bulk did actually upsert. + didUpsert: document.upserted != null + }; + if (document.upserted) { + result.upsertedId = document.upserted._id; + } + this.result.updateResults?.set(document.idx + this.currentBatchOffset, result); + } + // Handle delete results. + if ('delete' in operation) { + this.result.deleteResults?.set(document.idx + this.currentBatchOffset, { + deletedCount: document.n + }); + } + } + + /** + * Increment the result counts. + * @param document - The document with the results. + */ + private incrementCounts(document: Document) { + this.result.insertedCount += document.insertedCount; + this.result.upsertedCount += document.upsertedCount; + this.result.matchedCount += document.matchedCount; + this.result.modifiedCount += document.modifiedCount; + this.result.deletedCount += document.deletedCount; + } } diff --git a/src/operations/execute_operation.ts b/src/operations/execute_operation.ts index 97e6045073..ec7c233eec 100644 --- a/src/operations/execute_operation.ts +++ b/src/operations/execute_operation.ts @@ -230,6 +230,10 @@ async function tryOperation< }); } + if (operation.hasAspect(Aspect.COMMAND_BATCHING) && !operation.canRetryWrite) { + throw previousOperationError; + } + if (hasWriteAspect && !isRetryableWriteError(previousOperationError)) throw previousOperationError; @@ -260,6 +264,10 @@ async function tryOperation< } try { + // If tries > 0 and we are command batching we need to reset the batch. + if (tries > 0 && operation.hasAspect(Aspect.COMMAND_BATCHING)) { + operation.resetBatch(); + } return await operation.execute(server, session); } catch (operationError) { if (!(operationError instanceof MongoError)) throw operationError; diff --git a/src/operations/operation.ts b/src/operations/operation.ts index b51cca4020..12f168b76e 100644 --- a/src/operations/operation.ts +++ b/src/operations/operation.ts @@ -11,7 +11,8 @@ export const Aspect = { EXPLAINABLE: Symbol('EXPLAINABLE'), SKIP_COLLATION: Symbol('SKIP_COLLATION'), CURSOR_CREATING: Symbol('CURSOR_CREATING'), - MUST_SELECT_SAME_SERVER: Symbol('MUST_SELECT_SAME_SERVER') + MUST_SELECT_SAME_SERVER: Symbol('MUST_SELECT_SAME_SERVER'), + COMMAND_BATCHING: Symbol('COMMAND_BATCHING') } as const; /** @public */ @@ -98,6 +99,10 @@ export abstract class AbstractOperation { this[kSession] = undefined; } + resetBatch(): boolean { + return true; + } + get canRetryRead(): boolean { return this.hasAspect(Aspect.RETRYABLE) && this.hasAspect(Aspect.READ_OPERATION); } diff --git a/test/integration/crud/crud.prose.test.ts b/test/integration/crud/crud.prose.test.ts index 1ecd960028..de48cbbaca 100644 --- a/test/integration/crud/crud.prose.test.ts +++ b/test/integration/crud/crud.prose.test.ts @@ -8,6 +8,8 @@ import { type Collection, MongoBulkWriteError, type MongoClient, + MongoClientBulkWriteError, + MongoInvalidArgumentError, MongoServerError } from '../../mongodb'; import { filterForCommands } from '../shared'; @@ -283,6 +285,167 @@ describe('CRUD Prose Spec Tests', () => { }); }); + describe('5. MongoClient.bulkWrite collects WriteConcernErrors across batches', function () { + // Test that MongoClient.bulkWrite properly collects and reports writeConcernErrors returned in separate batches. + // This test must only be run on 8.0+ servers. This test must be skipped on Atlas Serverless. + // Construct a MongoClient (referred to as client) with retryWrites: false configured and command monitoring + // enabled to observe CommandStartedEvents. Perform a hello command using client and record the maxWriteBatchSize + // value contained in the response. Then, configure the following fail point with client: + // { + // "configureFailPoint": "failCommand", + // "mode": { "times": 2 }, + // "data": { + // "failCommands": ["bulkWrite"], + // "writeConcernError": { + // "code": 91, + // "errmsg": "Replication is being shut down" + // } + // } + // } + // Construct the following write model (referred to as model): + // InsertOne: { + // "namespace": "db.coll", + // "document": { "a": "b" } + // } + // Construct a list of write models (referred to as models) with model repeated maxWriteBatchSize + 1 times. + // Execute bulkWrite on client with models. Assert that the bulk write fails and returns a BulkWriteError (referred to as error). + // Assert that error.writeConcernErrors has a length of 2. + // Assert that error.partialResult is populated. Assert that error.partialResult.insertedCount is equal to maxWriteBatchSize + 1. + // Assert that two CommandStartedEvents were observed for the bulkWrite command. + let client: MongoClient; + let maxWriteBatchSize; + const models: AnyClientBulkWriteModel[] = []; + const commands: CommandStartedEvent[] = []; + + beforeEach(async function () { + client = this.configuration.newClient({}, { monitorCommands: true, retryWrites: false }); + await client.connect(); + await client.db('db').collection('coll').drop(); + const hello = await client.db('admin').command({ hello: 1 }); + await client.db('admin').command({ + configureFailPoint: 'failCommand', + mode: { times: 2 }, + data: { + failCommands: ['bulkWrite'], + writeConcernError: { + code: 91, + errmsg: 'Replication is being shut down' + } + } + }); + maxWriteBatchSize = hello.maxWriteBatchSize; + + client.on('commandStarted', filterForCommands('bulkWrite', commands)); + commands.length = 0; + + Array.from({ length: maxWriteBatchSize + 1 }, () => { + models.push({ + namespace: 'db.coll', + name: 'insertOne', + document: { a: 'b' } + }); + }); + }); + + afterEach(async function () { + await client.close(); + }); + + it('splits the commands into 2 operations and handles the errors', { + metadata: { requires: { mongodb: '>=8.0.0', serverless: 'forbid' } }, + async test() { + const error = await client.bulkWrite(models).catch(error => error); + expect(error).to.be.instanceOf(MongoClientBulkWriteError); + expect(error.writeConcernErrors.length).to.equal(2); + expect(error.partialResult.insertedCount).to.equal(maxWriteBatchSize + 1); + expect(commands.length).to.equal(2); + } + }); + }); + + describe('6. MongoClient.bulkWrite handles individual WriteErrors across batches', function () { + // Test that MongoClient.bulkWrite handles individual write errors across batches for ordered and unordered bulk writes. + // This test must only be run on 8.0+ servers. This test must be skipped on Atlas Serverless. + // Construct a MongoClient (referred to as client) with command monitoring enabled to observe CommandStartedEvents. + // Perform a hello command using client and record the maxWriteBatchSize value contained in the response. + // Construct a MongoCollection (referred to as collection) with the namespace "db.coll" (referred to as namespace). + // Drop collection. Then, construct the following document (referred to as document): + // { + // "_id": 1 + // } + // Insert document into collection. + // Create the following write model (referred to as model): + // InsertOne { + // "namespace": namespace, + // "document": document + // } + // Construct a list of write models (referred to as models) with model repeated maxWriteBatchSize + 1 times. + let client: MongoClient; + let maxWriteBatchSize; + const models: AnyClientBulkWriteModel[] = []; + const commands: CommandStartedEvent[] = []; + + beforeEach(async function () { + client = this.configuration.newClient({}, { monitorCommands: true, retryWrites: false }); + await client.connect(); + await client.db('db').collection('coll').drop(); + const hello = await client.db('admin').command({ hello: 1 }); + await client.db('db').collection('coll').insertOne({ _id: 1 }); + maxWriteBatchSize = hello.maxWriteBatchSize; + + client.on('commandStarted', filterForCommands('bulkWrite', commands)); + commands.length = 0; + + Array.from({ length: maxWriteBatchSize + 1 }, () => { + models.push({ + namespace: 'db.coll', + name: 'insertOne', + document: { _id: 1 } + }); + }); + }); + + afterEach(async function () { + await client.close(); + }); + + context('when the bulk write is unordered', function () { + // Unordered + // Test that an unordered bulk write collects WriteErrors across batches. + // Execute bulkWrite on client with models and ordered set to false. Assert that the bulk write fails + // and returns a BulkWriteError (referred to as unorderedError). + // Assert that unorderedError.writeErrors has a length of maxWriteBatchSize + 1. + // Assert that two CommandStartedEvents were observed for the bulkWrite command. + it('splits the commands into 2 operations and handles the errors', { + metadata: { requires: { mongodb: '>=8.0.0', serverless: 'forbid' } }, + async test() { + const error = await client.bulkWrite(models, { ordered: false }).catch(error => error); + expect(error).to.be.instanceOf(MongoClientBulkWriteError); + expect(error.writeErrors.size).to.equal(maxWriteBatchSize + 1); + expect(commands.length).to.equal(2); + } + }); + }); + + context('when the bulk write is ordered', function () { + // Ordered + // Test that an ordered bulk write does not execute further batches when a WriteError occurs. + // Execute bulkWrite on client with models and ordered set to true. Assert that the bulk write fails + // and returns a BulkWriteError (referred to as orderedError). + // Assert that orderedError.writeErrors has a length of 1. + // Assert that one CommandStartedEvent was observed for the bulkWrite command. + it('splits the commands into 2 operations and halts on first error', { + metadata: { requires: { mongodb: '>=8.0.0', serverless: 'forbid' } }, + async test() { + const error = await client.bulkWrite(models, { ordered: true }).catch(error => error); + expect(error).to.be.instanceOf(MongoClientBulkWriteError); + expect(error.writeErrors.size).to.equal(1); + expect(commands.length).to.equal(1); + } + }); + }); + }); + describe('7. MongoClient.bulkWrite handles a cursor requiring a getMore', function () { // Test that MongoClient.bulkWrite properly iterates the results cursor when getMore is required. // This test must only be run on 8.0+ servers. This test must be skipped on Atlas Serverless. @@ -430,6 +593,185 @@ describe('CRUD Prose Spec Tests', () => { }); }); + describe('9. MongoClient.bulkWrite handles a getMore error', function () { + // Test that MongoClient.bulkWrite properly handles a failure that occurs when attempting a getMore. + // This test must only be run on 8.0+ servers. This test must be skipped on Atlas Serverless. + // Construct a MongoClient (referred to as client) with command monitoring enabled to observe CommandStartedEvents. + // Perform a hello command using client and record the maxBsonObjectSize value from the response. Then, + // configure the following fail point with client: + // { + // "configureFailPoint": "failCommand", + // "mode": { "times": 1 }, + // "data": { + // "failCommands": ["getMore"], + // "errorCode": 8 + // } + // } + // Construct a MongoCollection (referred to as collection) with the namespace "db.coll" (referred to as namespace). + // Drop collection. Then create the following list of write models (referred to as models): + // UpdateOne { + // "namespace": namespace, + // "filter": { "_id": "a".repeat(maxBsonObjectSize / 2) }, + // "update": { "$set": { "x": 1 } }, + // "upsert": true + // }, + // UpdateOne { + // "namespace": namespace, + // "filter": { "_id": "b".repeat(maxBsonObjectSize / 2) }, + // "update": { "$set": { "x": 1 } }, + // "upsert": true + // }, + // Execute bulkWrite on client with models and verboseResults set to true. Assert that the bulk write + // fails and returns a BulkWriteError (referred to as bulkWriteError). + // Assert that bulkWriteError.error is populated with an error (referred to as topLevelError). Assert + // that topLevelError.errorCode is equal to 8. + // Assert that bulkWriteError.partialResult is populated with a result (referred to as partialResult). + // Assert that partialResult.upsertedCount is equal to 2. Assert that the length of + // partialResult.updateResults is equal to 1. + // Assert that a CommandStartedEvent was observed for the getMore command. + // Assert that a CommandStartedEvent was observed for the killCursors command. + let client: MongoClient; + let maxBsonObjectSize; + const models: AnyClientBulkWriteModel[] = []; + const getMoreCommands: CommandStartedEvent[] = []; + const killCursorsCommands: CommandStartedEvent[] = []; + + beforeEach(async function () { + client = this.configuration.newClient({}, { monitorCommands: true }); + await client.connect(); + await client.db('db').collection('coll').drop(); + const hello = await client.db('admin').command({ hello: 1 }); + maxBsonObjectSize = hello.maxBsonObjectSize; + + await client.db('admin').command({ + configureFailPoint: 'failCommand', + mode: { times: 1 }, + data: { + failCommands: ['getMore'], + errorCode: 8 + } + }); + + client.on('commandStarted', filterForCommands('getMore', getMoreCommands)); + client.on('commandStarted', filterForCommands('killCursors', killCursorsCommands)); + getMoreCommands.length = 0; + killCursorsCommands.length = 0; + + models.push({ + name: 'updateOne', + namespace: 'db.coll', + filter: { _id: 'a'.repeat(maxBsonObjectSize / 2) }, + update: { $set: { x: 1 } }, + upsert: true + }); + models.push({ + name: 'updateOne', + namespace: 'db.coll', + filter: { _id: 'b'.repeat(maxBsonObjectSize / 2) }, + update: { $set: { x: 1 } }, + upsert: true + }); + }); + + afterEach(async function () { + await client.close(); + }); + + it('handles a getMore that errors', { + metadata: { requires: { mongodb: '>=8.0.0', serverless: 'forbid' } }, + async test() { + const error = await client + .bulkWrite(models, { verboseResults: true }) + .catch(error => error); + expect(error).to.be.instanceOf(MongoClientBulkWriteError); + expect(error.cause.code).to.equal(8); + expect(error.partialResult).to.exist; + // TODO: Need to handle batches in cursor one at a time and not call toArray() + expect(error.partialResult.upsertedCount).to.equal(2); + expect(error.partialResult.updateResults.size).to.equal(1); + expect(getMoreCommands.length).to.equal(1); + expect(killCursorsCommands.length).to.equal(1); + } + }); + }); + + describe('10. MongoClient.bulkWrite returns error for unacknowledged too-large insert', function () { + // This test must only be run on 8.0+ servers. This test must be skipped on Atlas Serverless. + // Construct a MongoClient (referred to as client). + // Perform a hello command using client and record the following values from the response: maxBsonObjectSize. + // Then, construct the following document (referred to as document): + // { + // "a": "b".repeat(maxBsonObjectSize) + // } + let client: MongoClient; + let maxBsonObjectSize; + let document: Document; + + beforeEach(async function () { + client = this.configuration.newClient({}, { monitorCommands: true }); + await client.connect(); + await client.db('db').collection('coll').drop(); + const hello = await client.db('admin').command({ hello: 1 }); + maxBsonObjectSize = hello.maxBsonObjectSize; + + document = { + a: 'b'.repeat(maxBsonObjectSize) + }; + }); + + afterEach(async function () { + await client.close(); + }); + + context('when performing inserts', function () { + // With insert + // Construct the following write model (referred to as model): + // InsertOne: { + // "namespace": "db.coll", + // "document": document + // } + // Construct as list of write models (referred to as models) with the one model. + // Call MongoClient.bulkWrite with models and BulkWriteOptions.writeConcern set to an unacknowledged write concern. + // Expect a client-side error due the size. + it('throws an error', { + metadata: { requires: { mongodb: '>=8.0.0', serverless: 'forbid' } }, + async test() { + const error = await client + .bulkWrite([{ name: 'insertOne', namespace: 'db.coll', document: document }], { + writeConcern: { w: 0 } + }) + .catch(error => error); + expect(error.message).to.include('Client bulk write operation ops of length'); + } + }); + }); + + context('when performing replacements', function () { + // With replace + // Construct the following write model (referred to as model): + // ReplaceOne: { + // "namespace": "db.coll", + // "filter": {}, + // "replacement": document + // } + // Construct as list of write models (referred to as models) with the one model. + // Call MongoClient.bulkWrite with models and BulkWriteOptions.writeConcern set to an unacknowledged write concern. + // Expect a client-side error due the size. + it('throws an error', { + metadata: { requires: { mongodb: '>=8.0.0', serverless: 'forbid' } }, + async test() { + const error = await client + .bulkWrite( + [{ name: 'replaceOne', namespace: 'db.coll', filter: {}, replacement: document }], + { writeConcern: { w: 0 } } + ) + .catch(error => error); + expect(error.message).to.include('Client bulk write operation ops of length'); + } + }); + }); + }); + describe('11. MongoClient.bulkWrite batch splits when the addition of a new namespace exceeds the maximum message size', function () { // Test that MongoClient.bulkWrite batch splits a bulk write when the addition of a new namespace to nsInfo causes the size // of the message to exceed maxMessageSizeBytes - 1000. @@ -573,6 +915,133 @@ describe('CRUD Prose Spec Tests', () => { }); }); + describe('12. MongoClient.bulkWrite returns an error if no operations can be added to ops', function () { + // Test that MongoClient.bulkWrite returns an error if an operation provided exceeds maxMessageSizeBytes + // such that an empty ops payload would be sent. + // This test must only be run on 8.0+ servers. This test must be skipped on Atlas Serverless. + // This test may be skipped by drivers that are not able to construct arbitrarily large documents. + // Construct a MongoClient (referred to as client). Perform a hello command using client and record + // the maxMessageSizeBytes value contained in the response. + let client: MongoClient; + let maxMessageSizeBytes; + + beforeEach(async function () { + client = this.configuration.newClient({}); + await client.connect(); + await client.db('db').collection('coll').drop(); + const hello = await client.db('admin').command({ hello: 1 }); + maxMessageSizeBytes = hello.maxMessageSizeBytes; + }); + + afterEach(async function () { + await client.close(); + }); + + context('when the document is too large', function () { + // Case 1: document too large + // Construct the following write model (referred to as largeDocumentModel): + // InsertOne { + // "namespace": "db.coll", + // "document": { "a": "b".repeat(maxMessageSizeBytes) } + // } + // Execute bulkWrite on client with largeDocumentModel. Assert that an error (referred to as error) is returned. + // Assert that error is a client error. + it('raises a client error', { + metadata: { requires: { mongodb: '>=8.0.0', serverless: 'forbid' } }, + async test() { + const model: AnyClientBulkWriteModel = { + name: 'insertOne', + namespace: 'db.coll', + document: { a: 'b'.repeat(maxMessageSizeBytes) } + }; + const error = await client.bulkWrite([model]).catch(error => error); + expect(error).to.be.instanceOf(MongoInvalidArgumentError); + } + }); + }); + + context('when the namespace is too large', function () { + // Case 2: namespace too large + // Construct the following namespace (referred to as namespace): + // "db." + "c".repeat(maxMessageSizeBytes) + // Construct the following write model (referred to as largeNamespaceModel): + // InsertOne { + // "namespace": namespace, + // "document": { "a": "b" } + // } + // Execute bulkWrite on client with largeNamespaceModel. Assert that an error (referred to as error) is returned. + // Assert that error is a client error. + it('raises a client error', { + metadata: { requires: { mongodb: '>=8.0.0', serverless: 'forbid' } }, + async test() { + const namespace = `db.${'c'.repeat(maxMessageSizeBytes)}`; + const model: AnyClientBulkWriteModel = { + name: 'insertOne', + namespace: namespace, + document: { a: 'b' } + }; + const error = await client.bulkWrite([model]).catch(error => error); + expect(error).to.be.instanceOf(MongoInvalidArgumentError); + } + }); + }); + }); + + describe('13. MongoClient.bulkWrite returns an error if auto-encryption is configured', function () { + // This test is expected to be removed when DRIVERS-2888 is resolved. + // Test that MongoClient.bulkWrite returns an error if the client has auto-encryption configured. + // This test must only be run on 8.0+ servers. This test must be skipped on Atlas Serverless. + // Construct a MongoClient (referred to as client) configured with the following AutoEncryptionOpts: + // AutoEncryptionOpts { + // "keyVaultNamespace": "db.coll", + // "kmsProviders": { + // "aws": { + // "accessKeyId": "foo", + // "secretAccessKey": "bar" + // } + // } + // } + // Construct the following write model (referred to as model): + // InsertOne { + // "namespace": "db.coll", + // "document": { "a": "b" } + // } + // Execute bulkWrite on client with model. Assert that an error (referred to as error) is returned. + // Assert that error is a client error containing the message: "bulkWrite does not currently support automatic encryption". + let client: MongoClient; + + beforeEach(async function () { + client = this.configuration.newClient( + {}, + { + autoEncryption: { + keyVaultNamespace: 'db.coll', + kmsProviders: { + aws: { + accessKeyId: 'foo', + secretAccessKey: 'bar' + } + } + } + } + ); + }); + + afterEach(async function () { + await client.close(); + }); + + it('raises a client side error', async function () { + const model: AnyClientBulkWriteModel = { + name: 'insertOne', + namespace: 'db.coll', + document: { a: 'b' } + }; + const error = await client.bulkWrite([model]).catch(error => error); + expect(error.message).to.include('bulkWrite does not currently support automatic encryption'); + }); + }); + describe('14. `explain` helpers allow users to specify `maxTimeMS`', function () { let client: MongoClient; const commands: CommandStartedEvent[] = []; diff --git a/test/integration/crud/crud.spec.test.ts b/test/integration/crud/crud.spec.test.ts index a8a0d2987f..5439c77523 100644 --- a/test/integration/crud/crud.spec.test.ts +++ b/test/integration/crud/crud.spec.test.ts @@ -3,22 +3,6 @@ import * as path from 'path'; import { loadSpecTests } from '../../spec/index'; import { runUnifiedSuite } from '../../tools/unified-spec-runner/runner'; -const clientBulkWriteTests = new RegExp( - [ - 'client bulkWrite operations support errorResponse assertions', - 'an individual operation fails during an ordered bulkWrite', - 'an individual operation fails during an unordered bulkWrite', - 'detailed results are omitted from error when verboseResults is false', - 'a top-level failure occurs during a bulkWrite', - 'a bulk write with only errors does not report a partial result', - 'an empty list of write models is a client-side error', - 'a write concern error occurs during a bulkWrite', - 'client bulkWrite replaceOne prohibits atomic modifiers', - 'client bulkWrite updateOne requires atomic modifiers', - 'client bulkWrite updateMany requires atomic modifiers' - ].join('|') -); - const unacknowledgedHintTests = [ 'Unacknowledged updateOne with hint document on 4.2+ server', 'Unacknowledged updateOne with hint string on 4.2+ server', @@ -59,13 +43,11 @@ describe('CRUD unified', function () { runUnifiedSuite( loadSpecTests(path.join('crud', 'unified')), ({ description }, { isLoadBalanced }) => { - return description.match(clientBulkWriteTests) - ? 'TODO(NODE-6257): implement client level bulk write' - : unacknowledgedHintTests.includes(description) - ? `TODO(NODE-3541)` - : isLoadBalanced && loadBalancedCollationTests.includes(description) - ? `TODO(NODE-6280): fix collation for find and modify commands on load balanced mode` - : false; + return unacknowledgedHintTests.includes(description) + ? `TODO(NODE-3541)` + : isLoadBalanced && loadBalancedCollationTests.includes(description) + ? `TODO(NODE-6280): fix collation for find and modify commands on load balanced mode` + : false; } ); }); diff --git a/test/integration/retryable-writes/retryable_writes.spec.test.ts b/test/integration/retryable-writes/retryable_writes.spec.test.ts index 1c9e510e4f..87e8c1edc9 100644 --- a/test/integration/retryable-writes/retryable_writes.spec.test.ts +++ b/test/integration/retryable-writes/retryable_writes.spec.test.ts @@ -3,22 +3,6 @@ import * as path from 'path'; import { loadSpecTests } from '../../spec'; import { runUnifiedSuite } from '../../tools/unified-spec-runner/runner'; -const clientBulkWriteTests = [ - 'client bulkWrite with one network error succeeds after retry', - 'client bulkWrite with two network errors fails after retry', - 'client bulkWrite with no multi: true operations succeeds after retryable top-level error', - 'client bulkWrite with multi: true operations fails after retryable top-level error', - 'client bulkWrite with no multi: true operations succeeds after retryable writeConcernError', - 'client bulkWrite with multi: true operations fails after retryable writeConcernError', - 'client bulkWrite with retryWrites: false does not retry', - 'client.clientBulkWrite succeeds after retryable handshake network error', - 'client.clientBulkWrite succeeds after retryable handshake server error (ShutdownInProgress)' -]; - describe('Retryable Writes (unified)', function () { - runUnifiedSuite(loadSpecTests(path.join('retryable-writes', 'unified')), ({ description }) => { - return clientBulkWriteTests.includes(description) - ? `TODO(NODE-6257): implement client-level bulk write.` - : false; - }); + runUnifiedSuite(loadSpecTests(path.join('retryable-writes', 'unified'))); }); diff --git a/test/spec/retryable-writes/unified/client-bulkWrite-clientErrors.json b/test/spec/retryable-writes/unified/client-bulkWrite-clientErrors.json index e2c0fb9c0a..d16e0c9c8d 100644 --- a/test/spec/retryable-writes/unified/client-bulkWrite-clientErrors.json +++ b/test/spec/retryable-writes/unified/client-bulkWrite-clientErrors.json @@ -8,7 +8,8 @@ "replicaset", "sharded", "load-balanced" - ] + ], + "serverless": "forbid" } ], "createEntities": [ diff --git a/test/spec/retryable-writes/unified/client-bulkWrite-clientErrors.yml b/test/spec/retryable-writes/unified/client-bulkWrite-clientErrors.yml index 85696e89db..e5214b90f8 100644 --- a/test/spec/retryable-writes/unified/client-bulkWrite-clientErrors.yml +++ b/test/spec/retryable-writes/unified/client-bulkWrite-clientErrors.yml @@ -6,6 +6,7 @@ runOnRequirements: - replicaset - sharded - load-balanced + serverless: forbid createEntities: - client: diff --git a/test/spec/retryable-writes/unified/client-bulkWrite-serverErrors.json b/test/spec/retryable-writes/unified/client-bulkWrite-serverErrors.json index 4a0b210eb5..f58c82bcc7 100644 --- a/test/spec/retryable-writes/unified/client-bulkWrite-serverErrors.json +++ b/test/spec/retryable-writes/unified/client-bulkWrite-serverErrors.json @@ -8,7 +8,8 @@ "replicaset", "sharded", "load-balanced" - ] + ], + "serverless": "forbid" } ], "createEntities": [ diff --git a/test/spec/retryable-writes/unified/client-bulkWrite-serverErrors.yml b/test/spec/retryable-writes/unified/client-bulkWrite-serverErrors.yml index 23d2c622ee..722e5cc8e0 100644 --- a/test/spec/retryable-writes/unified/client-bulkWrite-serverErrors.yml +++ b/test/spec/retryable-writes/unified/client-bulkWrite-serverErrors.yml @@ -6,6 +6,7 @@ runOnRequirements: - replicaset - sharded - load-balanced + serverless: forbid createEntities: - client: diff --git a/test/spec/retryable-writes/unified/handshakeError.json b/test/spec/retryable-writes/unified/handshakeError.json index 3c46463759..93cb2e849e 100644 --- a/test/spec/retryable-writes/unified/handshakeError.json +++ b/test/spec/retryable-writes/unified/handshakeError.json @@ -1,6 +1,6 @@ { "description": "retryable writes handshake failures", - "schemaVersion": "1.3", + "schemaVersion": "1.4", "runOnRequirements": [ { "minServerVersion": "4.2", @@ -57,7 +57,8 @@ "description": "client.clientBulkWrite succeeds after retryable handshake network error", "runOnRequirements": [ { - "minServerVersion": "8.0" + "minServerVersion": "8.0", + "serverless": "forbid" } ], "operations": [ @@ -165,7 +166,8 @@ "description": "client.clientBulkWrite succeeds after retryable handshake server error (ShutdownInProgress)", "runOnRequirements": [ { - "minServerVersion": "8.0" + "minServerVersion": "8.0", + "serverless": "forbid" } ], "operations": [ diff --git a/test/spec/retryable-writes/unified/handshakeError.yml b/test/spec/retryable-writes/unified/handshakeError.yml index 131bbf2e5c..1743463370 100644 --- a/test/spec/retryable-writes/unified/handshakeError.yml +++ b/test/spec/retryable-writes/unified/handshakeError.yml @@ -2,7 +2,7 @@ description: "retryable writes handshake failures" -schemaVersion: "1.3" +schemaVersion: "1.4" # For `serverless: forbid` runOnRequirements: - minServerVersion: "4.2" @@ -53,6 +53,7 @@ tests: - description: "client.clientBulkWrite succeeds after retryable handshake network error" runOnRequirements: - minServerVersion: "8.0" # `bulkWrite` added to server 8.0 + serverless: forbid operations: - name: failPoint object: testRunner @@ -98,6 +99,7 @@ tests: - description: "client.clientBulkWrite succeeds after retryable handshake server error (ShutdownInProgress)" runOnRequirements: - minServerVersion: "8.0" # `bulkWrite` added to server 8.0 + serverless: forbid operations: - name: failPoint object: testRunner diff --git a/test/tools/unified-spec-runner/match.ts b/test/tools/unified-spec-runner/match.ts index f92004c776..bb4ba99a44 100644 --- a/test/tools/unified-spec-runner/match.ts +++ b/test/tools/unified-spec-runner/match.ts @@ -23,6 +23,7 @@ import { type Document, Long, MongoBulkWriteError, + MongoClientBulkWriteError, MongoError, MongoServerError, ObjectId, @@ -765,7 +766,11 @@ export function expectErrorCheck( } if (expected.errorCode != null) { - expect(error, expectMessage).to.have.property('code', expected.errorCode); + if (error instanceof MongoClientBulkWriteError) { + expect(error.cause).to.have.property('code', expected.errorCode); + } else { + expect(error, expectMessage).to.have.property('code', expected.errorCode); + } } if (expected.errorCodeName != null) { @@ -773,7 +778,10 @@ export function expectErrorCheck( } if (expected.errorLabelsContain != null) { - const mongoError = error as MongoError; + let mongoError = error as MongoError; + if (error instanceof MongoClientBulkWriteError) { + mongoError = error.cause as MongoError; + } for (const errorLabel of expected.errorLabelsContain) { expect( mongoError.hasErrorLabel(errorLabel), @@ -793,10 +801,18 @@ export function expectErrorCheck( } if (expected.expectResult != null) { - resultCheck(error, expected.expectResult as any, entities); + if ('partialResult' in error) { + resultCheck(error.partialResult, expected.expectResult as any, entities); + } else { + resultCheck(error, expected.expectResult as any, entities); + } } if (expected.errorResponse != null) { - resultCheck(error, expected.errorResponse, entities); + if (error instanceof MongoClientBulkWriteError) { + resultCheck(error.cause, expected.errorResponse, entities); + } else { + resultCheck(error, expected.errorResponse, entities); + } } } diff --git a/test/unit/index.test.ts b/test/unit/index.test.ts index 883cc4b4ba..595f372c43 100644 --- a/test/unit/index.test.ts +++ b/test/unit/index.test.ts @@ -74,6 +74,7 @@ const EXPECTED_EXPORTS = [ 'MongoClient', 'MongoClientAuthProviders', 'MongoClientBulkWriteCursorError', + 'MongoClientBulkWriteError', 'MongoClientBulkWriteExecutionError', 'MongoCompatibilityError', 'MongoCryptAzureKMSRequestError', diff --git a/test/unit/operations/client_bulk_write/command_builder.test.ts b/test/unit/operations/client_bulk_write/command_builder.test.ts index e92966795b..ef3cc6f0b0 100644 --- a/test/unit/operations/client_bulk_write/command_builder.test.ts +++ b/test/unit/operations/client_bulk_write/command_builder.test.ts @@ -34,7 +34,7 @@ describe('ClientBulkWriteCommandBuilder', function () { ordered: false, comment: { bulk: 'write' } }); - const command = builder.buildBatch(48000000, 100000); + const command = builder.buildBatch(48000000, 100000, 16777216); it('sets the bulkWrite command', function () { expect(command.bulkWrite).to.equal(1); @@ -79,7 +79,7 @@ describe('ClientBulkWriteCommandBuilder', function () { document: { _id: id, name: 1 } }; const builder = new ClientBulkWriteCommandBuilder([model], {}); - const command = builder.buildBatch(48000000, 100000); + const command = builder.buildBatch(48000000, 100000, 16777216); it('sets the bulkWrite command', function () { expect(command.bulkWrite).to.equal(1); @@ -122,8 +122,8 @@ describe('ClientBulkWriteCommandBuilder', function () { document: { _id: idTwo, name: 2 } }; const builder = new ClientBulkWriteCommandBuilder([modelOne, modelTwo], {}); - const commandOne = builder.buildBatch(48000000, 1); - const commandTwo = builder.buildBatch(48000000, 1); + const commandOne = builder.buildBatch(48000000, 1, 16777216); + const commandTwo = builder.buildBatch(48000000, 1, 16777216); it('splits the operations into multiple commands', function () { expect(commandOne.ops.documents).to.deep.equal([ @@ -149,8 +149,8 @@ describe('ClientBulkWriteCommandBuilder', function () { document: { _id: idTwo, name: 2 } }; const builder = new ClientBulkWriteCommandBuilder([modelOne, modelTwo], {}); - const commandOne = builder.buildBatch(1090, 100000); - const commandTwo = builder.buildBatch(1090, 100000); + const commandOne = builder.buildBatch(1090, 100000, 16777216); + const commandTwo = builder.buildBatch(1090, 100000, 16777216); it('splits the operations into multiple commands', function () { expect(commandOne.ops.documents).to.deep.equal([ @@ -176,7 +176,7 @@ describe('ClientBulkWriteCommandBuilder', function () { document: { _id: idTwo, name: 2 } }; const builder = new ClientBulkWriteCommandBuilder([modelOne, modelTwo], {}); - const command = builder.buildBatch(48000000, 100000); + const command = builder.buildBatch(48000000, 100000, 16777216); it('sets the bulkWrite command', function () { expect(command.bulkWrite).to.equal(1); @@ -210,7 +210,7 @@ describe('ClientBulkWriteCommandBuilder', function () { document: { _id: idTwo, name: 2 } }; const builder = new ClientBulkWriteCommandBuilder([modelOne, modelTwo], {}); - const command = builder.buildBatch(48000000, 100000); + const command = builder.buildBatch(48000000, 100000, 16777216); it('sets the bulkWrite command', function () { expect(command.bulkWrite).to.equal(1); @@ -253,7 +253,7 @@ describe('ClientBulkWriteCommandBuilder', function () { document: { _id: idThree, name: 2 } }; const builder = new ClientBulkWriteCommandBuilder([modelOne, modelTwo, modelThree], {}); - const command = builder.buildBatch(48000000, 100000); + const command = builder.buildBatch(48000000, 100000, 16777216); it('sets the bulkWrite command', function () { expect(command.bulkWrite).to.equal(1); diff --git a/test/unit/operations/client_bulk_write/results_merger.test.ts b/test/unit/operations/client_bulk_write/results_merger.test.ts index 342502eebb..c9a954e694 100644 --- a/test/unit/operations/client_bulk_write/results_merger.test.ts +++ b/test/unit/operations/client_bulk_write/results_merger.test.ts @@ -5,9 +5,40 @@ import { ClientBulkWriteCursorResponse, type ClientBulkWriteResult, ClientBulkWriteResultsMerger, + type Document, Long } from '../../../mongodb'; +class MockCursor { + operations: Document[]; + documents: Document[]; + response: ClientBulkWriteCursorResponse; + + constructor( + operations: Document[], + documents: Document[], + response: ClientBulkWriteCursorResponse + ) { + this.operations = operations; + this.documents = documents; + this.response = response; + } + + async next() { + return this.documents.shift(); + } + + async *[Symbol.asyncIterator](): AsyncGenerator { + while (true) { + const document = await this.next(); + if (document == null) { + return; + } + yield document; + } + } +} + describe('ClientBulkWriteResultsMerger', function () { describe('#constructor', function () { const resultsMerger = new ClientBulkWriteResultsMerger({}); @@ -76,8 +107,8 @@ describe('ClientBulkWriteResultsMerger', function () { const merger = new ClientBulkWriteResultsMerger({ verboseResults: true }); let result: ClientBulkWriteResult; - before(function () { - result = merger.merge(0, operations, response, documents); + before(async function () { + result = await merger.merge(new MockCursor(operations, documents, response)); }); it('merges the inserted count', function () { @@ -169,8 +200,8 @@ describe('ClientBulkWriteResultsMerger', function () { const merger = new ClientBulkWriteResultsMerger({ verboseResults: false }); let result: ClientBulkWriteResult; - before(function () { - result = merger.merge(0, operations, response, documents); + before(async function () { + result = await merger.merge(new MockCursor(operations, documents, response)); }); it('merges the inserted count', function () { @@ -207,106 +238,6 @@ describe('ClientBulkWriteResultsMerger', function () { }); }); }); - - context('when merging on a later batch', function () { - context('when requesting verbose results', function () { - // An example verbose response from the server without errors: - // { - // cursor: { - // id: Long('0'), - // firstBatch: [ { ok: 1, idx: 0, n: 1 }, { ok: 1, idx: 1, n: 1 } ], - // ns: 'admin.$cmd.bulkWrite' - // }, - // nErrors: 0, - // nInserted: 2, - // nMatched: 0, - // nModified: 0, - // nUpserted: 0, - // nDeleted: 0, - // ok: 1 - // } - context('when there are no errors', function () { - const operations = [ - { insert: 0, document: { _id: 1 } }, - { update: 0 }, - { update: 0 }, - { delete: 0 } - ]; - const documents = [ - { ok: 1, idx: 0, n: 1 }, // Insert - { ok: 1, idx: 1, n: 1, nModified: 1 }, // Update match - { ok: 1, idx: 2, n: 0, upserted: { _id: 1 } }, // Update no match with upsert - { ok: 1, idx: 3, n: 1 } // Delete - ]; - const serverResponse = { - cursor: { - id: new Long('0'), - firstBatch: documents, - ns: 'admin.$cmd.bulkWrite' - }, - nErrors: 0, - nInserted: 1, - nMatched: 1, - nModified: 1, - nUpserted: 1, - nDeleted: 1, - ok: 1 - }; - const response = new ClientBulkWriteCursorResponse(BSON.serialize(serverResponse), 0); - const merger = new ClientBulkWriteResultsMerger({ verboseResults: true }); - let result: ClientBulkWriteResult; - - before(function () { - result = merger.merge(20, operations, response, documents); - }); - - it('merges the inserted count', function () { - expect(result.insertedCount).to.equal(1); - }); - - it('sets insert results', function () { - expect(result.insertResults.get(20).insertedId).to.equal(1); - }); - - it('merges the upserted count', function () { - expect(result.upsertedCount).to.equal(1); - }); - - it('merges the matched count', function () { - expect(result.matchedCount).to.equal(1); - }); - - it('merges the modified count', function () { - expect(result.modifiedCount).to.equal(1); - }); - - it('sets the update results', function () { - expect(result.updateResults.get(21)).to.deep.equal({ - matchedCount: 1, - modifiedCount: 1, - didUpsert: false - }); - }); - - it('sets the upsert results', function () { - expect(result.updateResults.get(22)).to.deep.equal({ - matchedCount: 0, - modifiedCount: 0, - upsertedId: 1, - didUpsert: true - }); - }); - - it('merges the deleted count', function () { - expect(result.deletedCount).to.equal(1); - }); - - it('sets the delete results', function () { - expect(result.deleteResults.get(23).deletedCount).to.equal(1); - }); - }); - }); - }); }); }); });