Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Leader aware routing #1783

Merged
merged 16 commits into from
Apr 26, 2023
Merged
7 changes: 3 additions & 4 deletions src/batch-transaction.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,11 @@ import * as extend from 'extend';
import * as is from 'is';
import {Snapshot} from './transaction';
import {google} from '../protos/protos';
import {Session, Database} from '.';
import {Session, Database, Spanner} from '.';
surbhigarg92 marked this conversation as resolved.
Show resolved Hide resolved
import {
CLOUD_RESOURCE_HEADER,
addLeaderAwareRoutingHeader,
} from '../src/common';
import {Spanner} from '.';

export interface TransactionIdentifier {
session: string | Session;
Expand Down Expand Up @@ -138,7 +137,7 @@ class BatchTransaction extends Snapshot {
delete reqOpts.types;

const headers: {[k: string]: string} = {};
if ((this.session.parent.parent.parent as Spanner).routeToLeaderEnabled) {
if (this._getSpanner().routeToLeaderEnabled) {
addLeaderAwareRoutingHeader(headers);
}

Expand Down Expand Up @@ -236,7 +235,7 @@ class BatchTransaction extends Snapshot {
delete reqOpts.ranges;

const headers: {[k: string]: string} = {};
if ((this.session.parent.parent.parent as Spanner).routeToLeaderEnabled) {
if (this._getSpanner().routeToLeaderEnabled) {
addLeaderAwareRoutingHeader(headers);
}

Expand Down
18 changes: 14 additions & 4 deletions src/database.ts
Original file line number Diff line number Diff line change
Expand Up @@ -87,10 +87,9 @@ import {
Schema,
addLeaderAwareRoutingHeader,
} from './common';
import {Spanner} from '.';
import {Duplex, Readable, Transform} from 'stream';
import {PreciseDate} from '@google-cloud/precise-date';
import {EnumKey, RequestConfig, TranslateEnumKeys} from '.';
import {EnumKey, RequestConfig, TranslateEnumKeys, Spanner} from '.';
import arrify = require('arrify');
import {ServiceError} from 'google-gax';
import IPolicy = google.iam.v1.IPolicy;
Expand Down Expand Up @@ -526,7 +525,7 @@ class Database extends common.GrpcServiceObject {
};

const headers = this.resourceHeader_;
if ((this.instance.parent as Spanner).routeToLeaderEnabled) {
if (this._getSpanner().routeToLeaderEnabled) {
addLeaderAwareRoutingHeader(headers);
}

Expand Down Expand Up @@ -799,7 +798,7 @@ class Database extends common.GrpcServiceObject {
options.databaseRole || this.databaseRole || null;

const headers = this.resourceHeader_;
if ((this.instance.parent as Spanner).routeToLeaderEnabled) {
if (this._getSpanner().routeToLeaderEnabled) {
addLeaderAwareRoutingHeader(headers);
}

Expand Down Expand Up @@ -3273,6 +3272,17 @@ class Database extends common.GrpcServiceObject {
const databaseName = name.split('/').pop();
return instanceName + '/databases/' + databaseName;
}

/**
* Gets the Spanner object
*
* @private
*
* @returns {Spanner}
*/
private _getSpanner(): Spanner {
return this.instance.parent as Spanner;
}
}

/*! Developer Documentation
Expand Down
13 changes: 12 additions & 1 deletion src/session.ts
Original file line number Diff line number Diff line change
Expand Up @@ -385,7 +385,7 @@ export class Session extends common.GrpcServiceObject {
};

surbhigarg92 marked this conversation as resolved.
Show resolved Hide resolved
const headers = this.resourceHeader_;
if ((this.parent.parent.parent as Spanner).routeToLeaderEnabled) {
if (this._getSpanner().routeToLeaderEnabled) {
addLeaderAwareRoutingHeader(headers);
}
return this.request(
Expand Down Expand Up @@ -522,6 +522,17 @@ export class Session extends common.GrpcServiceObject {
const sessionName = name.split('/').pop();
return databaseName + '/sessions/' + sessionName;
}

/**
* Gets the Spanner object
*
* @private
*
* @returns {Spanner}
*/
private _getSpanner(): Spanner {
return this.parent.parent.parent as Spanner;
}
}

/*! Developer Documentation
Expand Down
26 changes: 18 additions & 8 deletions src/transaction.ts
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,8 @@ import {google} from '../protos/protos';
import IAny = google.protobuf.IAny;
import IQueryOptions = google.spanner.v1.ExecuteSqlRequest.IQueryOptions;
import IRequestOptions = google.spanner.v1.IRequestOptions;
import {Database} from '.';
import {Database, Spanner} from '.';
import ReadLockMode = google.spanner.v1.TransactionOptions.ReadWrite.ReadLockMode;
import {Spanner} from '.';

export type Rows = Array<Row | Json>;
const RETRY_INFO_TYPE = 'type.googleapis.com/google.rpc.retryinfo';
Expand Down Expand Up @@ -377,7 +376,7 @@ export class Snapshot extends EventEmitter {

const headers = this.resourceHeader_;
if (
(this.session.parent.parent.parent as Spanner).routeToLeaderEnabled &&
this._getSpanner().routeToLeaderEnabled &&
(this._options.readWrite !== undefined ||
this._options.partitionedDml !== undefined)
) {
Expand Down Expand Up @@ -618,7 +617,7 @@ export class Snapshot extends EventEmitter {

const headers = this.resourceHeader_;
if (
(this.session.parent.parent.parent as Spanner).routeToLeaderEnabled &&
this._getSpanner().routeToLeaderEnabled &&
(this._options.readWrite !== undefined ||
surbhigarg92 marked this conversation as resolved.
Show resolved Hide resolved
this._options.partitionedDml !== undefined)
) {
Expand Down Expand Up @@ -1102,7 +1101,7 @@ export class Snapshot extends EventEmitter {

const headers = this.resourceHeader_;
if (
(this.session.parent.parent.parent as Spanner).routeToLeaderEnabled &&
this._getSpanner().routeToLeaderEnabled &&
(this._options.readWrite !== undefined ||
this._options.partitionedDml !== undefined)
) {
Expand Down Expand Up @@ -1332,6 +1331,17 @@ export class Snapshot extends EventEmitter {
.once('end', () => this._idWaiter.emit('end'))
);
}

/**
* Gets the Spanner object
*
* @private
*
* @returns {Spanner}
*/
protected _getSpanner(): Spanner {
return this.session.parent.parent.parent as Spanner;
}
}

/*! Developer Documentation
Expand Down Expand Up @@ -1653,7 +1663,7 @@ export class Transaction extends Dml {
} as spannerClient.spanner.v1.ExecuteBatchDmlRequest;

const headers = this.resourceHeader_;
if ((this.session.parent.parent.parent as Spanner).routeToLeaderEnabled) {
if (this._getSpanner().routeToLeaderEnabled) {
addLeaderAwareRoutingHeader(headers);
}

Expand Down Expand Up @@ -1827,7 +1837,7 @@ export class Transaction extends Dml {
);

const headers = this.resourceHeader_;
if ((this.session.parent.parent.parent as Spanner).routeToLeaderEnabled) {
if (this._getSpanner().routeToLeaderEnabled) {
addLeaderAwareRoutingHeader(headers);
}

Expand Down Expand Up @@ -2167,7 +2177,7 @@ export class Transaction extends Dml {
};

const headers = this.resourceHeader_;
if ((this.session.parent.parent.parent as Spanner).routeToLeaderEnabled) {
if (this._getSpanner().routeToLeaderEnabled) {
addLeaderAwareRoutingHeader(headers);
}

Expand Down
7 changes: 6 additions & 1 deletion test/batch-transaction.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import * as extend from 'extend';
import * as proxyquire from 'proxyquire';
import * as sinon from 'sinon';

import {Session, Database} from '../src';
import {Session, Database, Spanner} from '../src';
import * as bt from '../src/batch-transaction';
import {PartialResultStream} from '../src/partial-result-stream';
import {
Expand Down Expand Up @@ -86,6 +86,11 @@ class FakeTransaction {
static encodeParams(): object {
return {};
}

_getSpanner(): Spanner {
return SPANNER as Spanner;
}

run() {}
read() {}
}
Expand Down
9 changes: 6 additions & 3 deletions test/spanner.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1492,13 +1492,13 @@ describe('Spanner with mock server', () => {

describe('LeaderAwareRouting', () => {
let spannerWithLARDisabled: Spanner;
let instanceWithEnvVar: Instance;
let instanceWithLARDisabled: Instance;

function newTestDatabaseWithLARDisabled(
options?: SessionPoolOptions,
queryOptions?: IQueryOptions
): Database {
return instanceWithEnvVar.database(
return instanceWithLARDisabled.database(
`database-${dbCounter++}`,
options,
queryOptions
Expand All @@ -1513,7 +1513,7 @@ describe('Spanner with mock server', () => {
routeToLeaderEnabled: false,
});
// Gets a reference to a Cloud Spanner instance and database
instanceWithEnvVar = spannerWithLARDisabled.instance('instance');
instanceWithLARDisabled = spannerWithLARDisabled.instance('instance');
});

it('should execute with leader aware routing enabled in a read/write transaction', async () => {
Expand All @@ -1525,14 +1525,17 @@ describe('Spanner with mock server', () => {
return await tx.commit();
});
await database.close();
let metadataCountWithLAREnabled = 0;
spannerMock.getMetadata().forEach(metadata => {
if (metadata.get(LEADER_AWARE_ROUTING_HEADER)[0] !== undefined) {
metadataCountWithLAREnabled++;
assert.strictEqual(
metadata.get(LEADER_AWARE_ROUTING_HEADER)[0],
surbhigarg92 marked this conversation as resolved.
Show resolved Hide resolved
'true'
);
}
});
assert.notStrictEqual(metadataCountWithLAREnabled, 0);
});

it('should execute with leader aware routing disabled in a read/write transaction', async () => {
Expand Down