Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(NODE-6350): add typescript support to client bulkWrite API #4257

Merged
merged 11 commits into from
Oct 20, 2024
1 change: 1 addition & 0 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -479,6 +479,7 @@ export type {
export type {
AnyClientBulkWriteModel,
ClientBulkWriteError,
ClientBulkWriteModel,
ClientBulkWriteOptions,
ClientBulkWriteResult,
ClientDeleteManyModel,
Expand Down
16 changes: 10 additions & 6 deletions src/mongo_client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ import {
} from './mongo_logger';
import { TypedEventEmitter } from './mongo_types';
import {
type AnyClientBulkWriteModel,
type ClientBulkWriteModel,
type ClientBulkWriteOptions,
type ClientBulkWriteResult
} from './operations/client_bulk_write/common';
Expand Down Expand Up @@ -331,7 +331,6 @@ export type MongoClientEvents = Pick<TopologyEvents, (typeof MONGO_CLIENT_EVENTS
};

/** @internal */

const kOptions = Symbol('options');

/**
Expand Down Expand Up @@ -489,16 +488,21 @@ export class MongoClient extends TypedEventEmitter<MongoClientEvents> implements
* @param options - The client bulk write options.
* @returns A ClientBulkWriteResult for acknowledged writes and ok: 1 for unacknowledged writes.
*/
async bulkWrite(
models: AnyClientBulkWriteModel[],
async bulkWrite<SchemaMap extends Record<string, Document> = Record<string, Document>>(
models: ReadonlyArray<ClientBulkWriteModel<SchemaMap>>,
options?: ClientBulkWriteOptions
): Promise<ClientBulkWriteResult | { ok: 1 }> {
): Promise<ClientBulkWriteResult> {
if (this.autoEncrypter) {
throw new MongoInvalidArgumentError(
'MongoClient bulkWrite does not currently support automatic encryption.'
);
}
return await new ClientBulkWriteExecutor(this, models, options).execute();
// We do not need schema type information past this point ("as any" is fine)
return await new ClientBulkWriteExecutor(
this,
models as any,
baileympearson marked this conversation as resolved.
Show resolved Hide resolved
resolveOptions(this, options)
).execute();
}

/**
Expand Down
28 changes: 17 additions & 11 deletions src/operations/client_bulk_write/command_builder.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ const MESSAGE_OVERHEAD_BYTES = 1000;

/** @internal */
export class ClientBulkWriteCommandBuilder {
models: AnyClientBulkWriteModel[];
models: ReadonlyArray<AnyClientBulkWriteModel<Document>>;
options: ClientBulkWriteOptions;
pkFactory: PkFactory;
/** The current index in the models array that is being processed. */
Expand All @@ -53,7 +53,7 @@ export class ClientBulkWriteCommandBuilder {
* @param models - The client write models.
*/
constructor(
models: AnyClientBulkWriteModel[],
models: ReadonlyArray<AnyClientBulkWriteModel<Document>>,
options: ClientBulkWriteOptions,
pkFactory?: PkFactory
) {
Expand Down Expand Up @@ -248,7 +248,7 @@ interface ClientInsertOperation {
* @returns the operation.
*/
export const buildInsertOneOperation = (
model: ClientInsertOneModel,
model: ClientInsertOneModel<Document>,
index: number,
pkFactory: PkFactory
): ClientInsertOperation => {
Expand All @@ -275,7 +275,10 @@ export interface ClientDeleteOperation {
* @param index - The namespace index.
* @returns the operation.
*/
export const buildDeleteOneOperation = (model: ClientDeleteOneModel, index: number): Document => {
export const buildDeleteOneOperation = (
model: ClientDeleteOneModel<Document>,
index: number
): Document => {
return createDeleteOperation(model, index, false);
};

Expand All @@ -285,15 +288,18 @@ export const buildDeleteOneOperation = (model: ClientDeleteOneModel, index: numb
* @param index - The namespace index.
* @returns the operation.
*/
export const buildDeleteManyOperation = (model: ClientDeleteManyModel, index: number): Document => {
export const buildDeleteManyOperation = (
model: ClientDeleteManyModel<Document>,
index: number
): Document => {
return createDeleteOperation(model, index, true);
};

/**
* Creates a delete operation based on the parameters.
*/
function createDeleteOperation(
model: ClientDeleteOneModel | ClientDeleteManyModel,
model: ClientDeleteOneModel<Document> | ClientDeleteManyModel<Document>,
index: number,
multi: boolean
): ClientDeleteOperation {
Expand Down Expand Up @@ -330,7 +336,7 @@ export interface ClientUpdateOperation {
* @returns the operation.
*/
export const buildUpdateOneOperation = (
model: ClientUpdateOneModel,
model: ClientUpdateOneModel<Document>,
index: number
): ClientUpdateOperation => {
return createUpdateOperation(model, index, false);
Expand All @@ -343,7 +349,7 @@ export const buildUpdateOneOperation = (
* @returns the operation.
*/
export const buildUpdateManyOperation = (
model: ClientUpdateManyModel,
model: ClientUpdateManyModel<Document>,
index: number
): ClientUpdateOperation => {
return createUpdateOperation(model, index, true);
Expand All @@ -365,7 +371,7 @@ function validateUpdate(update: Document) {
* Creates a delete operation based on the parameters.
*/
function createUpdateOperation(
model: ClientUpdateOneModel | ClientUpdateManyModel,
model: ClientUpdateOneModel<Document> | ClientUpdateManyModel<Document>,
index: number,
multi: boolean
): ClientUpdateOperation {
Expand Down Expand Up @@ -413,7 +419,7 @@ export interface ClientReplaceOneOperation {
* @returns the operation.
*/
export const buildReplaceOneOperation = (
model: ClientReplaceOneModel,
model: ClientReplaceOneModel<Document>,
index: number
): ClientReplaceOneOperation => {
if (hasAtomicOperators(model.replacement)) {
Expand Down Expand Up @@ -442,7 +448,7 @@ export const buildReplaceOneOperation = (

/** @internal */
export function buildOperation(
model: AnyClientBulkWriteModel,
model: AnyClientBulkWriteModel<Document>,
index: number,
pkFactory: PkFactory
): Document {
Expand Down
77 changes: 55 additions & 22 deletions src/operations/client_bulk_write/common.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,50 +32,50 @@ export interface ClientWriteModel {
}

/** @public */
export interface ClientInsertOneModel extends ClientWriteModel {
export interface ClientInsertOneModel<TSchema> extends ClientWriteModel {
name: 'insertOne';
/** The document to insert. */
document: OptionalId<Document>;
document: OptionalId<TSchema>;
}

/** @public */
export interface ClientDeleteOneModel extends ClientWriteModel {
export interface ClientDeleteOneModel<TSchema> extends ClientWriteModel {
name: 'deleteOne';
/**
* The filter used to determine if a document should be deleted.
* For a deleteOne operation, the first match is removed.
*/
filter: Filter<Document>;
filter: Filter<TSchema>;
/** Specifies a collation. */
collation?: CollationOptions;
/** The index to use. If specified, then the query system will only consider plans using the hinted index. */
hint?: Hint;
}

/** @public */
export interface ClientDeleteManyModel extends ClientWriteModel {
export interface ClientDeleteManyModel<TSchema> extends ClientWriteModel {
name: 'deleteMany';
/**
* The filter used to determine if a document should be deleted.
* For a deleteMany operation, all matches are removed.
*/
filter: Filter<Document>;
filter: Filter<TSchema>;
/** Specifies a collation. */
collation?: CollationOptions;
/** The index to use. If specified, then the query system will only consider plans using the hinted index. */
hint?: Hint;
}

/** @public */
export interface ClientReplaceOneModel extends ClientWriteModel {
export interface ClientReplaceOneModel<TSchema> extends ClientWriteModel {
name: 'replaceOne';
/**
* The filter used to determine if a document should be replaced.
* For a replaceOne operation, the first match is replaced.
*/
filter: Filter<Document>;
filter: Filter<TSchema>;
/** The document with which to replace the matched document. */
replacement: WithoutId<Document>;
replacement: WithoutId<TSchema>;
/** Specifies a collation. */
collation?: CollationOptions;
/** The index to use. If specified, then the query system will only consider plans using the hinted index. */
Expand All @@ -85,19 +85,19 @@ export interface ClientReplaceOneModel extends ClientWriteModel {
}

/** @public */
export interface ClientUpdateOneModel extends ClientWriteModel {
export interface ClientUpdateOneModel<TSchema> extends ClientWriteModel {
name: 'updateOne';
/**
* The filter used to determine if a document should be updated.
* For an updateOne operation, the first match is updated.
*/
filter: Filter<Document>;
filter: Filter<TSchema>;
/**
* The modifications to apply. The value can be either:
* UpdateFilter<Document> - A document that contains update operator expressions,
* Document[] - an aggregation pipeline.
*/
update: UpdateFilter<Document> | Document[];
update: UpdateFilter<TSchema> | Document[];
/** A set of filters specifying to which array elements an update should apply. */
arrayFilters?: Document[];
/** Specifies a collation. */
Expand All @@ -109,19 +109,19 @@ export interface ClientUpdateOneModel extends ClientWriteModel {
}

/** @public */
export interface ClientUpdateManyModel extends ClientWriteModel {
export interface ClientUpdateManyModel<TSchema> extends ClientWriteModel {
name: 'updateMany';
/**
* The filter used to determine if a document should be updated.
* For an updateMany operation, all matches are updated.
*/
filter: Filter<Document>;
filter: Filter<TSchema>;
/**
* The modifications to apply. The value can be either:
* UpdateFilter<Document> - A document that contains update operator expressions,
* Document[] - an aggregation pipeline.
*/
update: UpdateFilter<Document> | Document[];
update: UpdateFilter<TSchema> | Document[];
/** A set of filters specifying to which array elements an update should apply. */
arrayFilters?: Document[];
/** Specifies a collation. */
Expand All @@ -137,16 +137,49 @@ export interface ClientUpdateManyModel extends ClientWriteModel {
* to MongoClient#bulkWrite.
* @public
*/
export type AnyClientBulkWriteModel =
| ClientInsertOneModel
| ClientReplaceOneModel
| ClientUpdateOneModel
| ClientUpdateManyModel
| ClientDeleteOneModel
| ClientDeleteManyModel;
export type AnyClientBulkWriteModel<TSchema extends Document> =
| ClientInsertOneModel<TSchema>
| ClientReplaceOneModel<TSchema>
| ClientUpdateOneModel<TSchema>
| ClientUpdateManyModel<TSchema>
| ClientDeleteOneModel<TSchema>
| ClientDeleteManyModel<TSchema>;

/**
* Take a Typescript type that maps namespaces to schema types.
baileympearson marked this conversation as resolved.
Show resolved Hide resolved
* @public
*
* @example
* ```ts
* type MongoDBSchemas = {
* 'db.books': Book;
* 'db.authors': Author;
* }
*
* const model: ClientBulkWriteModel<MongoDBSchemas> = {
* namespace: 'db.books'
* name: 'insertOne',
* document: { title: 'Practical MongoDB Aggregations', authorName: 3 } // error `authorName` cannot be number
* };
* ```
*
* The type of the `namespace` field narrows other parts of the BulkWriteModel to use the correct schema for type assertions.
*
*/
export type ClientBulkWriteModel<
SchemaMap extends Record<string, Document> = Record<string, Document>
> = {
[Namespace in keyof SchemaMap]: AnyClientBulkWriteModel<SchemaMap[Namespace]> & {
namespace: Namespace;
};
}[keyof SchemaMap];

/** @public */
export interface ClientBulkWriteResult {
/**
* Whether the bulk write was acknowledged.
*/
acknowledged: boolean;
/**
* The total number of documents inserted across all insert operations.
*/
Expand Down
14 changes: 8 additions & 6 deletions src/operations/client_bulk_write/executor.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import { type Document } from 'bson';

import { ClientBulkWriteCursor } from '../../cursor/client_bulk_write_cursor';
import {
MongoClientBulkWriteError,
Expand All @@ -22,9 +24,9 @@ import { ClientBulkWriteResultsMerger } from './results_merger';
* @internal
*/
export class ClientBulkWriteExecutor {
client: MongoClient;
options: ClientBulkWriteOptions;
operations: AnyClientBulkWriteModel[];
private readonly client: MongoClient;
private readonly options: ClientBulkWriteOptions;
private readonly operations: ReadonlyArray<AnyClientBulkWriteModel<Document>>;

/**
* Instantiate the executor.
Expand All @@ -34,7 +36,7 @@ export class ClientBulkWriteExecutor {
*/
constructor(
client: MongoClient,
operations: AnyClientBulkWriteModel[],
operations: ReadonlyArray<AnyClientBulkWriteModel<Document>>,
options?: ClientBulkWriteOptions
) {
if (operations.length === 0) {
Expand Down Expand Up @@ -75,7 +77,7 @@ export class ClientBulkWriteExecutor {
* for each, then merge the results into one.
* @returns The result.
*/
async execute(): Promise<ClientBulkWriteResult | { ok: 1 }> {
async execute(): Promise<ClientBulkWriteResult> {
// The command builder will take the user provided models and potential split the batch
// into multiple commands due to size.
const pkFactory = this.client.s.options.pkFactory;
Expand All @@ -90,7 +92,7 @@ export class ClientBulkWriteExecutor {
const operation = new ClientBulkWriteOperation(commandBuilder, this.options);
await executeOperation(this.client, operation);
}
return { ok: 1 };
return ClientBulkWriteResultsMerger.unacknowledged();
} else {
const resultsMerger = new ClientBulkWriteResultsMerger(this.options);
// For each command will will create and exhaust a cursor for the results.
Expand Down
Loading