diff --git a/benchmark/spanner.ts b/benchmark/spanner.ts index fdbfcbdc9..939e670ee 100644 --- a/benchmark/spanner.ts +++ b/benchmark/spanner.ts @@ -298,7 +298,6 @@ async function burstRead() { const database = newTestDatabase({ min: 100, max: 400, - writes: 0.2, incStep: incStep, }); const pool = database.pool_ as SessionPool; @@ -335,7 +334,6 @@ async function burstWrite() { const database = newTestDatabase({ min: 100, max: 400, - writes: 0.2, incStep: incStep, }); const pool = database.pool_ as SessionPool; @@ -352,7 +350,6 @@ async function burstWrite() { if (incStep) { console.timeEnd(`burstWrite incStep ${incStep}`); console.log(`Current session pool size: ${pool.size}`); - console.log(`Current num write sessions: ${pool.writes}`); } } finally { await database.close(); @@ -374,7 +371,6 @@ async function burstReadAndWrite() { const database = newTestDatabase({ min: 100, max: 400, - writes: 0.2, incStep: incStep, }); const pool = database.pool_ as SessionPool; @@ -397,7 +393,6 @@ async function burstReadAndWrite() { if (incStep) { console.timeEnd(`burstReadAndWrite incStep ${incStep}`); console.log(`Current session pool size: ${pool.size}`); - console.log(`Current num write sessions: ${pool.writes}`); } } finally { await database.close(); @@ -416,7 +411,6 @@ async function multipleWriteBursts() { const database = newTestDatabase({ min: 100, max: 400, - writes: 0.2, incStep: incStep, }); const pool = database.pool_ as SessionPool; @@ -436,7 +430,6 @@ async function multipleWriteBursts() { if (incStep) { console.timeEnd(`multipleWriteBursts incStep ${incStep}`); console.log(`Current session pool size: ${pool.size}`); - console.log(`Current num write sessions: ${pool.writes}`); } } finally { await database.close(); @@ -449,46 +442,39 @@ async function oneReadTransactionPerSecond() { const RND_WAIT_TIME_BETWEEN_REQUESTS = 100000; const NUM_TRANSACTIONS = RND_WAIT_TIME_BETWEEN_REQUESTS / 1000; for (const minSessions of [0, 25]) { - for (const writeFraction of [0, 0.2]) { - const database = newTestDatabase({ - min: minSessions, - writes: writeFraction, - }); - const pool = database.pool_ as SessionPool; - try { - // Execute a batch of write requests to initialize the session pool with only - // write sessions. The dynamic scaling of the session pool should automatically - // change this into an appropriate number of read sessions as the test runs. - await queueWriteOperations(database, pool.options.incStep!, 0); - const readPromises = queueReadOperations( - database, - NUM_TRANSACTIONS, - RND_WAIT_TIME_BETWEEN_REQUESTS, - 0 - ); - readPromises.forEach(p => - p.then(t => { - console.log(`Time taken: ${t}ms`); - }) - ); - const t = await Promise.all(readPromises); - const max = Math.max(...t); - const min = Math.min(...t); - const sum = t.reduce((a, b) => a + b, 0); - const avg = sum / t.length || 0; - const p90 = percentile(t, 0.9); - console.log( - `oneReadTransactionPerSecond, min: ${minSessions}, write: ${writeFraction}` - ); - console.log(`Max: ${max}`); - console.log(`Min: ${min}`); - console.log(`Avg: ${avg}`); - console.log(`P90: ${p90}`); - console.log(`Current session pool size: ${pool.size}`); - console.log(`Current num write sessions: ${pool.writes}`); - } finally { - await database.close(); - } + const database = newTestDatabase({ + min: minSessions, + }); + const pool = database.pool_ as SessionPool; + try { + // Execute a batch of write requests to initialize the session pool with only + // write sessions. The dynamic scaling of the session pool should automatically + // change this into an appropriate number of read sessions as the test runs. + await queueWriteOperations(database, pool.options.incStep!, 0); + const readPromises = queueReadOperations( + database, + NUM_TRANSACTIONS, + RND_WAIT_TIME_BETWEEN_REQUESTS, + 0 + ); + readPromises.forEach(p => + p.then(t => { + console.log(`Time taken: ${t}ms`); + }) + ); + const t = await Promise.all(readPromises); + const max = Math.max(...t); + const min = Math.min(...t); + const sum = t.reduce((a, b) => a + b, 0); + const avg = sum / t.length || 0; + const p90 = percentile(t, 0.9); + console.log(`Max: ${max}`); + console.log(`Min: ${min}`); + console.log(`Avg: ${avg}`); + console.log(`P90: ${p90}`); + console.log(`Current session pool size: ${pool.size}`); + } finally { + await database.close(); } } } @@ -498,43 +484,36 @@ async function oneWriteTransactionPerSecond() { const RND_WAIT_TIME_BETWEEN_REQUESTS = 100000; const NUM_TRANSACTIONS = RND_WAIT_TIME_BETWEEN_REQUESTS / 1000; for (const minSessions of [0, 25]) { - for (const writeFraction of [0, 0.2]) { - const database = newTestDatabase({ - min: minSessions, - writes: writeFraction, - }); - const pool = database.pool_ as SessionPool; - try { - // Execute one read request to initialize the session pool. - await queueReadOperations(database, 1, 0, 0); - const writePromises = queueWriteOperations( - database, - NUM_TRANSACTIONS, - RND_WAIT_TIME_BETWEEN_REQUESTS - ); - writePromises.forEach(p => - p.then(t => { - console.log(`Time taken: ${t}ms`); - }) - ); - const t = await Promise.all(writePromises); - const max = Math.max(...t); - const min = Math.min(...t); - const sum = t.reduce((a, b) => a + b, 0); - const avg = sum / t.length || 0; - const p90 = percentile(t, 0.9); - console.log( - `oneWriteTransactionPerSecond, min: ${minSessions}, write: ${writeFraction}` - ); - console.log(`Max: ${max}`); - console.log(`Min: ${min}`); - console.log(`Avg: ${avg}`); - console.log(`P90: ${p90}`); - console.log(`Current session pool size: ${pool.size}`); - console.log(`Current num write sessions: ${pool.writes}`); - } finally { - await database.close(); - } + const database = newTestDatabase({ + min: minSessions, + }); + const pool = database.pool_ as SessionPool; + try { + // Execute one read request to initialize the session pool. + await queueReadOperations(database, 1, 0, 0); + const writePromises = queueWriteOperations( + database, + NUM_TRANSACTIONS, + RND_WAIT_TIME_BETWEEN_REQUESTS + ); + writePromises.forEach(p => + p.then(t => { + console.log(`Time taken: ${t}ms`); + }) + ); + const t = await Promise.all(writePromises); + const max = Math.max(...t); + const min = Math.min(...t); + const sum = t.reduce((a, b) => a + b, 0); + const avg = sum / t.length || 0; + const p90 = percentile(t, 0.9); + console.log(`Max: ${max}`); + console.log(`Min: ${min}`); + console.log(`Avg: ${avg}`); + console.log(`P90: ${p90}`); + console.log(`Current session pool size: ${pool.size}`); + } finally { + await database.close(); } } } @@ -545,52 +524,45 @@ async function oneReadAndOneWriteTransactionPerSecond() { const NUM_READ_TRANSACTIONS = RND_WAIT_TIME_BETWEEN_REQUESTS / 1000; const NUM_WRITE_TRANSACTIONS = RND_WAIT_TIME_BETWEEN_REQUESTS / 1000; for (const minSessions of [0, 25]) { - for (const writeFraction of [0, 0.2]) { - const database = newTestDatabase({ - min: minSessions, - writes: writeFraction, - }); - const pool = database.pool_ as SessionPool; - try { - const readPromises = queueReadOperations( - database, - NUM_READ_TRANSACTIONS, - RND_WAIT_TIME_BETWEEN_REQUESTS, - 0 - ); - const writePromises = queueWriteOperations( - database, - NUM_WRITE_TRANSACTIONS, - RND_WAIT_TIME_BETWEEN_REQUESTS - ); - readPromises.forEach(p => - p.then(t => { - console.log(`Read tx: ${t}ms`); - }) - ); - writePromises.forEach(p => - p.then(t => { - console.log(`Write tx: ${t}ms`); - }) - ); - const t = await Promise.all(readPromises.concat(writePromises)); - const max = Math.max(...t); - const min = Math.min(...t); - const sum = t.reduce((a, b) => a + b, 0); - const avg = sum / t.length || 0; - const p90 = percentile(t, 0.9); - console.log( - `oneReadAndOneWriteTransactionPerSecond, min: ${minSessions}, write: ${writeFraction}` - ); - console.log(`Max: ${max}`); - console.log(`Min: ${min}`); - console.log(`Avg: ${avg}`); - console.log(`P90: ${p90}`); - console.log(`Current session pool size: ${pool.size}`); - console.log(`Current num write sessions: ${pool.writes}`); - } finally { - await database.close(); - } + const database = newTestDatabase({ + min: minSessions, + }); + const pool = database.pool_ as SessionPool; + try { + const readPromises = queueReadOperations( + database, + NUM_READ_TRANSACTIONS, + RND_WAIT_TIME_BETWEEN_REQUESTS, + 0 + ); + const writePromises = queueWriteOperations( + database, + NUM_WRITE_TRANSACTIONS, + RND_WAIT_TIME_BETWEEN_REQUESTS + ); + readPromises.forEach(p => + p.then(t => { + console.log(`Read tx: ${t}ms`); + }) + ); + writePromises.forEach(p => + p.then(t => { + console.log(`Write tx: ${t}ms`); + }) + ); + const t = await Promise.all(readPromises.concat(writePromises)); + const max = Math.max(...t); + const min = Math.min(...t); + const sum = t.reduce((a, b) => a + b, 0); + const avg = sum / t.length || 0; + const p90 = percentile(t, 0.9); + console.log(`Max: ${max}`); + console.log(`Min: ${min}`); + console.log(`Avg: ${avg}`); + console.log(`P90: ${p90}`); + console.log(`Current session pool size: ${pool.size}`); + } finally { + await database.close(); } } } @@ -605,7 +577,6 @@ async function steadyIncrease() { const database = newTestDatabase({ min: 100, max: 400, - writes: 0.2, incStep: incStep, }); const pool = database.pool_ as SessionPool; diff --git a/src/database.ts b/src/database.ts index 61c80187a..fd2bc0d41 100644 --- a/src/database.ts +++ b/src/database.ts @@ -640,9 +640,9 @@ class Database extends common.GrpcServiceObject { ? (optionsOrCallback as TimestampBounds) : {}; - this.pool_.getReadSession((err, session) => { + this.pool_.getSession((err, session) => { if (err) { - callback!(err, null, undefined); + callback!(err as ServiceError, null, undefined); return; } const transaction = this.batchTransaction({session: session!}, options); @@ -1685,9 +1685,9 @@ class Database extends common.GrpcServiceObject { ? (optionsOrCallback as TimestampBounds) : {}; - this.pool_.getReadSession((err, session) => { + this.pool_.getSession((err, session) => { if (err) { - callback!(err); + callback!(err as ServiceError); return; } @@ -1759,7 +1759,7 @@ class Database extends common.GrpcServiceObject { getTransaction( callback?: GetTransactionCallback ): void | Promise<[Transaction]> { - this.pool_.getWriteSession((err, session, transaction) => { + this.pool_.getSession((err, session, transaction) => { if (!err) { this._releaseOnEnd(session!, transaction!); } @@ -1872,9 +1872,9 @@ class Database extends common.GrpcServiceObject { callback?: PoolRequestCallback ): void | Promise { const pool = this.pool_; - pool.getReadSession((err, session) => { + pool.getSession((err, session) => { if (err) { - callback!(err, null); + callback!(err as ServiceError, null); return; } config.reqOpts.session = session!.formattedName_; @@ -1917,9 +1917,9 @@ class Database extends common.GrpcServiceObject { } } waitForSessionStream.on('reading', () => { - pool.getReadSession((err, session_) => { + pool.getSession((err, session_) => { if (err) { - destroyStream(err); + destroyStream(err as ServiceError); return; } session = session_!; @@ -2277,9 +2277,9 @@ class Database extends common.GrpcServiceObject { query: string | ExecuteSqlRequest, callback?: RunUpdateCallback ): void | Promise<[number]> { - this.pool_.getReadSession((err, session) => { + this.pool_.getSession((err, session) => { if (err) { - callback!(err, 0); + callback!(err as ServiceError, 0); return; } @@ -2449,7 +2449,7 @@ class Database extends common.GrpcServiceObject { ): PartialResultStream { const proxyStream: Transform = through.obj(); - this.pool_.getReadSession((err, session) => { + this.pool_.getSession((err, session) => { if (err) { proxyStream.destroy(err); return; @@ -2604,7 +2604,7 @@ class Database extends common.GrpcServiceObject { ? (optionsOrRunFn as RunTransactionOptions) : {}; - this.pool_.getWriteSession((err, session?, transaction?) => { + this.pool_.getSession((err, session?, transaction?) => { if (err && isSessionNotFoundError(err as grpc.ServiceError)) { this.runTransaction(options, runFn!); return; @@ -2713,13 +2713,13 @@ class Database extends common.GrpcServiceObject { ? (optionsOrRunFn as RunTransactionOptions) : {}; - const getWriteSession = this.pool_.getWriteSession.bind(this.pool_); + const getSession = this.pool_.getSession.bind(this.pool_); // Loop to retry 'Session not found' errors. // (and yes, we like while (true) more than for (;;) here) // eslint-disable-next-line no-constant-condition while (true) { try { - const [session, transaction] = await promisify(getWriteSession)(); + const [session, transaction] = await promisify(getSession)(); transaction.requestOptions = Object.assign( transaction.requestOptions || {}, options.requestOptions diff --git a/src/session-pool.ts b/src/session-pool.ts index 742e8b083..3744b11f7 100644 --- a/src/session-pool.ts +++ b/src/session-pool.ts @@ -19,7 +19,7 @@ import * as is from 'is'; import PQueue from 'p-queue'; import {Database} from './database'; -import {Session, types} from './session'; +import {Session} from './session'; import {Transaction} from './transaction'; import {NormalCallback} from './common'; import {GoogleError, grpc, ServiceError} from 'google-gax'; @@ -33,20 +33,25 @@ export interface SessionPoolCloseCallback { (error?: SessionLeakError): void; } -/** - * @callback GetReadSessionCallback - * @param {?Error} error Request error, if any. - * @param {Session} session The read-only session. - */ +/** @deprecated. Use GetSessionCallback instead. */ export type GetReadSessionCallback = NormalCallback; +/** @deprecated. Use GetSessionCallback instead. */ +export interface GetWriteSessionCallback { + ( + err: Error | null, + session?: Session | null, + transaction?: Transaction | null + ): void; +} + /** - * @callback GetWriteSessionCallback + * @callback GetSessionCallback * @param {?Error} error Request error, if any. * @param {Session} session The read-write session. * @param {Transaction} transaction The transaction object. */ -export interface GetWriteSessionCallback { +export interface GetSessionCallback { ( err: Error | null, session?: Session | null, @@ -81,14 +86,22 @@ export interface SessionPoolInterface extends EventEmitter { * @name SessionPoolInterface#open */ /** - * When called returns a read-only session. + * When called returns a session. + * + * @name SessionPoolInterface#getSession + * @param {GetSessionCallback} callback The callback function. + */ + /** + * When called returns a session. * + * @deprecated Use getSession instead. * @name SessionPoolInterface#getReadSession * @param {GetReadSessionCallback} callback The callback function. */ /** - * When called returns a read-write session with prepared transaction. + * When called returns a session. * + * @deprecated Use getSession instead. * @name SessionPoolInterface#getWriteSession * @param {GetWriteSessionCallback} callback The callback function. */ @@ -100,6 +113,7 @@ export interface SessionPoolInterface extends EventEmitter { */ close(callback: SessionPoolCloseCallback): void; open(): void; + getSession(callback: GetSessionCallback): void; getReadSession(callback: GetReadSessionCallback): void; getWriteSession(callback: GetWriteSessionCallback): void; release(session: Session): void; @@ -128,8 +142,7 @@ export interface SessionPoolInterface extends EventEmitter { * the pool at any given time. * @property {number} [min=0] Minimum number of resources to keep in the pool at * any given time. - * @property {number} [writes=0.0] Percentage of sessions to be pre-allocated as - * write sessions represented as a float. + * @property {number} [writes=0.0]. Deprecated. * @property {number} [incStep=25] The number of new sessions to create when at * least one more session is needed. */ @@ -143,6 +156,10 @@ export interface SessionPoolOptions { max?: number; maxIdle?: number; min?: number; + /** + * @deprecated. Starting from v6.5.0 the same session can be reused for + * different types of transactions. + */ writes?: number; incStep?: number; } @@ -157,7 +174,6 @@ const DEFAULTS: SessionPoolOptions = { max: 100, maxIdle: 1, min: 25, - writes: 0, incStep: 25, }; @@ -302,16 +318,11 @@ const enum errors { } interface SessionInventory { - [types.ReadOnly]: Session[]; - [types.ReadWrite]: Session[]; + sessions: Session[]; borrowed: Set; } -interface Waiters { - [types.ReadOnly]: number; - [types.ReadWrite]: number; -} - +/** @deprecated. */ export interface CreateSessionsOptions { writes?: number; reads?: number; @@ -335,9 +346,7 @@ export class SessionPool extends EventEmitter implements SessionPoolInterface { _inventory: SessionInventory; _onClose!: Promise; _pending = 0; - _pendingPrepare = 0; - _waiters: Waiters; - _numInProcessPrepare = 0; + _waiters = 0; _pingHandle!: NodeJS.Timer; _requests: PQueue; _traces: Map; @@ -366,27 +375,13 @@ export class SessionPool extends EventEmitter implements SessionPoolInterface { * @type {number} */ get available(): number { - const reads = this._inventory[types.ReadOnly]; - const writes = this._inventory[types.ReadWrite]; - - return reads.length + writes.length; + return this._inventory.sessions.length; } - - /** - * Current fraction of write-prepared sessions in the pool. - * @type {number} + /** @deprecated Starting from v6.5.0 the same session can be reused for + * different types of transactions. */ get currentWriteFraction(): number { - if (this.available + this.pendingPrepare === 0) { - // There are no sessions in the pool. Define the current write fraction as - // 0.5. That means that if the user has configured a write fraction >= 0.5 - // the first session to be created will be a write session, while it will - // otherwise be a read-only session. - return 0.5; - } - const writes = - this._inventory[types.ReadWrite].length + this.pendingPrepare; - return writes / (this.available + this.pendingPrepare); + return 0; } /** @@ -405,17 +400,9 @@ export class SessionPool extends EventEmitter implements SessionPoolInterface { return this.size >= this.options.max!; } - /** - * Total number of read sessions. - * @type {number} - */ + /** @deprecated Use `size()` instead. */ get reads(): number { - const available = this._inventory[types.ReadOnly].length; - const borrowed = [...this._inventory.borrowed].filter( - session => session.type === types.ReadOnly - ).length; - - return available + borrowed; + return this.size; } /** @@ -426,28 +413,14 @@ export class SessionPool extends EventEmitter implements SessionPoolInterface { return this.available + this.borrowed; } - /** - * Total number of write sessions. - * @type {number} - */ + /** @deprecated Use `size()` instead. */ get writes(): number { - const available = this._inventory[types.ReadWrite].length; - const borrowed = [...this._inventory.borrowed].filter( - session => session.type === types.ReadWrite - ).length; - - return available + borrowed; + return this.size; } - /** - * Number of sessions currently being prepared for a read/write transaction - * before being released into the pool. This number does not include the - * number of sessions being prepared for a read/write transaction that have - * already been checked out of the pool. - * @type {number} - */ + /** @deprecated Starting v6.5.0 the pending prepare state is obsolete. */ get pendingPrepare(): number { - return this._pendingPrepare; + return 0; } /** @@ -455,23 +428,17 @@ export class SessionPool extends EventEmitter implements SessionPoolInterface { * @type {number} */ get totalPending(): number { - return this._pending + this._pendingPrepare; + return this._pending; } - /** - * Current number of waiters for a read-only session. - * @type {number} - */ + /** @deprecated Use totalWaiters instead. */ get numReadWaiters(): number { - return this._waiters[types.ReadOnly]; + return this.totalWaiters; } - /** - * Current number of waiters for a read/write session. - * @type {number} - */ + /** @deprecated Use totalWaiters instead. */ get numWriteWaiters(): number { - return this._waiters[types.ReadWrite]; + return this.totalWaiters; } /** @@ -479,7 +446,7 @@ export class SessionPool extends EventEmitter implements SessionPoolInterface { * @type {number} */ get totalWaiters(): number { - return this.numReadWaiters + this.numWriteWaiters; + return this._waiters; } /** @@ -498,23 +465,11 @@ export class SessionPool extends EventEmitter implements SessionPoolInterface { this.options = Object.assign({}, DEFAULTS, options); this.options.min = Math.min(this.options.min!, this.options.max!); - const {writes} = this.options; - - if (writes! < 0 || writes! > 1) { - throw new TypeError( - 'Write percentage should be represented as a float between 0.0 and 1.0.' - ); - } - this._inventory = { - [types.ReadOnly]: [], - [types.ReadWrite]: [], + sessions: [], borrowed: new Set(), }; - this._waiters = { - [types.ReadOnly]: 0, - [types.ReadWrite]: 0, - }; + this._waiters = 0; this._requests = new PQueue({ concurrency: this.options.concurrency!, }); @@ -534,8 +489,7 @@ export class SessionPool extends EventEmitter implements SessionPoolInterface { */ close(callback: SessionPoolCloseCallback): void { const sessions: Session[] = [ - ...this._inventory[types.ReadOnly], - ...this._inventory[types.ReadWrite], + ...this._inventory.sessions, ...this._inventory.borrowed, ]; @@ -550,8 +504,7 @@ export class SessionPool extends EventEmitter implements SessionPoolInterface { const leaks = this._getLeaks(); let error; - this._inventory[types.ReadOnly] = []; - this._inventory[types.ReadWrite] = []; + this._inventory.sessions = []; this._inventory.borrowed.clear(); if (leaks.length) { @@ -565,22 +518,32 @@ export class SessionPool extends EventEmitter implements SessionPoolInterface { /** * Retrieve a read session. * + * @deprecated Use getSession instead. * @param {GetReadSessionCallback} callback The callback function. */ getReadSession(callback: GetReadSessionCallback): void { - this._acquire(types.ReadOnly).then( - session => callback(null, session), - callback + this.getSession((error, session) => + callback(error as ServiceError, session) ); } /** * Retrieve a read/write session. * + * @deprecated use getSession instead. * @param {GetWriteSessionCallback} callback The callback function. */ getWriteSession(callback: GetWriteSessionCallback): void { - this._acquire(types.ReadWrite).then( + this.getSession(callback); + } + + /** + * Retrieve a session. + * + * @param {GetSessionCallback} callback The callback function. + */ + getSession(callback: GetSessionCallback): void { + this._acquire().then( session => callback(null, session, session.txn!), callback ); @@ -617,14 +580,14 @@ export class SessionPool extends EventEmitter implements SessionPoolInterface { } /** - * Releases session back into the pool. If the session is a write session it - * will also prepare a new transaction before releasing it. + * Releases session back into the pool. * * @throws {Error} For unknown sessions. * @emits SessionPool#available * @emits SessionPool#error - * @emits SessionPool#readonly-available - * @emits SessionPool#readwrite-available + * @fires SessionPool#session-available + * @fires @deprecated SessionPool#readonly-available + * @fires @deprecated SessionPool#readwrite-available * @param {Session} session The session to release. */ release(session: Session): void { @@ -644,37 +607,14 @@ export class SessionPool extends EventEmitter implements SessionPoolInterface { } session.lastError = undefined; - // Release it into the pool as a read-only session in the following cases: - // 1. There are more waiters than there are sessions available. Releasing it - // into the pool will ensure that a waiter will be unblocked as soon as - // possible. - // 2. The user has not set a write fraction, but this session has been used - // as a read/write session. This is an indication that the application - // needs read/write sessions, and the pool should try to keep that number - // of read/write sessions dynamically. - // 3. The user has set a write fraction and that fraction has been reached. - const shouldBeWrite = - (session.type === types.ReadWrite && this.options.writes === 0) || - (this.options.writes! > 0 && - this.currentWriteFraction < this.options.writes!); - if (this.totalWaiters > this.available || !shouldBeWrite) { - session.type = types.ReadOnly; - this._release(session); - return; - } - // Delete the trace associated with this session to mark the session as checked // back into the pool. This will prevent the session to be marked as leaked if // the pool is closed while the session is being prepared. this._traces.delete(session.id); - this._pendingPrepare++; - session.type = types.ReadWrite; - this._prepareTransaction(session) - .catch(() => (session.type = types.ReadOnly)) - .then(() => { - this._pendingPrepare--; - this._release(session); - }); + // Release it into the pool as a session if there are more waiters than + // there are sessions available. Releasing it will unblock a waiter as soon + // as possible. + this._release(session); } /** @@ -682,10 +622,9 @@ export class SessionPool extends EventEmitter implements SessionPoolInterface { * * @private * - * @param {string} type The desired type to borrow. * @returns {Promise} */ - async _acquire(type: types): Promise { + async _acquire(): Promise { if (!this.isOpen) { throw new GoogleError(errors.Closed); } @@ -704,7 +643,7 @@ export class SessionPool extends EventEmitter implements SessionPoolInterface { throw new GoogleError(errors.Timeout); } - const session = await this._getSession(type, startTime); + const session = await this._getSession(startTime); if (this._isValidSession(session)) { return session; @@ -715,26 +654,7 @@ export class SessionPool extends EventEmitter implements SessionPoolInterface { }; const session = await this._acquires.add(getSession); - - if (type === types.ReadWrite && session.type === types.ReadOnly) { - this._numInProcessPrepare++; - try { - await this._prepareTransaction(session); - } catch (e) { - if (isSessionNotFoundError(e as ServiceError)) { - this._inventory.borrowed.delete(session); - } else { - this._release(session); - } - throw e; - } - } - // Mark the session as the type that was requested. This ensures that the - // fraction of read/write sessions in the pool is kept aligned with the - // actual need if the user has not specified a write fraction in the session - // pool options. - session.type = type; - + this._prepareTransaction(session); this._traces.set(session.id, frames); return session; } @@ -747,24 +667,21 @@ export class SessionPool extends EventEmitter implements SessionPoolInterface { * @param {Session} session The session object. */ _borrow(session: Session): void { - const type = session.type!; - const index = this._inventory[type].indexOf(session); + const index = this._inventory.sessions.indexOf(session); this._inventory.borrowed.add(session); - this._inventory[type].splice(index, 1); + this._inventory.sessions.splice(index, 1); } /** - * Borrows the first session from specific group. This method may only be called if the inventory - * actually contains a session of the desired type. + * Borrows the first session from the inventory. * * @private * - * @param {string} type The desired session type. * @return {Session} */ - _borrowFrom(type: types): Session { - const session = this._inventory[type].pop()!; + _borrowFrom(): Session { + const session = this._inventory.sessions.pop()!; this._inventory.borrowed.add(session); return session; } @@ -774,82 +691,59 @@ export class SessionPool extends EventEmitter implements SessionPoolInterface { * * @private * - * @param {string} type The desired session type. * @returns {Promise} */ - _borrowNextAvailableSession(type: types): Session { - const hasReads = !!this._inventory[types.ReadOnly].length; - - if (type === types.ReadOnly && hasReads) { - return this._borrowFrom(types.ReadOnly); - } - - const hasWrites = !!this._inventory[types.ReadWrite].length; - - if (hasWrites) { - return this._borrowFrom(types.ReadWrite); - } - - return this._borrowFrom(types.ReadOnly); + _borrowNextAvailableSession(): Session { + return this._borrowFrom(); } /** - * Attempts to create a single session of a certain type. + * Attempts to create a single session. * * @private * - * @param {string} type The desired type to create. * @returns {Promise} */ - _createSession(type: types): Promise { - const kind = type === types.ReadOnly ? 'reads' : 'writes'; - const options = {[kind]: 1}; - - return this._createSessions(options); + _createSession(): Promise { + return this._createSessions(1); } /** - * Batch creates sessions and prepares any necessary transactions. + * Batch creates sessions. * * @private * - * @param {object} [options] Config specifying how many sessions to create. + * @param {number} [amount] Config specifying how many sessions to create. * @returns {Promise} * @emits SessionPool#createError */ - async _createSessions({ - reads = 0, - writes = 0, - }: CreateSessionsOptions): Promise { + async _createSessions(amount: number): Promise { const labels = this.options.labels!; - let needed = reads + writes; - if (needed <= 0) { + if (amount <= 0) { return; } - this._pending += needed; + this._pending += amount; // while we can request as many sessions be created as we want, the backend // will return at most 100 at a time, hence the need for a while loop. - while (needed > 0) { + while (amount > 0) { let sessions: Session[] | null = null; try { [sessions] = await this.database.batchCreateSessions({ - count: needed, + count: amount, labels, }); - needed -= sessions.length; + amount -= sessions.length; } catch (e) { - this._pending -= needed; + this._pending -= amount; this.emit('createError', e); throw e; } sessions.forEach((session: Session) => { - session.type = writes-- > 0 ? types.ReadWrite : types.ReadOnly; - setImmediate(() => { this._inventory.borrowed.add(session); this._pending -= 1; @@ -897,10 +791,9 @@ export class SessionPool extends EventEmitter implements SessionPoolInterface { continue; } - const type = session.type!; - const index = this._inventory[type].indexOf(session); + const index = this._inventory.sessions.indexOf(session); - this._inventory[type].splice(index, 1); + this._inventory.sessions.splice(index, 1); this._destroy(session); } } @@ -917,7 +810,7 @@ export class SessionPool extends EventEmitter implements SessionPoolInterface { } try { - await this._createSessions({reads: needed, writes: 0}); + await this._createSessions(needed); } catch (e) { this.emit('error', e); } @@ -932,10 +825,7 @@ export class SessionPool extends EventEmitter implements SessionPoolInterface { */ _getIdleSessions(): Session[] { const idlesAfter = this.options.idlesAfter! * 60000; - const sessions: Session[] = [ - ...this._inventory[types.ReadOnly], - ...this._inventory[types.ReadWrite], - ]; + const sessions: Session[] = this._inventory.sessions; return sessions.filter(session => { return Date.now() - session.lastUsed! >= idlesAfter; @@ -952,47 +842,25 @@ export class SessionPool extends EventEmitter implements SessionPoolInterface { } /** - * Returns true if the pool has a session that is usable for the specified - * type, i.e. if a read-only session is requested, it returns true if the - * pool has a read-only or a read/write session. If a read/write session is - * requested, the method only returns true if the pool has a read/write - * session available. - * @param type The type of session. + * Returns true if the pool has a usable session. * @private */ - _hasSessionUsableFor(type: types): boolean { - return ( - this._inventory[type].length > 0 || - this._inventory[types.ReadWrite].length > 0 - ); + _hasSessionUsableFor(): boolean { + return this._inventory.sessions.length > 0; } /** - * Attempts to get a session of a specific type. If the type is unavailable it - * may try to use a different type. + * Attempts to get a session. * * @private * - * @param {string} type The desired session type. * @param {number} startTime Timestamp to use when determining timeouts. * @returns {Promise} */ - async _getSession(type: types, startTime: number): Promise { - if (this._hasSessionUsableFor(type)) { - return this._borrowNextAvailableSession(type); + async _getSession(startTime: number): Promise { + if (this._hasSessionUsableFor()) { + return this._borrowNextAvailableSession(); } - // If a read/write session is requested and the pool has a read-only session - // available, we should return that session unless there is a session - // currently being prepared for read/write that is not already claimed by - // another requester. - if ( - type === types.ReadWrite && - this._hasSessionUsableFor(types.ReadOnly) && - this.numWriteWaiters >= this.pendingPrepare - ) { - return this._borrowNextAvailableSession(type); - } - if (this.isFull && this.options.fail!) { throw new SessionPoolExhaustedError(this._getLeaks()); } @@ -1000,8 +868,8 @@ export class SessionPool extends EventEmitter implements SessionPoolInterface { let removeOnceCloseListener: Function; let removeListener: Function; - // Wait for the requested type of session to become available. - const availableEvent = type + '-available'; + // Wait for a session to become available. + const availableEvent = 'session-available'; const promises = [ new Promise((_, reject) => { const onceCloseListener = () => reject(new GoogleError(errors.Closed)); @@ -1044,24 +912,24 @@ export class SessionPool extends EventEmitter implements SessionPoolInterface { // Only create a new session if there are more waiters than sessions already // being created. The current requester will be waiter number _numWaiters+1. if (!this.isFull && this.totalPending <= this.totalWaiters) { - let reads = this.options.incStep + let amount = this.options.incStep ? this.options.incStep : DEFAULTS.incStep!; // Create additional sessions if the configured minimum has not been reached. const min = this.options.min ? this.options.min : 0; - if (this.size + this.totalPending + reads < min) { - reads = min - this.size - this.totalPending; + if (this.size + this.totalPending + amount < min) { + amount = min - this.size - this.totalPending; } // Make sure we don't create more sessions than the pool should have. - if (reads + this.size > this.options.max!) { - reads = this.options.max! - this.size; + if (amount + this.size > this.options.max!) { + amount = this.options.max! - this.size; } - if (reads > 0) { - this._pending += reads; + if (amount > 0) { + this._pending += amount; promises.push( new Promise((_, reject) => { - this._pending -= reads; - this._createSessions({reads, writes: 0}).catch(reject); + this._pending -= amount; + this._createSessions(amount).catch(reject); }) ); } @@ -1080,17 +948,17 @@ export class SessionPool extends EventEmitter implements SessionPoolInterface { ); try { - this._waiters[type]++; + this._waiters++; await Promise.race(promises); } finally { - this._waiters[type]--; + this._waiters--; removeOnceCloseListener!(); removeListener!(); removeErrorListener!(); removeTimeoutListener(); } - return this._borrowNextAvailableSession(type); + return this._borrowNextAvailableSession(); } /** @@ -1153,9 +1021,8 @@ export class SessionPool extends EventEmitter implements SessionPoolInterface { * * @param {Session} session The session object. * @param {object} options The transaction options. - * @returns {Promise} */ - async _prepareTransaction(session: Session): Promise { + _prepareTransaction(session: Session): void { const transaction = session.transaction( (session.parent as Database).queryOptions_ ); @@ -1168,36 +1035,20 @@ export class SessionPool extends EventEmitter implements SessionPoolInterface { * @private * * @fires SessionPool#available - * @fires SessionPool#readonly-available - * @fires SessionPool#readwrite-available + * @fires SessionPool#session-available + * @fires @deprecated SessionPool#readonly-available + * @fires @deprecated SessionPool#readwrite-available * @param {Session} session The session object. */ _release(session: Session): void { - const type = session.type!; - - this._inventory[type].push(session); + this._inventory.sessions.push(session); this._inventory.borrowed.delete(session); this._traces.delete(session.id); this.emit('available'); - // Determine the type of waiter to unblock. - let emitType: types; - if ( - type === types.ReadOnly && - !this.numReadWaiters && - this.numWriteWaiters - ) { - emitType = types.ReadWrite; - } else if ( - type === types.ReadWrite && - !this.numWriteWaiters && - this.numReadWaiters - ) { - emitType = types.ReadOnly; - } else { - emitType = type; - } - this.emit(emitType + '-available'); + this.emit('session-available'); + this.emit('readonly-available'); + this.emit('readwrite-available'); } /** diff --git a/src/session.ts b/src/session.ts index ad7cf3e9a..dd031fa6d 100644 --- a/src/session.ts +++ b/src/session.ts @@ -43,7 +43,7 @@ import IRequestOptions = google.spanner.v1.IRequestOptions; export type GetSessionResponse = [Session, r.Response]; /** - * enum to capture the possible session types + * @deprecated. enum to capture the possible session types */ export const enum types { ReadOnly = 'readonly', @@ -102,7 +102,6 @@ export type DeleteSessionCallback = NormalCallback; export class Session extends common.GrpcServiceObject { id!: string; formattedName_?: string; - type?: types; txn?: Transaction; lastUsed?: number; lastError?: grpc.ServiceError; diff --git a/test/database.ts b/test/database.ts index d7bf9cc55..68d0f30de 100644 --- a/test/database.ts +++ b/test/database.ts @@ -98,8 +98,7 @@ class FakeSessionPool extends EventEmitter { this.calledWith_ = arguments; } open() {} - getReadSession() {} - getWriteSession() {} + getSession() {} release() {} } @@ -527,7 +526,7 @@ describe('Database', () => { beforeEach(() => { database.pool_ = { - getReadSession(callback) { + getSession(callback) { callback(null, SESSION); }, }; @@ -537,7 +536,7 @@ describe('Database', () => { const error = new Error('err'); database.pool_ = { - getReadSession(callback) { + getSession(callback) { callback(error); }, }; @@ -1123,7 +1122,7 @@ describe('Database', () => { database.pool_ = POOL; - POOL.getReadSession = callback => { + POOL.getSession = callback => { callback(null, SESSION); }; @@ -1131,7 +1130,7 @@ describe('Database', () => { }); it('should get a session', done => { - POOL.getReadSession = () => { + POOL.getSession = () => { done(); }; @@ -1141,7 +1140,7 @@ describe('Database', () => { it('should return error if it cannot get a session', done => { const error = new Error('Error.'); - POOL.getReadSession = callback => { + POOL.getSession = callback => { callback(error); }; @@ -1220,7 +1219,7 @@ describe('Database', () => { return REQUEST_STREAM; }; - POOL.getReadSession = callback => { + POOL.getSession = callback => { callback(null, SESSION); }; @@ -1228,7 +1227,7 @@ describe('Database', () => { }); it('should get a session when stream opens', done => { - POOL.getReadSession = () => { + POOL.getSession = () => { done(); }; @@ -1239,7 +1238,7 @@ describe('Database', () => { const ERROR = new Error('Error.'); beforeEach(() => { - POOL.getReadSession = callback => { + POOL.getSession = callback => { callback(ERROR); }; }); @@ -1257,7 +1256,7 @@ describe('Database', () => { describe('session retrieved successfully', () => { beforeEach(() => { - POOL.getReadSession = callback => { + POOL.getSession = callback => { callback(null, SESSION); }; }); @@ -1334,7 +1333,7 @@ describe('Database', () => { cancel: util.noop, }; - POOL.getReadSession = callback => { + POOL.getSession = callback => { callback(null, SESSION); }; }); @@ -1468,7 +1467,7 @@ describe('Database', () => { let fakeStream: Transform; let fakeStream2: Transform; - let getReadSessionStub: sinon.SinonStub; + let getSessionStub: sinon.SinonStub; let snapshotStub: sinon.SinonStub; let runStreamStub: sinon.SinonStub; @@ -1481,9 +1480,7 @@ describe('Database', () => { fakeStream = through.obj(); fakeStream2 = through.obj(); - getReadSessionStub = ( - sandbox.stub(fakePool, 'getReadSession') as sinon.SinonStub - ) + getSessionStub = (sandbox.stub(fakePool, 'getSession') as sinon.SinonStub) .onFirstCall() .callsFake(callback => callback(null, fakeSession)) .onSecondCall() @@ -1502,19 +1499,17 @@ describe('Database', () => { sandbox.stub(fakeSnapshot2, 'runStream').returns(fakeStream2); }); - it('should get a read session via `getReadSession`', () => { - getReadSessionStub.callsFake(() => {}); + it('should get a read session via `getSession`', () => { + getSessionStub.callsFake(() => {}); database.runStream(QUERY); - assert.strictEqual(getReadSessionStub.callCount, 1); + assert.strictEqual(getSessionStub.callCount, 1); }); - it('should destroy the stream if `getReadSession` errors', done => { + it('should destroy the stream if `getSession` errors', done => { const fakeError = new Error('err'); - getReadSessionStub - .onFirstCall() - .callsFake(callback => callback(fakeError)); + getSessionStub.onFirstCall().callsFake(callback => callback(fakeError)); database.runStream(QUERY).on('error', err => { assert.strictEqual(err, fakeError); @@ -1791,7 +1786,7 @@ describe('Database', () => { let fakeSnapshot: FakeTransaction; let beginSnapshotStub: sinon.SinonStub; - let getReadSessionStub: sinon.SinonStub; + let getSessionStub: sinon.SinonStub; let snapshotStub: sinon.SinonStub; beforeEach(() => { @@ -1803,8 +1798,8 @@ describe('Database', () => { sandbox.stub(fakeSnapshot, 'begin') as sinon.SinonStub ).callsFake(callback => callback(null)); - getReadSessionStub = ( - sandbox.stub(fakePool, 'getReadSession') as sinon.SinonStub + getSessionStub = ( + sandbox.stub(fakePool, 'getSession') as sinon.SinonStub ).callsFake(callback => callback(null, fakeSession)); snapshotStub = sandbox @@ -1812,18 +1807,18 @@ describe('Database', () => { .returns(fakeSnapshot); }); - it('should call through to `SessionPool#getReadSession`', () => { - getReadSessionStub.callsFake(() => {}); + it('should call through to `SessionPool#getSession`', () => { + getSessionStub.callsFake(() => {}); database.getSnapshot(assert.ifError); - assert.strictEqual(getReadSessionStub.callCount, 1); + assert.strictEqual(getSessionStub.callCount, 1); }); it('should return any pool errors', done => { const fakeError = new Error('err'); - getReadSessionStub.callsFake(callback => callback(fakeError)); + getSessionStub.callsFake(callback => callback(fakeError)); database.getSnapshot(err => { assert.strictEqual(err, fakeError); @@ -1877,7 +1872,7 @@ describe('Database', () => { ); sandbox.stub(fakeSession2, 'snapshot').returns(fakeSnapshot2); - getReadSessionStub + getSessionStub .onFirstCall() .callsFake(callback => callback(null, fakeSession)) .onSecondCall() @@ -1929,32 +1924,32 @@ describe('Database', () => { let fakeSession: FakeSession; let fakeTransaction: FakeTransaction; - let getWriteSessionStub: sinon.SinonStub; + let getSessionStub: sinon.SinonStub; beforeEach(() => { fakePool = database.pool_; fakeSession = new FakeSession(); fakeTransaction = new FakeTransaction(); - getWriteSessionStub = ( - sandbox.stub(fakePool, 'getWriteSession') as sinon.SinonStub + getSessionStub = ( + sandbox.stub(fakePool, 'getSession') as sinon.SinonStub ).callsFake(callback => { callback(null, fakeSession, fakeTransaction); }); }); it('should get a read/write transaction', () => { - getWriteSessionStub.callsFake(() => {}); + getSessionStub.callsFake(() => {}); database.getTransaction(assert.ifError); - assert.strictEqual(getWriteSessionStub.callCount, 1); + assert.strictEqual(getSessionStub.callCount, 1); }); it('should return any pool errors', done => { const fakeError = new Error('err'); - getWriteSessionStub.callsFake(callback => callback(fakeError)); + getSessionStub.callsFake(callback => callback(fakeError)); database.getTransaction(err => { assert.strictEqual(err, fakeError); @@ -2291,7 +2286,7 @@ describe('Database', () => { let fakeSession: FakeSession; let fakePartitionedDml: FakeTransaction; - let getReadSessionStub; + let getSessionStub; let beginStub; let runUpdateStub; @@ -2300,8 +2295,8 @@ describe('Database', () => { fakeSession = new FakeSession(); fakePartitionedDml = new FakeTransaction(); - getReadSessionStub = ( - sandbox.stub(fakePool, 'getReadSession') as sinon.SinonStub + getSessionStub = ( + sandbox.stub(fakePool, 'getSession') as sinon.SinonStub ).callsFake(callback => { callback(null, fakeSession); }); @@ -2318,18 +2313,18 @@ describe('Database', () => { }); it('should get a read only session from the pool', () => { - getReadSessionStub.callsFake(() => {}); + getSessionStub.callsFake(() => {}); database.runPartitionedUpdate(QUERY, assert.ifError); - assert.strictEqual(getReadSessionStub.callCount, 1); + assert.strictEqual(getSessionStub.callCount, 1); }); it('should return any pool errors', () => { const fakeError = new Error('err'); const fakeCallback = sandbox.spy(); - getReadSessionStub.callsFake(callback => callback(fakeError)); + getSessionStub.callsFake(callback => callback(fakeError)); database.runPartitionedUpdate(QUERY, fakeCallback); const [err, rowCount] = fakeCallback.lastCall.args; @@ -2416,7 +2411,7 @@ describe('Database', () => { beforeEach(() => { pool = database.pool_; - (sandbox.stub(pool, 'getWriteSession') as sinon.SinonStub).callsFake( + (sandbox.stub(pool, 'getSession') as sinon.SinonStub).callsFake( callback => { callback(null, SESSION, TRANSACTION); } @@ -2426,7 +2421,7 @@ describe('Database', () => { it('should return any errors getting a session', done => { const fakeErr = new Error('err'); - (pool.getWriteSession as sinon.SinonStub).callsFake(callback => + (pool.getSession as sinon.SinonStub).callsFake(callback => callback(fakeErr) ); @@ -2500,7 +2495,7 @@ describe('Database', () => { beforeEach(() => { pool = database.pool_; - (sandbox.stub(pool, 'getWriteSession') as sinon.SinonStub).callsFake( + (sandbox.stub(pool, 'getSession') as sinon.SinonStub).callsFake( callback => { callback(null, SESSION, TRANSACTION); } diff --git a/test/session-pool.ts b/test/session-pool.ts index 12be4ef15..a350af13c 100644 --- a/test/session-pool.ts +++ b/test/session-pool.ts @@ -25,7 +25,7 @@ import stackTrace = require('stack-trace'); import timeSpan = require('time-span'); import {Database} from '../src/database'; -import {Session, types} from '../src/session'; +import {Session} from '../src/session'; import * as sp from '../src/session-pool'; import {Transaction} from '../src/transaction'; import {grpc} from 'google-gax'; @@ -63,10 +63,6 @@ describe('SessionPool', () => { const createSession = (name = 'id', props?): Session => { props = props || {}; - if (!props.type) { - props.type = types.ReadOnly; - } - return Object.assign(new Session(DATABASE, name), props, { create: sandbox.stub().resolves(), delete: sandbox.stub().resolves(), @@ -137,8 +133,7 @@ describe('SessionPool', () => { describe('available', () => { it('should return the number of available sessions', () => { - inventory[types.ReadOnly] = [createSession()]; - inventory[types.ReadWrite] = [createSession(), createSession()]; + inventory.sessions = [createSession(), createSession(), createSession()]; assert.strictEqual(sessionPool.available, 3); }); @@ -169,34 +164,9 @@ describe('SessionPool', () => { }); }); - describe('reads', () => { - beforeEach(() => { - sessionPool.options.min = 3; - - inventory[types.ReadOnly] = [ - createSession(), - createSession(), - createSession(), - ]; - }); - - it('should get the total number of read sessions', () => { - assert.strictEqual(sessionPool.reads, 3); - }); - - it('should factor in borrowed sessions', () => { - inventory.borrowed.add(createSession()); - - assert.strictEqual(sessionPool.reads, 4); - assert.strictEqual(sessionPool.available, 3); - assert.strictEqual(sessionPool.borrowed, 1); - }); - }); - describe('size', () => { it('should return the size of the pool', () => { - inventory[types.ReadOnly] = [createSession()]; - inventory[types.ReadWrite] = [createSession(), createSession()]; + inventory.sessions = [createSession(), createSession(), createSession()]; inventory.borrowed = new Set([createSession()]); assert.strictEqual(sessionPool.size, 4); @@ -205,23 +175,19 @@ describe('SessionPool', () => { describe('writes', () => { beforeEach(() => { - inventory[types.ReadWrite] = [ - createSession(), - createSession(), - createSession(), - ]; + inventory.sessions = [createSession(), createSession(), createSession()]; }); it('should get the total number of read/write sessions', () => { - assert.strictEqual(sessionPool.writes, 3); + assert.strictEqual(sessionPool.size, 3); }); it('should factor in borrowed sessions', () => { - const session = createSession('id', {type: types.ReadWrite}); + const session = createSession('id', {}); inventory.borrowed.add(session); - assert.strictEqual(sessionPool.writes, 4); + assert.strictEqual(sessionPool.size, 4); assert.strictEqual(sessionPool.available, 3); assert.strictEqual(sessionPool.borrowed, 1); }); @@ -243,7 +209,6 @@ describe('SessionPool', () => { assert.strictEqual(sessionPool.options.min, 25); assert.strictEqual(sessionPool.options.max, 100); assert.strictEqual(sessionPool.options.maxIdle, 1); - assert.strictEqual(sessionPool.options.writes, 0); }); it('should not override user options', () => { @@ -260,23 +225,6 @@ describe('SessionPool', () => { }, minGtMax); }); }); - - describe('writes', () => { - const writeErrReg = - /Write percentage should be represented as a float between 0\.0 and 1\.0\./; - - it('should throw when writes is less than 0', () => { - assert.throws(() => { - return new SessionPool(DATABASE, {writes: -1}); - }, writeErrReg); - }); - - it('should throw when writes is greater than 1', () => { - assert.throws(() => { - return new SessionPool(DATABASE, {writes: 50}); - }, writeErrReg); - }); - }); }); it('should set isOpen to false', () => { @@ -285,8 +233,7 @@ describe('SessionPool', () => { it('should create an inventory object', () => { assert.deepStrictEqual(inventory, { - readonly: [], - readwrite: [], + sessions: [], borrowed: new Set(), }); }); @@ -332,8 +279,7 @@ describe('SessionPool', () => { describe('close', () => { beforeEach(() => { - inventory[types.ReadOnly] = [createSession(), createSession()]; - inventory[types.ReadWrite] = [createSession()]; + inventory.sessions = [createSession(), createSession(), createSession()]; inventory.borrowed = new Set([createSession(), createSession()]); sessionPool._destroy = sandbox.stub().resolves(); }); @@ -363,11 +309,7 @@ describe('SessionPool', () => { }); it('should destroy all the sessions', done => { - const sessions = [ - ...inventory[types.ReadOnly], - ...inventory[types.ReadWrite], - ...inventory.borrowed, - ]; + const sessions = [...inventory.sessions, ...inventory.borrowed]; let destroyed = 0; @@ -409,16 +351,13 @@ describe('SessionPool', () => { }); }); - describe('getReadSession', () => { - it('should acquire a read session', done => { + describe('getSession', () => { + it('should acquire a session', done => { const fakeSession = createSession(); - sandbox - .stub(sessionPool, '_acquire') - .withArgs(types.ReadOnly) - .resolves(fakeSession); + sandbox.stub(sessionPool, '_acquire').resolves(fakeSession); - sessionPool.getReadSession((err, session) => { + sessionPool.getSession((err, session) => { assert.ifError(err); assert.strictEqual(session, fakeSession); done(); @@ -430,43 +369,27 @@ describe('SessionPool', () => { sandbox.stub(sessionPool, '_acquire').rejects(error); - sessionPool.getReadSession(err => { + sessionPool.getSession(err => { assert.strictEqual(err, error); done(); }); }); - }); - describe('getWriteSession', () => { it('should pass back the session and txn', done => { const fakeTxn = new FakeTransaction() as unknown as Transaction; const fakeSession = createSession(); fakeSession.txn = fakeTxn; - sandbox - .stub(sessionPool, '_acquire') - .withArgs(types.ReadWrite) - .resolves(fakeSession); + sandbox.stub(sessionPool, '_acquire').resolves(fakeSession); - sessionPool.getWriteSession((err, session, txn) => { + sessionPool.getSession((err, session, txn) => { assert.ifError(err); assert.strictEqual(session, fakeSession); assert.strictEqual(txn, fakeTxn); done(); }); }); - - it('should pass any errors to the callback', done => { - const error = new Error('err'); - - sandbox.stub(sessionPool, '_acquire').rejects(error); - - sessionPool.getWriteSession(err => { - assert.strictEqual(err, error); - done(); - }); - }); }); describe('open', () => { @@ -580,7 +503,7 @@ describe('SessionPool', () => { }); describe('release', () => { - let prepStub: sinon.SinonStub<[Session], Promise>; + let prepStub: sinon.SinonStub<[Session], void>; beforeEach(() => { prepStub = sandbox.stub(sessionPool, '_prepareTransaction').resolves(); @@ -623,36 +546,14 @@ describe('SessionPool', () => { assert(isAround(session.lastUsed, Date.now())); }); - describe('reads', () => { - it('should release readonly sessions', done => { - const fakeSession = createSession('id', {type: types.ReadOnly}); - - prepStub.rejects(); - sandbox - .stub(sessionPool, '_release') - .withArgs(fakeSession) - .callsFake(() => done()); - - inventory.borrowed.add(fakeSession); - sessionPool.release(fakeSession); - }); - }); - - describe('readwrite', () => { + describe('read and write', () => { let fakeSession; beforeEach(() => { - fakeSession = createSession('id', {type: types.ReadWrite}); + fakeSession = createSession('id'); inventory.borrowed.add(fakeSession); }); - it('should prep a new transaction', done => { - sandbox.stub(sessionPool, '_release'); - prepStub.withArgs(fakeSession).callsFake(async () => done()); - - sessionPool.release(fakeSession); - }); - it('should release the read/write session', done => { prepStub.resolves(); sandbox @@ -662,18 +563,6 @@ describe('SessionPool', () => { sessionPool.release(fakeSession); }); - - it('should convert to a read session if txn fails', done => { - const stub = sandbox.stub(sessionPool, '_release').callsFake(() => { - const session = stub.getCall(0).args[0]; - assert.strictEqual(session, fakeSession); - assert.strictEqual(session.type, types.ReadOnly); - done(); - }); - - prepStub.rejects(); - sessionPool.release(fakeSession); - }); }); }); @@ -687,7 +576,7 @@ describe('SessionPool', () => { sessionPool.isOpen = false; try { - await sessionPool._acquire(types.ReadOnly); + await sessionPool._acquire(); shouldNotBeCalled(); } catch (e) { assert.strictEqual( @@ -705,7 +594,7 @@ describe('SessionPool', () => { }; try { - await sessionPool._acquire(types.ReadOnly); + await sessionPool._acquire(); shouldNotBeCalled(); } catch (e) { assert.strictEqual( @@ -722,10 +611,9 @@ describe('SessionPool', () => { const stub = sandbox .stub(sessionPool, '_getSession') .resolves(fakeSession); - const session = await sessionPool._acquire(types.ReadOnly); - const [type, startTime] = stub.getCall(0).args; + const session = await sessionPool._acquire(); + const [startTime] = stub.getCall(0).args; - assert.strictEqual(type, types.ReadOnly); assert(isAround(startTime, now)); assert.strictEqual(session, fakeSession); }); @@ -743,7 +631,7 @@ describe('SessionPool', () => { stub.onFirstCall().resolves(badSession); stub.onSecondCall().resolves(goodSession); - const session = await sessionPool._acquire(types.ReadOnly); + const session = await sessionPool._acquire(); assert.strictEqual(session, goodSession); assert.strictEqual(sessionPool.size, 1); @@ -758,56 +646,36 @@ describe('SessionPool', () => { sandbox.stub(sessionPool, '_getSession').resolves(fakeSession); sandbox.stub(fakeStackTrace, 'get').returns(fakeTrace); - await sessionPool._acquire(types.ReadOnly); + await sessionPool._acquire(); const trace = sessionPool._traces.get(id); assert.strictEqual(trace, fakeTrace); }); it('should convert read sessions to write sessions', async () => { - const fakeSession = createSession('id', {id: types.ReadOnly}); + const fakeSession = createSession('id'); sandbox.stub(sessionPool, '_getSession').resolves(fakeSession); const prepStub = sandbox .stub(sessionPool, '_prepareTransaction') .withArgs(fakeSession); - const session = await sessionPool._acquire(types.ReadWrite); + const session = await sessionPool._acquire(); assert.strictEqual(session, fakeSession); assert.strictEqual(prepStub.callCount, 1); }); - - it('should propagate error from `_prepareTransaction`', done => { - const fakeSession = createSession('id', {id: types.ReadOnly}); - const fakeError = new Error('err'); - - sandbox.stub(sessionPool, '_getSession').resolves(fakeSession); - sandbox - .stub(sessionPool, '_prepareTransaction') - .withArgs(fakeSession) - .rejects(fakeError); - - sessionPool._acquire(types.ReadWrite).then( - () => {}, - err => { - assert.deepStrictEqual(err, fakeError); - done(); - } - ); - }); }); describe('_borrow', () => { it('should mark the session as borrowed', () => { const fakeSession = createSession(); - fakeSession.type = types.ReadOnly; - inventory[types.ReadOnly].push(fakeSession); + inventory.sessions.push(fakeSession); sessionPool._borrow(fakeSession); - assert.strictEqual(inventory[types.ReadOnly].indexOf(fakeSession), -1); + assert.strictEqual(inventory.sessions.indexOf(fakeSession), -1); assert(inventory.borrowed.has(fakeSession)); }); }); @@ -817,96 +685,45 @@ describe('SessionPool', () => { const fakeSession1 = createSession(); const fakeSession2 = createSession(); - inventory[types.ReadOnly].push(fakeSession1); - inventory[types.ReadOnly].push(fakeSession2); + inventory.sessions.push(fakeSession1); + inventory.sessions.push(fakeSession2); - let session = sessionPool._borrowFrom(types.ReadOnly); + let session = sessionPool._borrowFrom(); assert.strictEqual(session, fakeSession2); - session = sessionPool._borrowFrom(types.ReadOnly); + session = sessionPool._borrowFrom(); assert.strictEqual(session, fakeSession1); }); }); describe('_borrowNextAvailableSession', () => { - it('should borrow from readonly when available', () => { + it('should borrow when available', () => { const fakeSession = createSession(); - inventory[types.ReadOnly].push(fakeSession); - sandbox - .stub(sessionPool, '_borrowFrom') - .withArgs(types.ReadOnly) - .returns(fakeSession); + inventory.sessions.push(fakeSession); + sandbox.stub(sessionPool, '_borrowFrom').returns(fakeSession); - const session = sessionPool._borrowNextAvailableSession(types.ReadOnly); - - assert.strictEqual(session, fakeSession); - }); - - it('should borrow from readwrites when available', () => { - const fakeSession = createSession(); - - inventory[types.ReadWrite].push(fakeSession); - sandbox - .stub(sessionPool, '_borrowFrom') - .withArgs(types.ReadWrite) - .returns(fakeSession); - - const session = sessionPool._borrowNextAvailableSession(types.ReadWrite); - - assert.strictEqual(session, fakeSession); - }); - - it('should borrow from rw when readonly isnt available', () => { - const fakeSession = createSession(); - - inventory[types.ReadWrite].push(fakeSession); - sandbox - .stub(sessionPool, '_borrowFrom') - .withArgs(types.ReadWrite) - .returns(fakeSession); - - const session = sessionPool._borrowNextAvailableSession(types.ReadOnly); - - assert.strictEqual(session, fakeSession); - }); - - it('should borrow from readonly when rw isnt available', () => { - const fakeSession = createSession(); - - inventory[types.ReadOnly].push(fakeSession); - sandbox - .stub(sessionPool, '_borrowFrom') - .withArgs(types.ReadOnly) - .returns(fakeSession); - - const session = sessionPool._borrowNextAvailableSession(types.ReadWrite); + const session = sessionPool._borrowNextAvailableSession(); assert.strictEqual(session, fakeSession); }); }); describe('_createSession', () => { - let stub: sinon.SinonStub<[sp.CreateSessionsOptions], Promise>; + let stub: sinon.SinonStub<[number], Promise>; beforeEach(() => { stub = sandbox.stub(sessionPool, '_createSessions').resolves(); }); - it('should create a single read session', () => { - sessionPool._createSession(types.ReadOnly); - const [options] = stub.lastCall.args; - assert.deepStrictEqual(options, {reads: 1}); - }); - - it('should create a single write session', () => { - sessionPool._createSession(types.ReadWrite); - const [options] = stub.lastCall.args; - assert.deepStrictEqual(options, {writes: 1}); + it('should create a single session', () => { + sessionPool._createSession(); + const [numbers] = stub.lastCall.args; + assert.deepStrictEqual(numbers, 1); }); }); describe('_createSessions', () => { - const OPTIONS = {reads: 1, writes: 2}; + const OPTIONS = 3; const RESPONSE = [[{}, {}, {}]]; let stub; @@ -974,8 +791,7 @@ describe('SessionPool', () => { await sessionPool._createSessions(OPTIONS); setImmediate(() => { - assert.strictEqual(sessionPool.reads, OPTIONS.reads); - assert.strictEqual(sessionPool.writes, OPTIONS.writes); + assert.strictEqual(sessionPool.size, OPTIONS); }); }); }); @@ -1010,19 +826,16 @@ describe('SessionPool', () => { let fakeSessions; beforeEach(() => { - const reads = [createSession('id', {type: types.ReadOnly})]; - const writes = [ - createSession('id', {type: types.ReadWrite}), - createSession('id', {type: types.ReadWrite}), + inventory.sessions = [ + createSession('id'), + createSession('id'), + createSession('id'), ]; - inventory[types.ReadOnly] = reads; - inventory[types.ReadWrite] = writes; - sessionPool.options.maxIdle = 0; sessionPool.options.min = 0; - fakeSessions = [...reads, ...writes]; + fakeSessions = [...inventory.sessions]; sandbox .stub(sessionPool, '_getIdleSessions') @@ -1079,7 +892,7 @@ describe('SessionPool', () => { }); describe('_fill', () => { - let stub: sinon.SinonStub<[sp.CreateSessionsOptions], Promise>; + let stub: sinon.SinonStub<[number], Promise>; beforeEach(() => { stub = sandbox.stub(sessionPool, '_createSessions'); @@ -1089,24 +902,19 @@ describe('SessionPool', () => { it('should create the min number of required sessions', () => { sessionPool._fill(); - const {reads, writes} = stub.lastCall.args[0]; + const amount = stub.lastCall.args[0]; - assert.strictEqual(reads, 8); - assert.strictEqual(writes, 0); + assert.strictEqual(amount, 8); }); it('should respect the current size of the pool', () => { - sessionPool.options.writes = 0.5; - - inventory[types.ReadOnly] = [createSession()]; - inventory[types.ReadWrite] = [createSession(), createSession()]; + inventory.sessions = [createSession(), createSession(), createSession()]; sessionPool._fill(); - const {reads, writes} = stub.lastCall.args[0]; + const amount = stub.lastCall.args[0]; - assert.strictEqual(reads, 5); - assert.strictEqual(writes, 0); + assert.strictEqual(amount, 5); }); it('should noop when no sessions are needed', () => { @@ -1135,16 +943,13 @@ describe('SessionPool', () => { const idlesAfter = (sessionPool.options.idlesAfter = 1); // 1 minute const idleTimestamp = Date.now() - idlesAfter * 60000; - const fakeReads = (inventory[types.ReadOnly] = [ + const fake = (inventory.sessions = [ {lastUsed: Date.now()}, {lastUsed: idleTimestamp}, - ]); - - const fakeWrites = (inventory[types.ReadWrite] = [ {lastUsed: idleTimestamp}, ]); - const expectedSessions = [fakeReads[1], fakeWrites[0]]; + const expectedSessions = [fake[1], fake[2]]; const idleSessions = sessionPool._getIdleSessions(); assert.deepStrictEqual(idleSessions, expectedSessions); @@ -1187,94 +992,21 @@ describe('SessionPool', () => { it('should return a session if one is available', async () => { const fakeSession = createSession(); - inventory[types.ReadOnly] = [fakeSession]; + inventory.sessions = [fakeSession]; sandbox .stub(sessionPool, '_borrowNextAvailableSession') - .withArgs(types.ReadOnly) .returns(fakeSession); - const session = await sessionPool._getSession(types.ReadOnly, startTime); + const session = await sessionPool._getSession(startTime); assert.strictEqual(session, fakeSession); }); - it('should return a read-only session if both read-only and read/write sessions are available', async () => { - const fakeReadSession = createSession(); - const fakeWriteSession = createSession(); - - inventory[types.ReadOnly] = [fakeReadSession]; - inventory[types.ReadWrite] = [fakeWriteSession]; - - const session = await sessionPool._getSession(types.ReadOnly, startTime); - assert.strictEqual(session, fakeReadSession); - }); - - it('should return a read/write session if only read/write sessions are available', async () => { - const fakeWriteSession = createSession(); - - inventory[types.ReadOnly] = []; - inventory[types.ReadWrite] = [fakeWriteSession]; - - const session = await sessionPool._getSession(types.ReadOnly, startTime); - assert.strictEqual(session, fakeWriteSession); - }); - - it('should return a read/write session if both read-only and read/write sessions are available', async () => { - const fakeReadSession = createSession(); - const fakeWriteSession = createSession(); - - inventory[types.ReadOnly] = [fakeReadSession]; - inventory[types.ReadWrite] = [fakeWriteSession]; - - const session = await sessionPool._getSession(types.ReadWrite, startTime); - assert.strictEqual(session, fakeWriteSession); - }); - - it('should return a read-only session if only read-only sessions are available and there are no sessions currently being prepared', async () => { - const fakeReadSession = createSession(); - - inventory[types.ReadOnly] = [fakeReadSession]; - inventory[types.ReadWrite] = []; - sessionPool._waiters[types.ReadWrite] = 0; - sessionPool._pendingPrepare = 0; - - const session = await sessionPool._getSession(types.ReadWrite, startTime); - assert.strictEqual(session, fakeReadSession); - }); - - it('should return a read-write session if only read-only sessions are available and there are more sessions currently being prepared than there are read/write waiters', async () => { - const fakeReadSession = createSession(); - const fakeWriteSession = createSession(); - - inventory[types.ReadOnly] = [fakeReadSession]; - inventory[types.ReadWrite] = []; - sessionPool._waiters[types.ReadWrite] = 0; - sessionPool._pendingPrepare = 1; - - setTimeout(() => { - inventory[types.ReadWrite] = [fakeWriteSession]; - sessionPool.emit('readwrite-available'); - }, 100); - const session = await sessionPool._getSession(types.ReadWrite, startTime); - assert.strictEqual(session, fakeWriteSession); - }); - - it('should return a read-only session if only read-only sessions are available and all sessions currently being prepared are reserved for current read/write waiters', async () => { - const fakeReadSession = createSession(); - - inventory[types.ReadOnly] = [fakeReadSession]; - inventory[types.ReadWrite] = []; - sessionPool._waiters[types.ReadWrite] = 1; - sessionPool._pendingPrepare = 1; - - const session = await sessionPool._getSession(types.ReadWrite, startTime); - assert.strictEqual(session, fakeReadSession); - }); it('should return an error if empty and fail = true', async () => { sessionPool.options.fail = true; try { - await sessionPool._getSession(types.ReadOnly, startTime); + await sessionPool._getSession(startTime); shouldNotBeCalled(); } catch (e) { assert.strictEqual( @@ -1288,7 +1020,7 @@ describe('SessionPool', () => { setTimeout(() => sessionPool.emit('close'), 100); try { - await sessionPool._getSession(types.ReadOnly, startTime); + await sessionPool._getSession(startTime); shouldNotBeCalled(); } catch (e) { assert.strictEqual( @@ -1298,29 +1030,15 @@ describe('SessionPool', () => { } }); - it('should return a read-only session when it becomes available', async () => { - const fakeSession = createSession(); - - sandbox - .stub(sessionPool, '_borrowNextAvailableSession') - .withArgs(types.ReadOnly) - .returns(fakeSession); - setTimeout(() => sessionPool.emit('readonly-available'), 100); - - const session = await sessionPool._getSession(types.ReadOnly, startTime); - assert.strictEqual(session, fakeSession); - }); - - it('should return a read/write session when it becomes available', async () => { + it('should return a session when it becomes available', async () => { const fakeSession = createSession(); sandbox .stub(sessionPool, '_borrowNextAvailableSession') - .withArgs(types.ReadWrite) .returns(fakeSession); - setTimeout(() => sessionPool.emit('readwrite-available'), 100); + setTimeout(() => sessionPool.emit('session-available'), 100); - const session = await sessionPool._getSession(types.ReadWrite, startTime); + const session = await sessionPool._getSession(startTime); assert.strictEqual(session, fakeSession); }); @@ -1329,7 +1047,7 @@ describe('SessionPool', () => { const timeout = (sessionPool.options.acquireTimeout = 100); try { - await sessionPool._getSession(types.ReadOnly, startTime); + await sessionPool._getSession(startTime); shouldNotBeCalled(); } catch (e) { assert(isAround(timeout, end())); @@ -1344,10 +1062,10 @@ describe('SessionPool', () => { const fakeSession = createSession(); const stub = sandbox .stub(sessionPool, '_createSessions') - .withArgs({reads: 1, writes: 0}) + .withArgs(1) .callsFake(() => { // this will fire off via _createSessions - setImmediate(() => sessionPool.emit('readonly-available')); + setImmediate(() => sessionPool.emit('session-available')); return Promise.resolve(); }); @@ -1357,7 +1075,7 @@ describe('SessionPool', () => { .stub(sessionPool, '_borrowNextAvailableSession') .returns(fakeSession); - const session = await sessionPool._getSession(types.ReadOnly, startTime); + const session = await sessionPool._getSession(startTime); assert.strictEqual(session, fakeSession); assert.strictEqual(stub.callCount, 1); @@ -1367,10 +1085,10 @@ describe('SessionPool', () => { const fakeSession = createSession(); const stub = sandbox .stub(sessionPool, '_createSessions') - .withArgs({reads: 20, writes: 0}) + .withArgs(20) .callsFake(() => { // this will fire off via _createSessions - setImmediate(() => sessionPool.emit('readonly-available')); + setImmediate(() => sessionPool.emit('session-available')); return Promise.resolve(); }); @@ -1381,7 +1099,7 @@ describe('SessionPool', () => { .stub(sessionPool, '_borrowNextAvailableSession') .returns(fakeSession); - const session = await sessionPool._getSession(types.ReadOnly, startTime); + const session = await sessionPool._getSession(startTime); assert.strictEqual(session, fakeSession); assert.strictEqual(stub.callCount, 1); @@ -1392,19 +1110,15 @@ describe('SessionPool', () => { sessionPool.options.max = 2; sessionPool._pending = 1; - const stub = sandbox - .stub(sessionPool, '_createSession') - .withArgs(types.ReadOnly) - .callsFake(() => { - return Promise.reject(new Error('should not be called')); - }); + const stub = sandbox.stub(sessionPool, '_createSession').callsFake(() => { + return Promise.reject(new Error('should not be called')); + }); sandbox .stub(sessionPool, '_borrowNextAvailableSession') - .withArgs(types.ReadOnly) .returns(fakeSession); - setTimeout(() => sessionPool.emit('readonly-available'), 100); + setTimeout(() => sessionPool.emit('session-available'), 100); - const session = await sessionPool._getSession(types.ReadOnly, startTime); + const session = await sessionPool._getSession(startTime); assert.strictEqual(session, fakeSession); assert.strictEqual(stub.callCount, 0); }); @@ -1416,7 +1130,7 @@ describe('SessionPool', () => { sandbox.stub(sessionPool, '_createSessions').rejects(error); try { - await sessionPool._getSession(types.ReadOnly, startTime); + await sessionPool._getSession(startTime); shouldNotBeCalled(); } catch (e) { assert.strictEqual(e, error); @@ -1426,15 +1140,15 @@ describe('SessionPool', () => { it('should remove the available listener on error', async () => { sessionPool.options.acquireTimeout = 100; - const promise = sessionPool._getSession(types.ReadOnly, startTime); + const promise = sessionPool._getSession(startTime); - assert.strictEqual(sessionPool.listenerCount('readonly-available'), 1); + assert.strictEqual(sessionPool.listenerCount('session-available'), 1); try { await promise; shouldNotBeCalled(); } catch (e) { - assert.strictEqual(sessionPool.listenerCount('readonly-available'), 0); + assert.strictEqual(sessionPool.listenerCount('available'), 0); } }); }); @@ -1552,33 +1266,24 @@ describe('SessionPool', () => { describe('_release', () => { it('should release the session', () => { - const fakeSession = createSession('id', {type: types.ReadOnly}); + const fakeSession = createSession('id'); inventory.borrowed.add(fakeSession); sessionPool._release(fakeSession); assert.strictEqual(inventory.borrowed.has(fakeSession), false); - assert.strictEqual(inventory[types.ReadOnly].indexOf(fakeSession), 0); + assert.strictEqual(inventory.sessions.indexOf(fakeSession), 0); }); it('should delete any stack traces', () => { const id = 'abc'; - const fakeSession = createSession(id, {type: types.ReadOnly}); + const fakeSession = createSession(id); sessionPool._traces.set(id, []); sessionPool._release(fakeSession); assert.strictEqual(sessionPool._traces.has(id), false); }); - - it('should emit the readonly-available event for a read-only session when there are both read-only and read/write waiters', done => { - const fakeSession = createSession('id', {type: types.ReadOnly}); - sessionPool._waiters[types.ReadOnly] = 1; - sessionPool._waiters[types.ReadWrite] = 1; - - sessionPool.on('readonly-available', done); - sessionPool._release(fakeSession); - }); }); describe('_startHouseKeeping', () => { diff --git a/test/spanner.ts b/test/spanner.ts index 76cba2213..181cb1968 100644 --- a/test/spanner.ts +++ b/test/spanner.ts @@ -37,7 +37,6 @@ import {TEST_INSTANCE_NAME} from './mockserver/mockinstanceadmin'; import * as mockDatabaseAdmin from './mockserver/mockdatabaseadmin'; import * as sinon from 'sinon'; import {google} from '../protos/protos'; -import {types} from '../src/session'; import {ExecuteSqlRequest, RunResponse} from '../src/transaction'; import {Row} from '../src/partial-result-stream'; import {GetDatabaseOperationsOptions} from '../src/instance'; @@ -177,7 +176,7 @@ describe('Spanner with mock server', () => { it('should return different database instances when the same database is requested twice with different session pool options', async () => { const dbWithDefaultOptions = newTestDatabase(); const dbWithWriteSessions = instance.database(dbWithDefaultOptions.id!, { - writes: 1.0, + fail: false, }); assert.notStrictEqual(dbWithDefaultOptions, dbWithWriteSessions); }); @@ -1987,12 +1986,12 @@ describe('Spanner with mock server', () => { }); }); - it('should retry "Session not found" errors for a query on a write-session on Database.runTransaction()', done => { - const db = newTestDatabase({min: 1, incStep: 1, writes: 1.0}); + it('should retry "Session not found" errors for a query on a session on Database.runTransaction()', done => { + const db = newTestDatabase({min: 1, incStep: 1}); const pool = db.pool_ as SessionPool; // Wait until one session with a transaction has been created. pool.once('available', () => { - assert.strictEqual(pool.writes, 1); + assert.strictEqual(pool.size, 1); spannerMock.setExecutionTime( spannerMock.executeStreamingSql, SimulatedExecutionTime.ofError({ @@ -2025,12 +2024,12 @@ describe('Spanner with mock server', () => { }); } - it('should retry "Session not found" errors for Commit on a write-session on Database.runTransaction()', done => { - const db = newTestDatabase({min: 1, incStep: 1, writes: 1.0}); + it('should retry "Session not found" errors for Commit on a session on Database.runTransaction()', done => { + const db = newTestDatabase({min: 1, incStep: 1}); const pool = db.pool_ as SessionPool; // Wait until one session with a transaction has been created. pool.once('available', () => { - assert.strictEqual(pool.writes, 1); + assert.strictEqual(pool.size, 1); spannerMock.setExecutionTime( spannerMock.commit, SimulatedExecutionTime.ofError({ @@ -2078,12 +2077,12 @@ describe('Spanner with mock server', () => { .catch(done); }); - it('should retry "Session not found" errors for runUpdate on a write-session on Database.runTransaction()', done => { - const db = newTestDatabase({min: 1, incStep: 1, writes: 1.0}); + it('should retry "Session not found" errors for runUpdate on a session on Database.runTransaction()', done => { + const db = newTestDatabase({min: 1, incStep: 1}); const pool = db.pool_ as SessionPool; // Wait until one session with a transaction has been created. pool.once('available', () => { - assert.strictEqual(pool.writes, 1); + assert.strictEqual(pool.size, 1); spannerMock.setExecutionTime( spannerMock.executeStreamingSql, SimulatedExecutionTime.ofError({ @@ -2109,12 +2108,12 @@ describe('Spanner with mock server', () => { }); }); - it('should retry "Session not found" errors for executeBatchDml on a write-session on Database.runTransaction()', done => { - const db = newTestDatabase({min: 1, incStep: 1, writes: 1.0}); + it('should retry "Session not found" errors for executeBatchDml on a session on Database.runTransaction()', done => { + const db = newTestDatabase({min: 1, incStep: 1}); const pool = db.pool_ as SessionPool; // Wait until one session with a transaction has been created. pool.once('available', () => { - assert.strictEqual(pool.writes, 1); + assert.strictEqual(pool.size, 1); spannerMock.setExecutionTime( spannerMock.executeBatchDml, SimulatedExecutionTime.ofError({ @@ -2143,12 +2142,12 @@ describe('Spanner with mock server', () => { }); }); - it('should retry "Session not found" errors for a query on a write-session on Database.runTransactionAsync()', done => { - const db = newTestDatabase({min: 1, incStep: 1, writes: 1.0}); + it('should retry "Session not found" errors for a query on a session on Database.runTransactionAsync()', done => { + const db = newTestDatabase({min: 1, incStep: 1}); const pool = db.pool_ as SessionPool; // Wait until one session with a transaction has been created. pool.once('available', () => { - assert.strictEqual(pool.writes, 1); + assert.strictEqual(pool.size, 1); spannerMock.setExecutionTime( spannerMock.executeStreamingSql, SimulatedExecutionTime.ofError({ @@ -2180,12 +2179,12 @@ describe('Spanner with mock server', () => { } } - it('should retry "Session not found" errors for Commit on a write-session on Database.runTransactionAsync()', done => { - const db = newTestDatabase({min: 1, incStep: 1, writes: 1.0}); + it('should retry "Session not found" errors for Commit on a session on Database.runTransactionAsync()', done => { + const db = newTestDatabase({min: 1, incStep: 1}); const pool = db.pool_ as SessionPool; // Wait until one session with a transaction has been created. pool.once('available', async () => { - assert.strictEqual(pool.writes, 1); + assert.strictEqual(pool.size, 1); spannerMock.setExecutionTime( spannerMock.commit, SimulatedExecutionTime.ofError({ @@ -2211,12 +2210,12 @@ describe('Spanner with mock server', () => { }); }); - it('should retry "Session not found" errors for runUpdate on a write-session on Database.runTransactionAsync()', done => { - const db = newTestDatabase({min: 1, incStep: 1, writes: 1.0}); + it('should retry "Session not found" errors for runUpdate on a session on Database.runTransactionAsync()', done => { + const db = newTestDatabase({min: 1, incStep: 1}); const pool = db.pool_ as SessionPool; // Wait until one session with a transaction has been created. pool.once('available', async () => { - assert.strictEqual(pool.writes, 1); + assert.strictEqual(pool.size, 1); spannerMock.setExecutionTime( spannerMock.executeStreamingSql, SimulatedExecutionTime.ofError({ @@ -2243,12 +2242,12 @@ describe('Spanner with mock server', () => { }); }); - it('should retry "Session not found" errors for executeBatchDml on a write-session on Database.runTransactionAsync()', done => { - const db = newTestDatabase({min: 1, incStep: 1, writes: 1.0}); + it('should retry "Session not found" errors for executeBatchDml on a session on Database.runTransactionAsync()', done => { + const db = newTestDatabase({min: 1, incStep: 1}); const pool = db.pool_ as SessionPool; // Wait until one session with a transaction has been created. pool.once('available', async () => { - assert.strictEqual(pool.writes, 1); + assert.strictEqual(pool.size, 1); spannerMock.setExecutionTime( spannerMock.executeBatchDml, SimulatedExecutionTime.ofError({ @@ -2339,7 +2338,6 @@ describe('Spanner with mock server', () => { max: 10, incStep: 1, concurrency: 5, - writes: 0.1, fail: true, }); try { @@ -2362,9 +2360,9 @@ describe('Spanner with mock server', () => { rows.forEach(() => {}); assert.strictEqual(pool.size, 1); if (i > 0) { - assert.strictEqual(pool._inventory[types.ReadOnly][0].id, sessionId); + assert.strictEqual(pool._inventory.sessions[0].id, sessionId); } - sessionId = pool._inventory[types.ReadOnly][0].id; + sessionId = pool._inventory.sessions[0].id; } } @@ -2485,7 +2483,6 @@ describe('Spanner with mock server', () => { max: 10, incStep: 1, concurrency: 5, - writes: 0.1, fail: true, }); try { @@ -2519,8 +2516,6 @@ describe('Spanner with mock server', () => { const r = database.run(selectSql); await Promise.all([w, r]); assert.strictEqual(pool.size, 1); - assert.strictEqual(pool.writes, 0); - assert.strictEqual(pool.reads, 1); } finally { await database.close(); } @@ -2552,76 +2547,32 @@ describe('Spanner with mock server', () => { } }); - it('should not create unnecessary write-prepared sessions', async () => { - const query = { - sql: selectSql, - }; - const update = { - sql: insertSql, - }; - const database = newTestDatabase({ - writes: 0.2, - min: 100, - }); - const pool = database.pool_ as SessionPool; - try { - // First execute three consecutive read/write transactions. - const promises: Array> = []; - for (let i = 0; i < 3; i++) { - promises.push(executeSimpleUpdate(database, update)); - const ms = Math.floor(Math.random() * 5) + 1; - await sleep(ms); - } - let maxWriteSessions = 0; - for (let i = 0; i < 500; i++) { - if (Math.random() < 0.8) { - promises.push(database.run(query)); - } else { - promises.push(executeSimpleUpdate(database, update)); - } - maxWriteSessions = Math.max(maxWriteSessions, pool.writes); - const ms = Math.floor(Math.random() * 5) + 1; - await sleep(ms); - } - await Promise.all(promises); - } finally { - await database.close(); - } - }); - it('should pre-fill session pool', async () => { const database = newTestDatabase({ - writes: 0.2, min: 100, max: 200, }); const pool = database.pool_ as SessionPool; - const expectedWrites = pool.options.min! * pool.options.writes!; - const expectedReads = pool.options.min! - expectedWrites; - assert.strictEqual(pool.size, expectedReads + expectedWrites); + const expectedAmount = pool.options.min!; + assert.strictEqual(pool.size, expectedAmount); // Wait until all sessions have been created and prepared. const started = new Date().getTime(); while ( - (pool._inventory[types.ReadOnly].length < expectedReads || - pool._inventory[types.ReadWrite].length < expectedWrites) && + pool._inventory.sessions.length < expectedAmount && new Date().getTime() - started < 1000 ) { await sleep(1); } - assert.strictEqual(pool.reads, expectedReads); - assert.strictEqual(pool.writes, expectedWrites); await database.close(); }); it('should use pre-filled session pool', async () => { const database = newTestDatabase({ - writes: 0.2, min: 100, max: 200, }); const pool = database.pool_ as SessionPool; - const expectedWrites = pool.options.min! * pool.options.writes!; - const expectedReads = pool.options.min! - expectedWrites; + const expectedAmount = pool.options.min!; // Start executing a query. This query should use one of the sessions that // has been pre-filled into the pool. const [rows] = await database.run(selectSql); @@ -2629,18 +2580,13 @@ describe('Spanner with mock server', () => { // Wait until all sessions have been created and prepared. const started = new Date().getTime(); while ( - (pool._inventory[types.ReadOnly].length < expectedReads || - pool._inventory[types.ReadWrite].length < expectedWrites) && + pool._inventory.sessions.length < expectedAmount && new Date().getTime() - started < 1000 ) { await sleep(1); } - assert.strictEqual(pool.size, expectedReads + expectedWrites); - assert.strictEqual( - pool._inventory[types.ReadWrite].length, - expectedWrites - ); - assert.strictEqual(pool._inventory[types.ReadOnly].length, expectedReads); + assert.strictEqual(pool.size, expectedAmount); + assert.strictEqual(pool._inventory.sessions.length, expectedAmount); await database.close(); }); @@ -2752,40 +2698,6 @@ describe('Spanner with mock server', () => { await database.close(); }); - it('should use pre-filled write sessions', async () => { - const database = newTestDatabase({ - writes: 0.2, - min: 100, - max: 200, - }); - const pool = database.pool_ as SessionPool; - const expectedWrites = pool.options.min! * pool.options.writes!; - const expectedReads = pool.options.min! - expectedWrites; - // Execute an update. - const [count] = await database.runTransactionAsync( - (transaction): Promise<[number]> => { - transaction.begin(); - return transaction.runUpdate(insertSql).then(updateCount => { - transaction.commit(); - return updateCount; - }); - } - ); - assert.strictEqual(count, 1); - // Wait until all sessions have been created and prepared. - const started = new Date().getTime(); - while ( - (pool._pending > 0 || pool._pendingPrepare > 0) && - new Date().getTime() - started < 1000 - ) { - await sleep(1); - } - assert.strictEqual(pool.reads, expectedReads); - assert.strictEqual(pool.writes, expectedWrites); - assert.strictEqual(pool.size, expectedReads + expectedWrites); - await database.close(); - }); - it('should respect options.incStep', async () => { const database = newTestDatabase({ min: 100,