Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(spanner): add support for change streams transaction exclusion option #2049

Merged
merged 37 commits into from
May 24, 2024
Merged
Show file tree
Hide file tree
Changes from 16 commits
Commits
Show all changes
37 commits
Select commit Hold shift + click to select a range
af733bc
add launch file
alkatrivedi May 13, 2024
97d43ff
feat: change stream option in transaction
alkatrivedi May 13, 2024
23496fd
feat: change stream option in transaction
alkatrivedi May 13, 2024
1f8c2d5
add test for runtransaction with excludeTxnFromChangeStreams option
alkatrivedi May 13, 2024
62cc8e0
fix: lint errors
alkatrivedi May 13, 2024
9bc85be
Merge branch 'googleapis:main' into change-stream
alkatrivedi May 15, 2024
fe0f1d1
Merge branch 'googleapis:main' into change-stream
alkatrivedi May 17, 2024
86bb1be
feat(spanner): add changes in the runPartitionedUpdate and insert met…
alkatrivedi May 20, 2024
cf3bf9f
fix: failed unit test in database file
alkatrivedi May 21, 2024
29a3729
fix: lint errors
alkatrivedi May 21, 2024
a0e1b2e
delete insertion sample
alkatrivedi May 21, 2024
09e2d3b
fix: lint errors
alkatrivedi May 21, 2024
85132f2
fix: lint changes in singer.js file
alkatrivedi May 21, 2024
55a23e7
refactor test file
alkatrivedi May 21, 2024
339f341
fix: lint errors
alkatrivedi May 21, 2024
7d77b34
fix: lint errors
alkatrivedi May 21, 2024
aaeed9d
refactor
alkatrivedi May 21, 2024
c60d029
refactor
alkatrivedi May 21, 2024
614be94
refactor
alkatrivedi May 21, 2024
aebb3a1
refactor
alkatrivedi May 21, 2024
dbed5f3
refactor
alkatrivedi May 21, 2024
ac842a9
refactor: remove extra lines
alkatrivedi May 22, 2024
5a96547
feat: add excludeTransactionOption for runPrtitionedUpdate
alkatrivedi May 22, 2024
03e5be0
docs: for excludeTxnFromChangeStreams function
alkatrivedi May 22, 2024
1567c32
Merge branch 'googleapis:main' into change-stream
alkatrivedi May 22, 2024
9490926
refactor test
alkatrivedi May 22, 2024
7cbef54
refactor test
alkatrivedi May 23, 2024
cd8f711
refactor imports
alkatrivedi May 23, 2024
b6c5001
refactor: test file and _mutate method
alkatrivedi May 23, 2024
4a0d234
refactor: test file and _mutate method
alkatrivedi May 23, 2024
dc7cfaf
refactor: test file and _mutate method
alkatrivedi May 23, 2024
4c38b9c
refactor test
alkatrivedi May 23, 2024
0d1e239
refactor test
alkatrivedi May 23, 2024
ece6d16
fix: test errors
alkatrivedi May 23, 2024
31e2763
add pdml unit tests
alkatrivedi May 23, 2024
5bf2102
add test for mutation
alkatrivedi May 23, 2024
3c45d98
add test for mutation
alkatrivedi May 23, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
65 changes: 44 additions & 21 deletions src/database.ts
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@
import {
ExecuteSqlRequest,
RunCallback,
runPartitionedUpdateOptions,
alkatrivedi marked this conversation as resolved.
Show resolved Hide resolved
RunResponse,
RunUpdateCallback,
Snapshot,
Expand Down Expand Up @@ -1527,7 +1528,7 @@
): void;
async getDatabaseDialect(
optionsOrCallback?: CallOptions | GetDatabaseDialectCallback,
cb?: GetDatabaseDialectCallback

Check warning on line 1531 in src/database.ts

View workflow job for this annotation

GitHub Actions / lint

'cb' is defined but never used
): Promise<
| EnumKey<typeof databaseAdmin.spanner.admin.database.v1.DatabaseDialect>
| undefined
Expand Down Expand Up @@ -2092,6 +2093,9 @@
if (options.optimisticLock) {
transaction!.useOptimisticLock();
}
if (options.excludeTxnFromChangeStreams) {
transaction!.excludeTxnFromChangeStreams();
}
if (!err) {
this._releaseOnEnd(session!, transaction!);
}
Expand Down Expand Up @@ -2711,13 +2715,15 @@
* @param {RunUpdateCallback} [callback] Callback function.
* @returns {Promise<RunUpdateResponse>}
*/
runPartitionedUpdate(query: string | ExecuteSqlRequest): Promise<[number]>;
runPartitionedUpdate(
query: string | ExecuteSqlRequest,
query: string | runPartitionedUpdateOptions
): Promise<[number]>;
runPartitionedUpdate(
query: string | runPartitionedUpdateOptions,
callback?: RunUpdateCallback
): void;
runPartitionedUpdate(
query: string | ExecuteSqlRequest,
query: string | runPartitionedUpdateOptions,
callback?: RunUpdateCallback
): void | Promise<[number]> {
this.pool_.getSession((err, session) => {
Expand All @@ -2732,33 +2738,44 @@

_runPartitionedUpdate(
session: Session,
query: string | ExecuteSqlRequest,
query: string | runPartitionedUpdateOptions,
callback?: RunUpdateCallback
): void | Promise<number> {
const transaction = session.partitionedDml();

transaction.begin(err => {
if (err) {
this.pool_.release(session!);
callback!(err, 0);
return;
}
if (typeof query === 'string') {
query = {sql: query} as runPartitionedUpdateOptions;
}

query = Object.assign({}, query) as runPartitionedUpdateOptions;

transaction.runUpdate(query, (err, updateCount) => {
transaction.begin(
{
alkatrivedi marked this conversation as resolved.
Show resolved Hide resolved
excludeTxnFromChangeStreams: query.excludeTxnFromChangeStreams,
},
err => {
if (err) {
if (err.code !== grpc.status.ABORTED) {
this.pool_.release(session!);
callback!(err, 0);
return;
}
this._runPartitionedUpdate(session, query, callback);
} else {
this.pool_.release(session!);
callback!(null, updateCount);
callback!(err, 0);
return;
}
});
});

transaction.runUpdate(query, (err, updateCount) => {
if (err) {
if (err.code !== grpc.status.ABORTED) {
this.pool_.release(session!);
callback!(err, 0);
return;
}
this._runPartitionedUpdate(session, query, callback);
} else {
this.pool_.release(session!);
callback!(null, updateCount);
return;
}
});
}
);
}

/**
Expand Down Expand Up @@ -3059,6 +3076,9 @@
if (options.optimisticLock) {
transaction!.useOptimisticLock();
}
if (options.excludeTxnFromChangeStreams) {
transaction!.excludeTxnFromChangeStreams();
}

const release = this.pool_.release.bind(this.pool_, session!);
const runner = new TransactionRunner(
Expand Down Expand Up @@ -3173,6 +3193,9 @@
if (options.optimisticLock) {
transaction.useOptimisticLock();
}
if (options.excludeTxnFromChangeStreams) {
transaction.excludeTxnFromChangeStreams();
}
const runner = new AsyncTransactionRunner<T>(
session,
transaction,
Expand Down
9 changes: 9 additions & 0 deletions src/table.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1079,6 +1079,15 @@ class Table {
return;
}

const excludeTxnFromChangeStreams =
'excludeTxnFromChangeStreams' in options
? options.excludeTxnFromChangeStreams
: false;

if (excludeTxnFromChangeStreams) {
transaction!.excludeTxnFromChangeStreams();
}
alkatrivedi marked this conversation as resolved.
Show resolved Hide resolved

transaction![method](this.name, rows as Key[]);
transaction!.commit(options, callback);
});
Expand Down
4 changes: 4 additions & 0 deletions src/transaction-runner.ts
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ export interface RunTransactionOptions {
timeout?: number;
requestOptions?: Pick<IRequestOptions, 'transactionTag'>;
optimisticLock?: boolean;
excludeTxnFromChangeStreams?: boolean;
}

/**
Expand Down Expand Up @@ -204,6 +205,9 @@ export abstract class Runner<T> {
if (this.options.optimisticLock) {
transaction.useOptimisticLock();
}
if (this.options.excludeTxnFromChangeStreams) {
transaction.excludeTxnFromChangeStreams();
}
if (this.attempts > 0) {
await transaction.begin();
}
Expand Down
37 changes: 33 additions & 4 deletions src/transaction.ts
Original file line number Diff line number Diff line change
Expand Up @@ -103,11 +103,16 @@ export interface RequestOptions {
columnsMetadata?: object;
}

export interface BeginTransactionOptions extends CallOptions {
excludeTxnFromChangeStreams?: boolean;
}

export interface CommitOptions {
requestOptions?: Pick<IRequestOptions, 'priority'>;
returnCommitStats?: boolean;
maxCommitDelay?: spannerClient.protobuf.IDuration;
gaxOptions?: CallOptions;
excludeTxnFromChangeStreams?: true;
}

export interface Statement {
Expand All @@ -127,6 +132,10 @@ export interface ExecuteSqlRequest extends Statement, RequestOptions {
directedReadOptions?: google.spanner.v1.IDirectedReadOptions;
}

export interface runPartitionedUpdateOptions extends ExecuteSqlRequest {
excludeTxnFromChangeStreams?: boolean;
}

export interface KeyRange {
startClosed?: Value[];
startOpen?: Value[];
Expand Down Expand Up @@ -388,11 +397,14 @@ export class Snapshot extends EventEmitter {
* });
* ```
*/
begin(gaxOptions?: CallOptions): Promise<BeginResponse>;
begin(gaxOptions?: BeginTransactionOptions): Promise<BeginResponse>;
alkatrivedi marked this conversation as resolved.
Show resolved Hide resolved
begin(callback: BeginTransactionCallback): void;
begin(gaxOptions: CallOptions, callback: BeginTransactionCallback): void;
begin(
gaxOptionsOrCallback?: CallOptions | BeginTransactionCallback,
gaxOptions: BeginTransactionOptions,
callback: BeginTransactionCallback
): void;
begin(
gaxOptionsOrCallback?: BeginTransactionOptions | BeginTransactionCallback,
cb?: BeginTransactionCallback
): void | Promise<BeginResponse> {
const gaxOpts =
Expand All @@ -402,6 +414,19 @@ export class Snapshot extends EventEmitter {

const session = this.session.formattedName_!;
const options = this._options;

const excludeTxnFromChangeStreams =
'excludeTxnFromChangeStreams' in gaxOpts
? gaxOpts.excludeTxnFromChangeStreams
: false;

if (
excludeTxnFromChangeStreams ||
this._options.excludeTxnFromChangeStreams === true
) {
options.excludeTxnFromChangeStreams = true;
}

const reqOpts: spannerClient.spanner.v1.IBeginTransactionRequest = {
session,
options,
Expand Down Expand Up @@ -1935,7 +1960,7 @@ export class Transaction extends Dml {
} else if (!this._useInRunner) {
reqOpts.singleUseTransaction = this._options;
} else {
this.begin().then(() => this.commit(options, callback), callback);
this.begin(options).then(() => this.commit(options, callback), callback);
return;
}

Expand Down Expand Up @@ -2470,6 +2495,10 @@ export class Transaction extends Dml {
useOptimisticLock(): void {
this._options.readWrite!.readLockMode = ReadLockMode.OPTIMISTIC;
}

excludeTxnFromChangeStreams(): void {
this._options.excludeTxnFromChangeStreams = true;
}
}

/*! Developer Documentation
Expand Down
30 changes: 26 additions & 4 deletions test/database.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2485,7 +2485,7 @@ describe('Database', () => {

beginStub = (
sandbox.stub(fakePartitionedDml, 'begin') as sinon.SinonStub
).callsFake(callback => callback(null));
).callsFake((object, callback) => callback(null));

runUpdateStub = (
sandbox.stub(fakePartitionedDml, 'runUpdate') as sinon.SinonStub
Expand Down Expand Up @@ -2523,7 +2523,7 @@ describe('Database', () => {
it('should return any begin errors', done => {
const fakeError = new Error('err');

beginStub.callsFake(callback => callback(fakeError));
beginStub.callsFake((object, callback) => callback(fakeError));

const releaseStub = (
sandbox.stub(fakePool, 'release') as sinon.SinonStub
Expand All @@ -2543,8 +2543,8 @@ describe('Database', () => {
database.runPartitionedUpdate(QUERY, fakeCallback);

const [query] = runUpdateStub.lastCall.args;

assert.strictEqual(query, QUERY);
assert.strictEqual(query.sql, QUERY.sql);
assert.deepStrictEqual(query.params, QUERY.params);
assert.ok(fakeCallback.calledOnce);
});

Expand Down Expand Up @@ -2607,6 +2607,28 @@ describe('Database', () => {
});
assert.ok(fakeCallback.calledOnce);
});

it('should ignore excludeTxnFromChangeStreams set for client', () => {
const fakeCallback = sandbox.spy();

database.parent.parent = {
excludeTxnFromChangeStream: true,
};

database.runPartitionedUpdate(
{
excludeTxnFromChangeStream: true,
},
fakeCallback
);

const [query] = runUpdateStub.lastCall.args;

assert.deepStrictEqual(query, {
excludeTxnFromChangeStream: true,
});
assert.ok(fakeCallback.calledOnce);
});
});

describe('runTransaction', () => {
Expand Down
Loading
Loading