Skip to content

Commit

Permalink
feat(NODE-6337): implement client bulk write batching
Browse files Browse the repository at this point in the history
  • Loading branch information
durran committed Sep 25, 2024
1 parent 3d3da40 commit 35e62b7
Show file tree
Hide file tree
Showing 14 changed files with 735 additions and 196 deletions.
76 changes: 59 additions & 17 deletions src/cmap/commands.ts
Original file line number Diff line number Diff line change
Expand Up @@ -429,10 +429,66 @@ export interface OpMsgOptions {

/** @internal */
export class DocumentSequence {
field: string;
documents: Document[];
serializedDocumentsLength: number;
private chunks: Uint8Array[];
private header?: Buffer;

constructor(documents: Document[]) {
this.documents = documents;
/**
* Create a new document sequence for the provided field.
* @param field - The field it will replace.
*/
constructor(field: string, documents?: Document[]) {
this.field = field;
this.documents = [];
this.chunks = [];
this.serializedDocumentsLength = 0;
this.init();
if (documents) {
for (const doc of documents) {
this.push(doc, BSON.serialize(doc));
}
}
}

/**
* Initialize the buffer chunks.
*/
private init() {
// Document sequences starts with type 1 at the first byte.
const buffer = Buffer.allocUnsafe(1 + 4 + this.field.length + 1);
buffer[0] = 1;
// Third part is the field name at offset 5 with trailing null byte.
encodeUTF8Into(buffer, `${this.field}\0`, 5);
this.chunks.push(buffer);
this.header = buffer;
}

/**
* Push a document to the document sequence. Will serialize the document
* as well and return the current serialized length of all documents.
* @param document - The document to add.
* @param buffer - The serialized document in raw BSON.
* @returns The new totoal document sequence length.
*/
push(document: Document, buffer: Uint8Array): number {
this.serializedDocumentsLength += buffer.length;
// Push the document.
this.documents.push(document);
// Push the document raw bson.
this.chunks.push(buffer);
// Write the new length.
this.header?.writeInt32LE(4 + this.field.length + 1 + this.serializedDocumentsLength, 1);
return this.serializedDocumentsLength + (this.header?.length ?? 0);
}

/**
* Get the fully serialized bytes for the document sequence section.
* @returns The section bytes.
*/
toBin(): Uint8Array {
return Buffer.concat(this.chunks);
}
}

Expand Down Expand Up @@ -543,21 +599,7 @@ export class OpMsgRequest {
const chunks = [];
for (const [key, value] of Object.entries(document)) {
if (value instanceof DocumentSequence) {
// Document sequences starts with type 1 at the first byte.
const buffer = Buffer.allocUnsafe(1 + 4 + key.length + 1);
buffer[0] = 1;
// Third part is the field name at offset 5 with trailing null byte.
encodeUTF8Into(buffer, `${key}\0`, 5);
chunks.push(buffer);
// Fourth part are the documents' bytes.
let docsLength = 0;
for (const doc of value.documents) {
const docBson = this.serializeBson(doc);
docsLength += docBson.length;
chunks.push(docBson);
}
// Second part of the sequence is the length at offset 1;
buffer.writeInt32LE(4 + key.length + 1 + docsLength, 1);
chunks.push(value.toBin());
// Why are we removing the field from the command? This is because it needs to be
// removed in the OP_MSG request first section, and DocumentSequence is not a
// BSON type and is specific to the MongoDB wire protocol so there's nothing
Expand Down
4 changes: 2 additions & 2 deletions src/cursor/client_bulk_write_cursor.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import type { Document } from '../bson';
import { type ClientBulkWriteCursorResponse } from '../cmap/wire_protocol/responses';
import { MongoBulkWriteCursorError } from '../error';
import { MongoClientBulkWriteCursorError } from '../error';
import type { MongoClient } from '../mongo_client';
import { ClientBulkWriteOperation } from '../operations/client_bulk_write/client_bulk_write';
import { type ClientBulkWriteOptions } from '../operations/client_bulk_write/common';
Expand Down Expand Up @@ -44,7 +44,7 @@ export class ClientBulkWriteCursor extends AbstractCursor {
*/
get response(): ClientBulkWriteCursorResponse {
if (this.cursorResponse) return this.cursorResponse;
throw new MongoBulkWriteCursorError(
throw new MongoClientBulkWriteCursorError(
'No client bulk write cursor response returned from the server.'
);
}
Expand Down
31 changes: 29 additions & 2 deletions src/error.ts
Original file line number Diff line number Diff line change
Expand Up @@ -622,7 +622,7 @@ export class MongoGCPError extends MongoOIDCError {
* @public
* @category Error
*/
export class MongoBulkWriteCursorError extends MongoRuntimeError {
export class MongoClientBulkWriteCursorError extends MongoRuntimeError {
/**
* **Do not use this constructor!**
*
Expand All @@ -639,7 +639,34 @@ export class MongoBulkWriteCursorError extends MongoRuntimeError {
}

override get name(): string {
return 'MongoBulkWriteCursorError';
return 'MongoClientBulkWriteCursorError';
}
}

/**
* An error indicating that an error occurred on the client when executing a client bulk write.
*
* @public
* @category Error
*/
export class MongoClientBulkWriteExecutionError extends MongoRuntimeError {
/**
* **Do not use this constructor!**
*
* Meant for internal use only.
*
* @remarks
* This class is only meant to be constructed within the driver. This constructor is
* not subject to semantic versioning compatibility guarantees and may change at any time.
*
* @public
**/
constructor(message: string) {
super(message);
}

override get name(): string {
return 'MongoClientBulkWriteExecutionError';
}
}

Expand Down
3 changes: 2 additions & 1 deletion src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,9 @@ export {
MongoAWSError,
MongoAzureError,
MongoBatchReExecutionError,
MongoBulkWriteCursorError,
MongoChangeStreamError,
MongoClientBulkWriteCursorError,
MongoClientBulkWriteExecutionError,
MongoCompatibilityError,
MongoCursorExhaustedError,
MongoCursorInUseError,
Expand Down
142 changes: 132 additions & 10 deletions src/operations/client_bulk_write/command_builder.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { type Document } from '../../bson';
import { BSON, type Document } from '../../bson';
import { DocumentSequence } from '../../cmap/commands';
import { type PkFactory } from '../../mongo_client';
import type { Filter, OptionalId, UpdateFilter, WithoutId } from '../../mongo_types';
Expand Down Expand Up @@ -28,6 +28,11 @@ export interface ClientBulkWriteCommand {
comment?: any;
}

/**
* The bytes overhead for the extra fields added post command generation.
*/
const MESSAGE_OVERHEAD_BYTES = 1000;

/** @internal */
export class ClientBulkWriteCommandBuilder {
models: AnyClientBulkWriteModel[];
Expand Down Expand Up @@ -62,32 +67,148 @@ export class ClientBulkWriteCommandBuilder {
/**
* Build the bulk write commands from the models.
*/
buildCommands(): ClientBulkWriteCommand[] {
buildCommands(maxMessageSizeBytes: number, maxWriteBatchSize: number): ClientBulkWriteCommand[] {
// Iterate the models to build the ops and nsInfo fields.
const operations = [];
// We need to do this in a loop which creates one command each up
// to the max bson size or max message size.
const commands: ClientBulkWriteCommand[] = [];
let currentCommandLength = 0;
let currentNamespaceIndex = 0;
let currentCommand: ClientBulkWriteCommand = this.baseCommand();
const namespaces = new Map<string, number>();

for (const model of this.models) {
const ns = model.namespace;
const index = namespaces.get(ns);

/**
* Convenience function for resetting everything when a new batch
* is started.
*/
const reset = () => {
commands.push(currentCommand);
namespaces.clear();
currentNamespaceIndex = 0;
currentCommand = this.baseCommand();
namespaces.set(ns, currentNamespaceIndex);
};

if (index != null) {
operations.push(buildOperation(model, index, this.pkFactory));
// Pushing to the ops document sequence returns the bytes length added.
const operation = buildOperation(model, index, this.pkFactory);
const operationBuffer = BSON.serialize(operation);

// Check if the operation buffer can fit in the current command. If it can,
// then add the operation to the document sequence and increment the
// current length as long as the ops don't exceed the maxWriteBatchSize.
if (
currentCommandLength + operationBuffer.length < maxMessageSizeBytes &&
currentCommand.ops.documents.length < maxWriteBatchSize
) {
// Pushing to the ops document sequence returns the bytes length added.
currentCommandLength =
MESSAGE_OVERHEAD_BYTES + this.addOperation(currentCommand, operation, operationBuffer);
} else {
// We need to batch. Push the current command to the commands
// array and create a new current command. We aslo need to clear the namespaces
// map for the new command.
reset();

const nsInfo = { ns: ns };
const nsInfoBuffer = BSON.serialize(nsInfo);
currentCommandLength =
MESSAGE_OVERHEAD_BYTES +
this.addOperationAndNsInfo(
currentCommand,
operation,
operationBuffer,
nsInfo,
nsInfoBuffer
);
}
} else {
namespaces.set(ns, currentNamespaceIndex);
operations.push(buildOperation(model, currentNamespaceIndex, this.pkFactory));
const nsInfo = { ns: ns };
const nsInfoBuffer = BSON.serialize(nsInfo);
const operation = buildOperation(model, currentNamespaceIndex, this.pkFactory);
const operationBuffer = BSON.serialize(operation);

// Check if the operation and nsInfo buffers can fit in the command. If they
// can, then add the operation and nsInfo to their respective document
// sequences and increment the current length as long as the ops don't exceed
// the maxWriteBatchSize.
if (
currentCommandLength + nsInfoBuffer.length + operationBuffer.length <
maxMessageSizeBytes &&
currentCommand.ops.documents.length < maxWriteBatchSize
) {
currentCommandLength =
MESSAGE_OVERHEAD_BYTES +
this.addOperationAndNsInfo(
currentCommand,
operation,
operationBuffer,
nsInfo,
nsInfoBuffer
);
} else {
// We need to batch. Push the current command to the commands
// array and create a new current command. Aslo clear the namespaces map.
reset();

currentCommandLength =
MESSAGE_OVERHEAD_BYTES +
this.addOperationAndNsInfo(
currentCommand,
operation,
operationBuffer,
nsInfo,
nsInfoBuffer
);
}
// We've added a new namespace, increment the namespace index.
currentNamespaceIndex++;
}
}

const nsInfo = Array.from(namespaces.keys(), ns => ({ ns }));
// After we've finisihed iterating all the models put the last current command
// only if there are operations in it.
if (currentCommand.ops.documents.length > 0) {
commands.push(currentCommand);
}

// The base command.
return commands;
}

private addOperation(
command: ClientBulkWriteCommand,
operation: Document,
operationBuffer: Uint8Array
): number {
// Pushing to the ops document sequence returns the bytes length added.
return command.ops.push(operation, operationBuffer);
}

private addOperationAndNsInfo(
command: ClientBulkWriteCommand,
operation: Document,
operationBuffer: Uint8Array,
nsInfo: Document,
nsInfoBuffer: Uint8Array
): number {
// Pushing to the nsInfo document sequence returns the bytes length added.
const nsInfoLength = command.nsInfo.push(nsInfo, nsInfoBuffer);
const opsLength = this.addOperation(command, operation, operationBuffer);
return nsInfoLength + opsLength;
}

private baseCommand(): ClientBulkWriteCommand {
const command: ClientBulkWriteCommand = {
bulkWrite: 1,
errorsOnly: this.errorsOnly,
ordered: this.options.ordered ?? true,
ops: new DocumentSequence(operations),
nsInfo: new DocumentSequence(nsInfo)
ops: new DocumentSequence('ops'),
nsInfo: new DocumentSequence('nsInfo')
};
// Add bypassDocumentValidation if it was present in the options.
if (this.options.bypassDocumentValidation != null) {
Expand All @@ -103,7 +224,8 @@ export class ClientBulkWriteCommandBuilder {
if (this.options.comment !== undefined) {
command.comment = this.options.comment;
}
return [command];

return command;
}
}

Expand Down
Loading

0 comments on commit 35e62b7

Please sign in to comment.