Skip to content

Commit

Permalink
feat(NODE-6337): implement client bulk write batching
Browse files Browse the repository at this point in the history
  • Loading branch information
durran committed Sep 24, 2024
1 parent c7fc4e2 commit 734113d
Show file tree
Hide file tree
Showing 6 changed files with 334 additions and 176 deletions.
78 changes: 61 additions & 17 deletions src/cmap/commands.ts
Original file line number Diff line number Diff line change
Expand Up @@ -429,10 +429,68 @@ export interface OpMsgOptions {

/** @internal */
export class DocumentSequence {
field: string;
documents: Document[];
serializedDocumentsLength: number;
private chunks: Uint8Array[];
private header?: Buffer;

constructor(documents: Document[]) {
this.documents = documents;
/**
* Create a new document sequence for the provided field.
* @param field - The field it will replace.
*/
constructor(field: string, documents?: Document[]) {
this.field = field;
this.documents = [];
this.chunks = [];
this.serializedDocumentsLength = 0;
this.init();
if (documents) {
for (const doc of documents) {
this.push(doc);
}
}
}

/**
* Initialize the buffer chunks.
*/
private init() {
// Document sequences starts with type 1 at the first byte.
const buffer = Buffer.allocUnsafe(1 + 4 + this.field.length + 1);
buffer[0] = 1;
// Third part is the field name at offset 5 with trailing null byte.
encodeUTF8Into(buffer, `${this.field}\0`, 5);
this.chunks.push(buffer);
this.header = buffer;
}

/**
* Push a document to the document sequence. Will serialize the document
* as well and return the current serialized length of all documents.
* @param document - The document to add.
* @returns The serialized documents length.
*/
push(document: Document): number {
// First serialize the document and recalculate the documents length.
const docBuffer = BSON.serialize(document);
this.serializedDocumentsLength += docBuffer.length;
// Push the document.
this.documents.push(document);
// Push the document raw bson.
this.chunks.push(docBuffer);
// Write the new length.
this.header?.writeInt32LE(4 + this.field.length + 1 + this.serializedDocumentsLength, 1);
return this.serializedDocumentsLength;
}

/**
* Get the fully serialized bytes for the document sequence section.
* @returns The section bytes.
*/
toBin(): Uint8Array {
// TODO: What to do if no documents?
return Buffer.concat(this.chunks);
}
}

Expand Down Expand Up @@ -543,21 +601,7 @@ export class OpMsgRequest {
const chunks = [];
for (const [key, value] of Object.entries(document)) {
if (value instanceof DocumentSequence) {
// Document sequences starts with type 1 at the first byte.
const buffer = Buffer.allocUnsafe(1 + 4 + key.length + 1);
buffer[0] = 1;
// Third part is the field name at offset 5 with trailing null byte.
encodeUTF8Into(buffer, `${key}\0`, 5);
chunks.push(buffer);
// Fourth part are the documents' bytes.
let docsLength = 0;
for (const doc of value.documents) {
const docBson = this.serializeBson(doc);
docsLength += docBson.length;
chunks.push(docBson);
}
// Second part of the sequence is the length at offset 1;
buffer.writeInt32LE(4 + key.length + 1 + docsLength, 1);
chunks.push(value.toBin());
// Why are we removing the field from the command? This is because it needs to be
// removed in the OP_MSG request first section, and DocumentSequence is not a
// BSON type and is specific to the MongoDB wire protocol so there's nothing
Expand Down
6 changes: 4 additions & 2 deletions src/operations/client_bulk_write/command_builder.ts
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,8 @@ export class ClientBulkWriteCommandBuilder {
*/
buildCommands(): ClientBulkWriteCommand[] {
// Iterate the models to build the ops and nsInfo fields.
// We need to do this in a loop that creates one command at a time, each
// up to the max BSON size or max message size.
const operations = [];
let currentNamespaceIndex = 0;
const namespaces = new Map<string, number>();
Expand All @@ -86,8 +88,8 @@ export class ClientBulkWriteCommandBuilder {
bulkWrite: 1,
errorsOnly: this.errorsOnly,
ordered: this.options.ordered ?? true,
ops: new DocumentSequence(operations),
nsInfo: new DocumentSequence(nsInfo)
ops: new DocumentSequence('ops', operations),
nsInfo: new DocumentSequence('nsInfo', nsInfo)
};
// Add bypassDocumentValidation if it was present in the options.
if (this.options.bypassDocumentValidation != null) {
Expand Down
6 changes: 5 additions & 1 deletion src/operations/client_bulk_write/executor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -75,10 +75,14 @@ async function executeAcknowledged(
): Promise<ClientBulkWriteResult> {
const resultsMerger = new ClientBulkWriteResultsMerger(options);
// For each command we will create and exhaust a cursor for the results.
let currentBatchOffset = 0;
for (const command of commands) {
const cursor = new ClientBulkWriteCursor(client, command, options);
const docs = await cursor.toArray();
resultsMerger.merge(command.ops.documents, cursor.response, docs);
const operations = command.ops.documents;
resultsMerger.merge(currentBatchOffset, operations, cursor.response, docs);
// Advance the batch offset so we can map back to the index in the original models.
currentBatchOffset += operations.length;
}
return resultsMerger.result;
}
Expand Down
12 changes: 9 additions & 3 deletions src/operations/client_bulk_write/results_merger.ts
Original file line number Diff line number Diff line change
Expand Up @@ -42,11 +42,13 @@ export class ClientBulkWriteResultsMerger {

/**
* Merge the results in the cursor to the existing result.
* @param currentBatchOffset - The offset index to the original models.
* @param response - The cursor response.
* @param documents - The documents in the cursor.
* @returns The current result.
*/
merge(
currentBatchOffset: number,
operations: Document[],
response: ClientBulkWriteCursorResponse,
documents: Document[]
Expand All @@ -67,7 +69,9 @@ export class ClientBulkWriteResultsMerger {
const operation = operations[document.idx];
// Handle insert results.
if ('insert' in operation) {
this.result.insertResults?.set(document.idx, { insertedId: operation.document._id });
this.result.insertResults?.set(document.idx + currentBatchOffset, {
insertedId: operation.document._id
});
}
// Handle update results.
if ('update' in operation) {
Expand All @@ -81,11 +85,13 @@ export class ClientBulkWriteResultsMerger {
if (document.upserted) {
result.upsertedId = document.upserted._id;
}
this.result.updateResults?.set(document.idx, result);
this.result.updateResults?.set(document.idx + currentBatchOffset, result);
}
// Handle delete results.
if ('delete' in operation) {
this.result.deleteResults?.set(document.idx, { deletedCount: document.n });
this.result.deleteResults?.set(document.idx + currentBatchOffset, {
deletedCount: document.n
});
}
}
}
Expand Down
6 changes: 3 additions & 3 deletions test/unit/cmap/commands.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ describe('commands', function () {
context('when there is one document sequence', function () {
const command = {
test: 1,
field: new DocumentSequence([{ test: 1 }])
field: new DocumentSequence('field', [{ test: 1 }])
};
const msg = new OpMsgRequest('admin', command, {});
const buffers = msg.toBin();
Expand Down Expand Up @@ -53,8 +53,8 @@ describe('commands', function () {
context('when there are multiple document sequences', function () {
const command = {
test: 1,
fieldOne: new DocumentSequence([{ test: 1 }]),
fieldTwo: new DocumentSequence([{ test: 1 }])
fieldOne: new DocumentSequence('fieldOne', [{ test: 1 }]),
fieldTwo: new DocumentSequence('fieldTwo', [{ test: 1 }])
};
const msg = new OpMsgRequest('admin', command, {});
const buffers = msg.toBin();
Expand Down
Loading

0 comments on commit 734113d

Please sign in to comment.