feat(NODE-6337): implement client bulk write batching (#4248)
durran authored Oct 1, 2024
1 parent 85f7dcf commit d56e235
Showing 16 changed files with 1,047 additions and 287 deletions.
70 changes: 53 additions & 17 deletions src/cmap/commands.ts
@@ -429,10 +429,60 @@ export interface OpMsgOptions {

/** @internal */
export class DocumentSequence {
field: string;
documents: Document[];
serializedDocumentsLength: number;
private chunks: Uint8Array[];
private header: Buffer;

constructor(documents: Document[]) {
this.documents = documents;
/**
* Create a new document sequence for the provided field.
* @param field - The field it will replace.
*/
constructor(field: string, documents?: Document[]) {
this.field = field;
this.documents = [];
this.chunks = [];
this.serializedDocumentsLength = 0;
// Document sequences start with type 1 in the first byte.
// Field strings must always be UTF-8.
const buffer = Buffer.allocUnsafe(1 + 4 + this.field.length + 1);
buffer[0] = 1;
// The third part is the field name at offset 5 with a trailing null byte.
encodeUTF8Into(buffer, `${this.field}\0`, 5);
this.chunks.push(buffer);
this.header = buffer;
if (documents) {
for (const doc of documents) {
this.push(doc, BSON.serialize(doc));
}
}
}

/**
* Push a document and its raw BSON bytes onto the document sequence and return
* the new total length of the sequence (header plus all serialized documents).
* @param document - The document to add.
* @param buffer - The serialized document in raw BSON.
* @returns The new total document sequence length.
*/
push(document: Document, buffer: Uint8Array): number {
this.serializedDocumentsLength += buffer.length;
// Push the document.
this.documents.push(document);
// Push the document raw bson.
this.chunks.push(buffer);
// Write the new length.
this.header?.writeInt32LE(4 + this.field.length + 1 + this.serializedDocumentsLength, 1);
return this.serializedDocumentsLength + this.header.length;
}

/**
* Get the fully serialized bytes for the document sequence section.
* @returns The section bytes.
*/
toBin(): Uint8Array {
return Buffer.concat(this.chunks);
}
}
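A rough usage sketch of the new DocumentSequence API shown above, written as if from inside the driver's src/cmap directory since the class is internal; the `ops` field name and the operation shape are illustrative assumptions, not part of this commit:

```ts
import { serialize } from 'bson';
// DocumentSequence is internal to the driver; this import assumes we are inside src/cmap.
import { DocumentSequence } from './commands';

// Build an OP_MSG type-1 section for the `ops` field of a bulkWrite command.
const ops = new DocumentSequence('ops');

// push() takes the document plus its already-serialized BSON and returns the running
// size of the whole section (header + documents), which batching code can compare
// against the server's maxMessageSizeBytes.
const op = { insert: 0, document: { _id: 1 } };
const sectionSize = ops.push(op, serialize(op));

// toBin() concatenates the header chunk and the raw document chunks into the bytes
// that OpMsgRequest appends to the wire message.
const sectionBytes = ops.toBin();
console.log(sectionSize, sectionBytes.byteLength);
```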

@@ -543,21 +593,7 @@ export class OpMsgRequest {
const chunks = [];
for (const [key, value] of Object.entries(document)) {
if (value instanceof DocumentSequence) {
// Document sequences starts with type 1 at the first byte.
const buffer = Buffer.allocUnsafe(1 + 4 + key.length + 1);
buffer[0] = 1;
// Third part is the field name at offset 5 with trailing null byte.
encodeUTF8Into(buffer, `${key}\0`, 5);
chunks.push(buffer);
// Fourth part are the documents' bytes.
let docsLength = 0;
for (const doc of value.documents) {
const docBson = this.serializeBson(doc);
docsLength += docBson.length;
chunks.push(docBson);
}
// Second part of the sequence is the length at offset 1;
buffer.writeInt32LE(4 + key.length + 1 + docsLength, 1);
chunks.push(value.toBin());
// Why are we removing the field from the command? This is because it needs to be
// removed in the OP_MSG request first section, and DocumentSequence is not a
// BSON type and is specific to the MongoDB wire protocol so there's nothing
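With the serialization moved into DocumentSequence, OpMsgRequest now simply appends `value.toBin()`. For reference, a rough sketch (not driver code) of reading that section header back with Node's Buffer API, assuming the layout described in the comments above:

```ts
// Decode the header that DocumentSequence writes: 1-byte kind, int32LE size,
// then a null-terminated UTF-8 field name starting at offset 5.
function readDocumentSequenceHeader(section: Buffer): { kind: number; size: number; field: string } {
  const kind = section[0]; // always 1 for a document sequence section
  const size = section.readInt32LE(1); // 4 + field.length + 1 + total BSON bytes
  const nullByte = section.indexOf(0, 5); // end of the null-terminated field name
  const field = section.toString('utf8', 5, nullByte);
  return { kind, size, field };
}
```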
29 changes: 21 additions & 8 deletions src/cursor/client_bulk_write_cursor.ts
@@ -1,8 +1,10 @@
import type { Document } from '../bson';
import { type Document } from 'bson';

import { type ClientBulkWriteCursorResponse } from '../cmap/wire_protocol/responses';
import { MongoBulkWriteCursorError } from '../error';
import { MongoClientBulkWriteCursorError } from '../error';
import type { MongoClient } from '../mongo_client';
import { ClientBulkWriteOperation } from '../operations/client_bulk_write/client_bulk_write';
import { type ClientBulkWriteCommandBuilder } from '../operations/client_bulk_write/command_builder';
import { type ClientBulkWriteOptions } from '../operations/client_bulk_write/common';
import { executeOperation } from '../operations/execute_operation';
import type { ClientSession } from '../sessions';
@@ -24,17 +26,21 @@ export interface ClientBulkWriteCursorOptions
* @internal
*/
export class ClientBulkWriteCursor extends AbstractCursor {
public readonly command: Document;
commandBuilder: ClientBulkWriteCommandBuilder;
/** @internal */
private cursorResponse?: ClientBulkWriteCursorResponse;
/** @internal */
private clientBulkWriteOptions: ClientBulkWriteOptions;

/** @internal */
constructor(client: MongoClient, command: Document, options: ClientBulkWriteOptions = {}) {
constructor(
client: MongoClient,
commandBuilder: ClientBulkWriteCommandBuilder,
options: ClientBulkWriteOptions = {}
) {
super(client, new MongoDBNamespace('admin', '$cmd'), options);

this.command = command;
this.commandBuilder = commandBuilder;
this.clientBulkWriteOptions = options;
}

@@ -44,22 +50,29 @@ export class ClientBulkWriteCursor extends AbstractCursor {
*/
get response(): ClientBulkWriteCursorResponse {
if (this.cursorResponse) return this.cursorResponse;
throw new MongoBulkWriteCursorError(
throw new MongoClientBulkWriteCursorError(
'No client bulk write cursor response returned from the server.'
);
}

/**
* Get the last set of operations the cursor executed.
*/
get operations(): Document[] {
return this.commandBuilder.lastOperations;
}

clone(): ClientBulkWriteCursor {
const clonedOptions = mergeOptions({}, this.clientBulkWriteOptions);
delete clonedOptions.session;
return new ClientBulkWriteCursor(this.client, this.command, {
return new ClientBulkWriteCursor(this.client, this.commandBuilder, {
...clonedOptions
});
}

/** @internal */
async _initialize(session: ClientSession): Promise<InitialCursorResponse> {
const clientBulkWriteOperation = new ClientBulkWriteOperation(this.command, {
const clientBulkWriteOperation = new ClientBulkWriteOperation(this.commandBuilder, {
...this.clientBulkWriteOptions,
...this.cursorOptions,
session
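The shape of this refactor, passing a command builder instead of a prebuilt command and remembering which operations the last batch covered, can be summarized in a small standalone sketch; the types below are illustrative stand-ins, not driver code:

```ts
// Illustrative only: mirrors the pattern above without depending on driver internals.
interface BatchCommandBuilder {
  lastOperations: Record<string, unknown>[];
  buildBatch(maxMessageSizeBytes: number, maxWriteBatchSize: number): Record<string, unknown>;
}

class BatchingCursor {
  constructor(private readonly builder: BatchCommandBuilder) {}

  /** Operations covered by the most recently built batch, used to map results back. */
  get operations(): Record<string, unknown>[] {
    return this.builder.lastOperations;
  }

  /** Build the next batch's command lazily, once the server's limits are known. */
  initialize(maxMessageSizeBytes: number, maxWriteBatchSize: number): Record<string, unknown> {
    return this.builder.buildBatch(maxMessageSizeBytes, maxWriteBatchSize);
  }
}
```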
31 changes: 29 additions & 2 deletions src/error.ts
@@ -622,7 +622,7 @@ export class MongoGCPError extends MongoOIDCError {
* @public
* @category Error
*/
export class MongoBulkWriteCursorError extends MongoRuntimeError {
export class MongoClientBulkWriteCursorError extends MongoRuntimeError {
/**
* **Do not use this constructor!**
*
@@ -639,7 +639,34 @@ export class MongoBulkWriteCursorError extends MongoRuntimeError {
}

override get name(): string {
return 'MongoBulkWriteCursorError';
return 'MongoClientBulkWriteCursorError';
}
}

/**
* An error indicating that an error occurred on the client when executing a client bulk write.
*
* @public
* @category Error
*/
export class MongoClientBulkWriteExecutionError extends MongoRuntimeError {
/**
* **Do not use this constructor!**
*
* Meant for internal use only.
*
* @remarks
* This class is only meant to be constructed within the driver. This constructor is
* not subject to semantic versioning compatibility guarantees and may change at any time.
*
* @public
**/
constructor(message: string) {
super(message);
}

override get name(): string {
return 'MongoClientBulkWriteExecutionError';
}
}

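Both new error classes are exported publicly (see the src/index.ts change below), so application code can distinguish them. A hedged example; the `client.bulkWrite` call shape is an assumption and not part of this commit:

```ts
import {
  MongoClient,
  MongoClientBulkWriteCursorError,
  MongoClientBulkWriteExecutionError
} from 'mongodb';

async function runBulkWrite(client: MongoClient): Promise<void> {
  try {
    // Hypothetical call shape; the public bulkWrite API is added elsewhere.
    await client.bulkWrite([
      { name: 'insertOne', namespace: 'db.coll', document: { _id: 1 } }
    ]);
  } catch (error) {
    if (error instanceof MongoClientBulkWriteExecutionError) {
      // The driver could not execute the bulk write (e.g. missing hello limits or session).
      console.error('client bulk write could not be executed:', error.message);
    } else if (error instanceof MongoClientBulkWriteCursorError) {
      // The server did not return a cursor response for the bulkWrite command.
      console.error('no cursor response:', error.message);
    } else {
      throw error;
    }
  }
}
```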
3 changes: 2 additions & 1 deletion src/index.ts
@@ -44,8 +44,9 @@ export {
MongoAWSError,
MongoAzureError,
MongoBatchReExecutionError,
MongoBulkWriteCursorError,
MongoChangeStreamError,
MongoClientBulkWriteCursorError,
MongoClientBulkWriteExecutionError,
MongoCompatibilityError,
MongoCursorExhaustedError,
MongoCursorInUseError,
50 changes: 43 additions & 7 deletions src/operations/client_bulk_write/client_bulk_write.ts
@@ -1,28 +1,28 @@
import { type Document } from 'bson';

import { MongoClientBulkWriteExecutionError, ServerType } from '../../beta';
import { ClientBulkWriteCursorResponse } from '../../cmap/wire_protocol/responses';
import type { Server } from '../../sdam/server';
import type { ClientSession } from '../../sessions';
import { MongoDBNamespace } from '../../utils';
import { CommandOperation } from '../command';
import { Aspect, defineAspects } from '../operation';
import { type ClientBulkWriteCommandBuilder } from './command_builder';
import { type ClientBulkWriteOptions } from './common';

/**
* Executes a single client bulk write operation within a potential batch.
* @internal
*/
export class ClientBulkWriteOperation extends CommandOperation<ClientBulkWriteCursorResponse> {
command: Document;
commandBuilder: ClientBulkWriteCommandBuilder;
override options: ClientBulkWriteOptions;

override get commandName() {
return 'bulkWrite' as const;
}

constructor(command: Document, options: ClientBulkWriteOptions) {
constructor(commandBuilder: ClientBulkWriteCommandBuilder, options: ClientBulkWriteOptions) {
super(undefined, options);
this.command = command;
this.commandBuilder = commandBuilder;
this.options = options;
this.ns = new MongoDBNamespace('admin', '$cmd');
}
@@ -37,9 +37,45 @@ export class ClientBulkWriteOperation extends CommandOperation<ClientBulkWriteCu
server: Server,
session: ClientSession | undefined
): Promise<ClientBulkWriteCursorResponse> {
return await super.executeCommand(server, session, this.command, ClientBulkWriteCursorResponse);
let command;

if (server.description.type === ServerType.LoadBalancer) {
if (session) {
// Check out a connection to build the command.
const connection = await server.pool.checkOut();
// Pin the connection to the session so it gets used to execute the command and we
// do not perform a double check-in/check-out.
session.pin(connection);
command = this.commandBuilder.buildBatch(
connection.hello?.maxMessageSizeBytes,
connection.hello?.maxWriteBatchSize
);
} else {
throw new MongoClientBulkWriteExecutionError(
'Session provided to the client bulk write operation must be present.'
);
}
} else {
// At this point we have a server and the auto connect code has already
// run in executeOperation, so the server description will be populated.
// We can use that to build the command.
if (!server.description.maxWriteBatchSize || !server.description.maxMessageSizeBytes) {
throw new MongoClientBulkWriteExecutionError(
'In order to execute a client bulk write, both maxWriteBatchSize and maxMessageSizeBytes must be provided by the server's hello response.'
);
}
command = this.commandBuilder.buildBatch(
server.description.maxMessageSizeBytes,
server.description.maxWriteBatchSize
);
}
return await super.executeCommand(server, session, command, ClientBulkWriteCursorResponse);
}
}

// Skipping the collation as it goes on the individual ops.
defineAspects(ClientBulkWriteOperation, [Aspect.WRITE_OPERATION, Aspect.SKIP_COLLATION]);
defineAspects(ClientBulkWriteOperation, [
Aspect.WRITE_OPERATION,
Aspect.SKIP_COLLATION,
Aspect.CURSOR_CREATING
]);
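This excerpt does not show how ClientBulkWriteCommandBuilder.buildBatch splits operations. Below is a simplified, hypothetical sketch of the rule the operation above relies on, stopping before either server limit would be exceeded; the names and size accounting are illustrative only:

```ts
import { serialize, type Document } from 'bson';

// Hypothetical helper, not the driver's implementation: take operations for one batch,
// respecting the server's maxWriteBatchSize (count) and maxMessageSizeBytes (bytes).
function takeNextBatch(
  operations: Document[],
  maxMessageSizeBytes: number,
  maxWriteBatchSize: number
): Document[] {
  const batch: Document[] = [];
  let batchBytes = 0;
  for (const op of operations) {
    const opBytes = serialize(op).byteLength;
    // Stop before the batch would exceed either limit; the real builder also has to
    // account for command overhead and the accompanying document sequences.
    if (batch.length + 1 > maxWriteBatchSize || batchBytes + opBytes > maxMessageSizeBytes) {
      break;
    }
    batch.push(op);
    batchBytes += opBytes;
  }
  return batch;
}
```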