Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(spanner): add support for batchWrite #2054

Merged
merged 22 commits into from
Jun 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ Samples are in the [`samples/`](https://github.com/googleapis/nodejs-spanner/tre
| Backups-restore | [source code](https://github.com/googleapis/nodejs-spanner/blob/main/samples/backups-restore.js) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/nodejs-spanner&page=editor&open_in_editor=samples/backups-restore.js,samples/README.md) |
| Backups-update | [source code](https://github.com/googleapis/nodejs-spanner/blob/main/samples/backups-update.js) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/nodejs-spanner&page=editor&open_in_editor=samples/backups-update.js,samples/README.md) |
| Backups | [source code](https://github.com/googleapis/nodejs-spanner/blob/main/samples/backups.js) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/nodejs-spanner&page=editor&open_in_editor=samples/backups.js,samples/README.md) |
| Batch Write | [source code](https://github.com/googleapis/nodejs-spanner/blob/main/samples/batch-write.js) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/nodejs-spanner&page=editor&open_in_editor=samples/batch-write.js,samples/README.md) |
| Batch | [source code](https://github.com/googleapis/nodejs-spanner/blob/main/samples/batch.js) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/nodejs-spanner&page=editor&open_in_editor=samples/batch.js,samples/README.md) |
| CRUD | [source code](https://github.com/googleapis/nodejs-spanner/blob/main/samples/crud.js) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/nodejs-spanner&page=editor&open_in_editor=samples/crud.js,samples/README.md) |
| Creates a new database with a specific default leader | [source code](https://github.com/googleapis/nodejs-spanner/blob/main/samples/database-create-with-default-leader.js) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/nodejs-spanner&page=editor&open_in_editor=samples/database-create-with-default-leader.js,samples/README.md) |
Expand Down
18 changes: 18 additions & 0 deletions samples/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ and automatic, synchronous replication for high availability.
* [Backups-restore](#backups-restore)
* [Backups-update](#backups-update)
* [Backups](#backups)
* [Batch Write](#batch-write)
* [Batch](#batch)
* [CRUD](#crud)
* [Creates a new database with a specific default leader](#creates-a-new-database-with-a-specific-default-leader)
Expand Down Expand Up @@ -354,6 +355,23 @@ __Usage:__



### Batch Write

View the [source code](https://github.com/googleapis/nodejs-spanner/blob/main/samples/batch-write.js).

[![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/nodejs-spanner&page=editor&open_in_editor=samples/batch-write.js,samples/README.md)

__Usage:__


`node batch-write.js <INSTANCE_ID> <DATABASE_ID> <PROJECT_ID>`


-----




### Batch

View the [source code](https://github.com/googleapis/nodejs-spanner/blob/main/samples/batch.js).
Expand Down
123 changes: 123 additions & 0 deletions samples/batch-write.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
/**
* Copyright 2024 Google LLC
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

// sample-metadata:
// title: Batch Write
// usage: node batch-write.js <INSTANCE_ID> <DATABASE_ID> <PROJECT_ID>

'use strict';

async function main(
instanceId = 'my-instance',
databaseId = 'my-database',
projectId = 'my-project-id'
) {
// [START spanner_batch_write_at_least_once]

// Imports the Google Cloud client library
const {Spanner, MutationGroup} = require('@google-cloud/spanner');

/**
* TODO(developer): Uncomment the following lines before running the sample.
*/
// const instanceId = 'my-instance';
// const databaseId = 'my-database';
// const projectId = 'my-project-id';

// Creates a client
const spanner = new Spanner({
projectId: projectId,
});

// Gets a reference to a Cloud Spanner instance and database
const instance = spanner.instance(instanceId);
const database = instance.database(databaseId);

// Create Mutation Groups
alkatrivedi marked this conversation as resolved.
Show resolved Hide resolved
/**
* Related mutations should be placed in a group, such as insert mutations for both a parent and a child row.
* A group must contain related mutations.
* Please see {@link https://cloud.google.com/spanner/docs/reference/rpc/google.spanner.v1#google.spanner.v1.BatchWriteRequest.MutationGroup}
* for more details and examples.
*/
const mutationGroup1 = new MutationGroup();
mutationGroup1.insert('Singers', {
SingerId: 1,
FirstName: 'Scarlet',
LastName: 'Terry',
});

const mutationGroup2 = new MutationGroup();
mutationGroup2.insert('Singers', {
SingerId: 2,
FirstName: 'Marc',
});
mutationGroup2.insert('Singers', {
SingerId: 3,
FirstName: 'Catalina',
LastName: 'Smith',
});
mutationGroup2.insert('Albums', {
AlbumId: 1,
SingerId: 2,
AlbumTitle: 'Total Junk',
});
mutationGroup2.insert('Albums', {
AlbumId: 2,
SingerId: 3,
AlbumTitle: 'Go, Go, Go',
});

const options = {
transactionTag: 'batch-write-tag',
};

try {
database
.batchWriteAtLeastOnce([mutationGroup1, mutationGroup2], options)
.on('error', console.error)
.on('data', response => {
// Check the response code of each response to determine whether the mutation group(s) were applied successfully.
if (response.status.code === 0) {
alkatrivedi marked this conversation as resolved.
Show resolved Hide resolved
console.log(
`Mutation group indexes ${
response.indexes
}, have been applied with commit timestamp ${Spanner.timestamp(
response.commitTimestamp
).toJSON()}`
);
}
// Mutation groups that fail to commit trigger a response with a non-zero status code.
else {
console.log(
alkatrivedi marked this conversation as resolved.
Show resolved Hide resolved
`Mutation group indexes ${response.indexes}, could not be applied with error code ${response.status.code}, and error message ${response.status.message}`
);
}
})
.on('end', () => {
console.log('Request completed successfully');
});
} catch (err) {
console.log(err);
}
// [END spanner_batch_write_at_least_once]
}

process.on('unhandledRejection', err => {
console.error(err.message);
process.exitCode = 1;
});

main(...process.argv.slice(2));
22 changes: 22 additions & 0 deletions samples/system-test/spanner.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ const requestTagCommand = 'node request-tag.js';
const timestampCmd = 'node timestamp.js';
const structCmd = 'node struct.js';
const dmlCmd = 'node dml.js';
const batchWriteCmd = 'node batch-write.js';
const datatypesCmd = 'node datatypes.js';
const backupsCmd = 'node backups.js';
const instanceCmd = 'node instance.js';
Expand Down Expand Up @@ -967,6 +968,27 @@ describe('Autogenerated Admin Clients', () => {
assert.match(output, new RegExp('Virginia Watson'));
});

// batch_write
it('should perform CRUD operations using batch write', async () => {
const output = execSync(
`${batchWriteCmd} ${INSTANCE_ID} ${DATABASE_ID} ${PROJECT_ID}`
).toString();

const successRegex =
/Mutation group indexes [\d,]+ have been applied with commit timestamp \d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d+Z/;
const failureRegex =
/Mutation group indexes [\d,]+, could not be applied with error code \d+, and error message .+/;

const successMatch = successRegex.test(output);
const errorMatch = failureRegex.test(output);

if (successMatch || errorMatch) {
assert.include(output, 'Request completed successfully');
} else {
assert.ifError(output);
}
});

// create_table_with_datatypes
it('should create Venues example table with supported datatype columns', async () => {
const output = execSync(
Expand Down
121 changes: 121 additions & 0 deletions src/database.ts
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,9 @@
} from './session-pool';
import {CreateTableCallback, CreateTableResponse, Table} from './table';
import {
BatchWriteOptions,
ExecuteSqlRequest,
MutationGroup,
RunCallback,
RunResponse,
RunUpdateCallback,
Expand Down Expand Up @@ -1531,7 +1533,7 @@
): void;
async getDatabaseDialect(
optionsOrCallback?: CallOptions | GetDatabaseDialectCallback,
cb?: GetDatabaseDialectCallback

Check warning on line 1536 in src/database.ts

View workflow job for this annotation

GitHub Actions / lint

'cb' is defined but never used
): Promise<
| EnumKey<typeof databaseAdmin.spanner.admin.database.v1.DatabaseDialect>
| undefined
Expand Down Expand Up @@ -3210,6 +3212,123 @@
}
}
}

/**
* Write a batch of mutations to Spanner.
*
* All mutations in a group are committed atomically. However, mutations across
* groups can be committed non-atomically in an unspecified order and thus, they
* must be independent of each other. Partial failure is possible, i.e., some groups
* may have been committed successfully, while some may have failed. The results of
* individual batches are streamed into the response as the batches are applied.
*
* batchWriteAtLeastOnce requests are not replay protected, meaning that each mutation group may
* be applied more than once. Replays of non-idempotent mutations may have undesirable
* effects. For example, replays of an insert mutation may produce an already exists
* error or if you use generated or commit timestamp-based keys, it may result in additional
* rows being added to the mutation's table. We recommend structuring your mutation groups to
* be idempotent to avoid this issue.
*
* @method Spanner#batchWriteAtLeastOnce
*
* @param {MutationGroup[]} [mutationGroups] The group of mutations to be applied.
* @param {BatchWriteOptions} [options] Options object for batch write request.
*
* @returns {ReadableStream} An object stream which emits
* {@link protos.google.spanner.v1.BatchWriteResponse|BatchWriteResponse}
* on 'data' event.
*
* @example
* ```
* const {Spanner} = require('@google-cloud/spanner');
* const spanner = new Spanner();
*
* const instance = spanner.instance('my-instance');
* const database = instance.database('my-database');
* const mutationGroup = new MutationGroup();
* mutationGroup.insert('Singers', {
* SingerId: '1',
* FirstName: 'Marc',
* LastName: 'Richards',
* });
*
* database.batchWriteAtLeastOnce([mutationGroup])
* .on('error', console.error)
* .on('data', response => {
* console.log('response: ', response);
* })
* .on('end', () => {
* console.log('Request completed successfully');
olavloite marked this conversation as resolved.
Show resolved Hide resolved
* });
*
* //-
* // If you anticipate many results, you can end a stream early to prevent
* // unnecessary processing and API requests.
* //-
* database.batchWriteAtLeastOnce()
* .on('data', response => {
* this.end();
* });
* ```
*/
batchWriteAtLeastOnce(
mutationGroups: MutationGroup[],
options?: BatchWriteOptions
): NodeJS.ReadableStream {
const proxyStream: Transform = through.obj();

this.pool_.getSession((err, session) => {
if (err) {
proxyStream.destroy(err);
return;
}
const gaxOpts = extend(true, {}, options?.gaxOptions);
const reqOpts = Object.assign(
{} as spannerClient.spanner.v1.BatchWriteRequest,
{
session: session!.formattedName_!,
mutationGroups: mutationGroups.map(mg => mg.proto()),
requestOptions: options?.requestOptions,
}
);
let dataReceived = false;
let dataStream = this.requestStream({
client: 'SpannerClient',
method: 'batchWrite',
reqOpts,
gaxOpts,
headers: this.resourceHeader_,
});
dataStream
.once('data', () => (dataReceived = true))
.once('error', err => {
if (
!dataReceived &&
isSessionNotFoundError(err as grpc.ServiceError)
) {
// If there's a 'Session not found' error and we have not yet received
// any data, we can safely retry the writes on a new session.
// Register the error on the session so the pool can discard it.
if (session) {
session.lastError = err as grpc.ServiceError;
}
// Remove the current data stream from the end user stream.
dataStream.unpipe(proxyStream);
dataStream.end();
// Create a new stream and add it to the end user stream.
dataStream = this.batchWriteAtLeastOnce(mutationGroups, options);
dataStream.pipe(proxyStream);
} else {
proxyStream.destroy(err);
}
})
.once('end', () => this.pool_.release(session!))
.pipe(proxyStream);
});

return proxyStream as NodeJS.ReadableStream;
}

/**
* Create a Session object.
*
Expand Down Expand Up @@ -3515,6 +3634,7 @@
promisifyAll(Database, {
exclude: [
'batchTransaction',
'batchWriteAtLeastOnce',
'getRestoreInfo',
'getState',
'getDatabaseDialect',
Expand All @@ -3536,6 +3656,7 @@
'create',
'batchCreateSessions',
'batchTransaction',
'batchWriteAtLeastOnce',
'close',
'createBatchTransaction',
'createSession',
Expand Down
16 changes: 15 additions & 1 deletion src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,12 @@ import {
import {Session} from './session';
import {SessionPool} from './session-pool';
import {Table} from './table';
import {PartitionedDml, Snapshot, Transaction} from './transaction';
import {
MutationGroup,
PartitionedDml,
Snapshot,
Transaction,
} from './transaction';
import grpcGcpModule = require('grpc-gcp');
const grpcGcp = grpcGcpModule(grpc);
import * as v1 from './v1';
Expand Down Expand Up @@ -2011,6 +2016,15 @@ export {Snapshot};
*/
export {Transaction};

/**
* {@link MutationGroup} class.
*
* @name Spanner.MutationGroup
* @see MutationGroup
* @type {Constructor}
*/
export {MutationGroup};

/**
* @type {object}
* @property {constructor} DatabaseAdminClient
Expand Down
Loading
Loading