Skip to content

Commit

Permalink
fix _id generation in bulk writes && misc cleanups
Browse files Browse the repository at this point in the history
  • Loading branch information
baileympearson committed Mar 7, 2024
1 parent c093a45 commit eded423
Show file tree
Hide file tree
Showing 3 changed files with 47 additions and 38 deletions.
56 changes: 26 additions & 30 deletions src/bulk/common.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { promisify } from 'util';

import { type BSONSerializeOptions, type Document, ObjectId, resolveBSONOptions } from '../bson';
import { type BSONSerializeOptions, type Document, resolveBSONOptions } from '../bson';
import type { Collection } from '../collection';
import {
type AnyError,
Expand All @@ -12,6 +12,7 @@ import {
} from '../error';
import type { Filter, OneOrMore, OptionalId, UpdateFilter, WithoutId } from '../mongo_types';
import type { CollationOptions, CommandOperationOptions } from '../operations/command';
import { maybeAddIdToDocuments } from '../operations/common_functions';
import { DeleteOperation, type DeleteStatement, makeDeleteStatement } from '../operations/delete';
import { executeOperation } from '../operations/execute_operation';
import { InsertOperation } from '../operations/insert';
Expand Down Expand Up @@ -917,7 +918,7 @@ export abstract class BulkOperationBase {
* Create a new OrderedBulkOperation or UnorderedBulkOperation instance
* @internal
*/
constructor(collection: Collection, options: BulkWriteOptions, isOrdered: boolean) {
constructor(private collection: Collection, options: BulkWriteOptions, isOrdered: boolean) {
// determine whether bulkOperation is ordered or unordered
this.isOrdered = isOrdered;

Expand Down Expand Up @@ -1032,9 +1033,9 @@ export abstract class BulkOperationBase {
* ```
*/
insert(document: Document): BulkOperationBase {
if (document._id == null && !shouldForceServerObjectId(this)) {
document._id = new ObjectId();
}
maybeAddIdToDocuments(this.collection, document, {
forceServerObjectId: this.shouldForceServerObjectId()
});

return this.addToOperationsList(BatchType.INSERT, document);
}
Expand Down Expand Up @@ -1093,21 +1094,16 @@ export abstract class BulkOperationBase {
throw new MongoInvalidArgumentError('Operation must be an object with an operation key');
}
if ('insertOne' in op) {
const forceServerObjectId = shouldForceServerObjectId(this);
if (op.insertOne && op.insertOne.document == null) {
// NOTE: provided for legacy support, but this is a malformed operation
if (forceServerObjectId !== true && (op.insertOne as Document)._id == null) {
(op.insertOne as Document)._id = new ObjectId();
}

return this.addToOperationsList(BatchType.INSERT, op.insertOne);
}
const forceServerObjectId = this.shouldForceServerObjectId();
const document =
op.insertOne && op.insertOne.document == null
? // NOTE: provided for legacy support, but this is a malformed operation
(op.insertOne as Document)
: op.insertOne.document;

if (forceServerObjectId !== true && op.insertOne.document._id == null) {
op.insertOne.document._id = new ObjectId();
}
maybeAddIdToDocuments(this.collection, document, { forceServerObjectId });

return this.addToOperationsList(BatchType.INSERT, op.insertOne.document);
return this.addToOperationsList(BatchType.INSERT, document);
}

if ('replaceOne' in op || 'updateOne' in op || 'updateMany' in op) {
Expand Down Expand Up @@ -1268,6 +1264,18 @@ export abstract class BulkOperationBase {
batchType: BatchType,
document: Document | UpdateStatement | DeleteStatement
): this;

private shouldForceServerObjectId(): boolean {
if (typeof this.s.options.forceServerObjectId === 'boolean') {
return this.s.options.forceServerObjectId;
}

if (typeof this.s.collection.s.db.options?.forceServerObjectId === 'boolean') {
return this.s.collection.s.db.options?.forceServerObjectId;
}

return false;
}
}

Object.defineProperty(BulkOperationBase.prototype, 'length', {
Expand All @@ -1277,18 +1285,6 @@ Object.defineProperty(BulkOperationBase.prototype, 'length', {
}
});

function shouldForceServerObjectId(bulkOperation: BulkOperationBase): boolean {
if (typeof bulkOperation.s.options.forceServerObjectId === 'boolean') {
return bulkOperation.s.options.forceServerObjectId;
}

if (typeof bulkOperation.s.collection.s.db.options?.forceServerObjectId === 'boolean') {
return bulkOperation.s.collection.s.db.options?.forceServerObjectId;
}

return false;
}

function isInsertBatch(batch: Batch): boolean {
return batch.batchType === BatchType.INSERT;
}
Expand Down
21 changes: 16 additions & 5 deletions src/operations/common_functions.ts
Original file line number Diff line number Diff line change
Expand Up @@ -43,26 +43,37 @@ export async function indexInformation(
return info;
}

export function prepareDocs(
export function maybeAddIdToDocuments(
coll: Collection,
docs: Document[],
options: { forceServerObjectId?: boolean }
): Document[] {
): Document[];
export function maybeAddIdToDocuments(
coll: Collection,
docs: Document,
options: { forceServerObjectId?: boolean }
): Document;
export function maybeAddIdToDocuments(
coll: Collection,
docOrDocs: Document[] | Document,
options: { forceServerObjectId?: boolean }
): Document[] | Document {
const forceServerObjectId =
typeof options.forceServerObjectId === 'boolean'
? options.forceServerObjectId
: coll.s.db.options?.forceServerObjectId;

// no need to modify the docs if server sets the ObjectId
if (forceServerObjectId === true) {
return docs;
return docOrDocs;
}

return docs.map(doc => {
const transform = (doc: Document): Document => {
if (doc._id == null) {
doc._id = coll.s.pkFactory.createPk();
}

return doc;
});
};
return Array.isArray(docOrDocs) ? docOrDocs.map(transform) : transform(docOrDocs);
}
8 changes: 5 additions & 3 deletions src/operations/insert.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import type { MongoDBNamespace } from '../utils';
import { WriteConcern } from '../write_concern';
import { BulkWriteOperation } from './bulk_write';
import { CommandOperation, type CommandOperationOptions } from './command';
import { prepareDocs } from './common_functions';
import { maybeAddIdToDocuments } from './common_functions';
import { AbstractOperation, Aspect, defineAspects } from './operation';

/** @internal */
Expand Down Expand Up @@ -69,7 +69,7 @@ export interface InsertOneResult<TSchema = Document> {

export class InsertOneOperation extends InsertOperation {
constructor(collection: Collection, doc: Document, options: InsertOneOptions) {
super(collection.s.namespace, prepareDocs(collection, [doc], options), options);
super(collection.s.namespace, maybeAddIdToDocuments(collection, [doc], options), options);
}

override async execute(
Expand Down Expand Up @@ -131,7 +131,9 @@ export class InsertManyOperation extends AbstractOperation<InsertManyResult> {
const writeConcern = WriteConcern.fromOptions(options);
const bulkWriteOperation = new BulkWriteOperation(
coll,
prepareDocs(coll, this.docs, options).map(document => ({ insertOne: { document } })),
maybeAddIdToDocuments(coll, this.docs, options).map(document => ({
insertOne: { document }
})),
options
);

Expand Down

0 comments on commit eded423

Please sign in to comment.